290 lines
8.6 KiB
Python
290 lines
8.6 KiB
Python
"""Memory and disk cache helpers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import shutil
|
|
from dataclasses import asdict, dataclass, field
|
|
from pathlib import Path
|
|
from time import time
|
|
from typing import Any
|
|
|
|
from platformdirs import user_cache_dir
|
|
|
|
from .exceptions import CacheError
|
|
|
|
|
|
@dataclass(frozen=True, slots=True)
|
|
class CacheStats:
|
|
"""Public cache statistics snapshot."""
|
|
|
|
memory_tiles: int = 0
|
|
memory_max_tiles: int = 0
|
|
memory_hits: int = 0
|
|
memory_misses: int = 0
|
|
disk_bytes: int = 0
|
|
disk_max_bytes: int | None = None
|
|
disk_hits: int = 0
|
|
disk_misses: int = 0
|
|
disk_path: Path | None = None
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class MemoryCacheConfig:
|
|
"""Initial memory cache configuration."""
|
|
|
|
max_tiles: int = 512
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class DiskCacheConfig:
|
|
"""Initial persistent disk cache configuration."""
|
|
|
|
path: Path | None = None
|
|
max_bytes: int | None = 2_000_000_000
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class MemoryCacheEntry:
|
|
"""Metadata for one in-memory tile."""
|
|
|
|
tile_id: object
|
|
size_bytes: int = 0
|
|
last_accessed_at: float = field(default_factory=time)
|
|
protected: bool = False
|
|
texture_tag: object | None = None
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class MemoryCacheModel:
|
|
"""Small LRU metadata model for decoded/runtime tiles."""
|
|
|
|
max_tiles: int = 512
|
|
entries: dict[object, MemoryCacheEntry] = field(default_factory=dict)
|
|
hits: int = 0
|
|
misses: int = 0
|
|
|
|
def record_access(self, tile_id: object) -> MemoryCacheEntry | None:
|
|
"""Mark an entry as recently used and return it if present."""
|
|
|
|
entry = self.entries.get(tile_id)
|
|
if entry is None:
|
|
self.misses += 1
|
|
return None
|
|
self.hits += 1
|
|
entry.last_accessed_at = time()
|
|
return entry
|
|
|
|
def put(self, entry: MemoryCacheEntry) -> None:
|
|
"""Insert or replace entry metadata."""
|
|
|
|
entry.last_accessed_at = time()
|
|
self.entries[entry.tile_id] = entry
|
|
|
|
def plan_evictions(self) -> list[object]:
|
|
"""Return tile IDs that can be evicted without touching GUI resources."""
|
|
|
|
overflow = len(self.entries) - self.max_tiles
|
|
if overflow <= 0:
|
|
return []
|
|
candidates = [entry for entry in self.entries.values() if not entry.protected]
|
|
candidates.sort(key=lambda entry: entry.last_accessed_at)
|
|
return [entry.tile_id for entry in candidates[:overflow]]
|
|
|
|
|
|
@dataclass(frozen=True, slots=True)
|
|
class DiskCacheMetadata:
|
|
"""Persistent metadata stored next to a tile file."""
|
|
|
|
url: str = ""
|
|
etag: str | None = None
|
|
last_modified: str | None = None
|
|
expires: str | None = None
|
|
downloaded_at: float = 0.0
|
|
last_accessed_at: float = 0.0
|
|
size_bytes: int = 0
|
|
|
|
|
|
@dataclass(frozen=True, slots=True)
|
|
class DiskCacheEntry:
|
|
"""Scanned disk cache file plus metadata."""
|
|
|
|
tile_path: Path
|
|
metadata_path: Path
|
|
metadata: DiskCacheMetadata
|
|
|
|
|
|
def default_cache_dir() -> Path:
    """Locate the cache directory used when no explicit one is configured.

    The location is whatever ``platformdirs.user_cache_dir`` reports for the
    application name ``"dpg-map"`` with no author component.
    """
    location = user_cache_dir("dpg-map", appauthor=False)
    return Path(location)
|
|
|
|
|
|
def disk_cache_root(cache_dir: str | Path | None = None) -> Path:
    """Work out which directory acts as the disk cache root.

    Args:
        cache_dir: Explicit cache location, or ``None`` to use
            ``default_cache_dir()``.

    Returns:
        ``cache_dir`` as a ``Path`` with a leading ``~`` expanded, or the
        default cache directory when ``cache_dir`` is ``None``.
    """
    if cache_dir is None:
        return default_cache_dir()
    return Path(cache_dir).expanduser()
|
|
|
|
|
|
def tile_cache_path(
    cache_dir: str | Path | None,
    provider_name: str,
    z: int,
    x: int,
    y: int,
    extension: str | None = None,
) -> Path:
    """Return the provider-namespaced persistent tile path.

    The layout is ``<root>/<provider>/<z>/<x>/<y>.<ext>`` where ``<root>`` is
    resolved by ``disk_cache_root``.

    Args:
        cache_dir: Cache root, or ``None`` for the default location.
        provider_name: Provider identifier used as the namespace directory.
            Path separators are replaced with ``_`` and names that are not
            usable as a directory (empty, ``.``, ``..``) are rewritten so the
            tile always stays inside its own directory under the root.
        z: Value used for the first directory level below the provider.
        x: Value used for the second directory level below the provider.
        y: Value used as the file stem.
        extension: File extension with or without leading dots; ``"png"`` is
            used when it is missing or consists only of dots.

    Returns:
        The path at which the tile file is (or would be) stored.
    """
    # Fall back to "png" also when the extension collapses to nothing (e.g. "."),
    # which would otherwise produce a file named "<y>.".
    ext = (extension or "").lstrip(".") or "png"

    # Neutralise both separator styles so the provider occupies exactly one
    # directory level on every platform.
    safe_provider = provider_name.replace("/", "_").replace("\\", "_")

    # "" and "." vanish when joined (dropping the namespace) and ".." climbs
    # out of the cache root; turn them into ordinary directory names.
    if safe_provider in {"", ".", ".."}:
        safe_provider = safe_provider.replace(".", "_") or "_"

    return disk_cache_root(cache_dir) / safe_provider / str(z) / str(x) / f"{y}.{ext}"
|
|
|
|
|
|
def tile_metadata_path(tile_path: Path) -> Path:
    """Map a tile file path to the path of its JSON metadata sidecar.

    The tile's final suffix is swapped for ``.json`` (or ``.json`` is
    appended when the tile has no suffix); the directory is unchanged.
    """
    sidecar = tile_path.with_suffix(".json")
    return sidecar
|
|
|
|
|
|
def read_disk_metadata(path: Path) -> DiskCacheMetadata:
    """Read a metadata JSON file, returning defaults for missing metadata.

    Args:
        path: Location of the metadata JSON file.

    Returns:
        The parsed metadata, or a default ``DiskCacheMetadata()`` when the
        file does not exist.

    Raises:
        CacheError: If the file cannot be read or decoded as UTF-8, does not
            contain valid JSON, is not a JSON object, or holds values that
            cannot be converted to the expected numeric types.
    """
    # Read first and treat "not there" as the default case: this avoids the
    # window between an ``exists()`` check and the read in which another
    # process (e.g. a prune) may delete the file.
    try:
        text = path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return DiskCacheMetadata()
    except (OSError, UnicodeDecodeError) as exc:
        raise CacheError(f"could not read cache metadata: {path}") from exc

    try:
        raw: Any = json.loads(text)
    except json.JSONDecodeError as exc:
        raise CacheError(f"invalid cache metadata JSON: {path}") from exc

    # Valid JSON that is not an object (list, number, null, ...) has no fields.
    if not isinstance(raw, dict):
        raise CacheError(f"invalid cache metadata JSON: {path}")

    # ``float()``/``int()`` reject null, nested containers and non-numeric
    # strings; surface those as a cache error instead of a raw exception.
    try:
        return DiskCacheMetadata(
            url=str(raw.get("url", "")),
            etag=raw.get("etag"),
            last_modified=raw.get("last_modified"),
            expires=raw.get("expires"),
            downloaded_at=float(raw.get("downloaded_at", 0.0)),
            last_accessed_at=float(raw.get("last_accessed_at", 0.0)),
            size_bytes=int(raw.get("size_bytes", 0)),
        )
    except (TypeError, ValueError, OverflowError) as exc:
        raise CacheError(f"invalid cache metadata JSON: {path}") from exc
|
|
|
|
|
|
def write_disk_metadata(path: Path, metadata: DiskCacheMetadata) -> None:
    """Serialise *metadata* as JSON into *path*.

    Missing parent directories are created first. The JSON object is written
    with sorted keys, UTF-8 encoded.

    Raises:
        CacheError: If creating the directory or writing the file fails with
            an ``OSError``.
    """
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(asdict(metadata), sort_keys=True)
        path.write_text(payload, encoding="utf-8")
    except OSError as exc:
        raise CacheError(f"could not write cache metadata: {path}") from exc
|
|
|
|
|
|
def touch_disk_metadata(path: Path, *, accessed_at: float | None = None) -> None:
    """Rewrite a metadata file with a new last-access timestamp.

    All other fields are carried over from what ``read_disk_metadata``
    returns for *path* (defaults if the file is missing).

    Args:
        path: Metadata file to update.
        accessed_at: Timestamp to store; the current time is used when this
            is ``None``.
    """
    current = read_disk_metadata(path)
    stamp = accessed_at if accessed_at is not None else time()
    # Copy every field, then override just the access time.
    values = asdict(current)
    values["last_accessed_at"] = stamp
    write_disk_metadata(path, DiskCacheMetadata(**values))
|
|
|
|
|
|
def scan_disk_cache(cache_dir: str | Path | None) -> list[DiskCacheEntry]:
    """Scan tile files under a disk cache root.

    Every regular file below the root is treated as a tile, except files
    whose suffix is ``.json`` (those are metadata sidecars). Each tile is
    paired with the metadata read from its sidecar; when the metadata records
    no size, the size is taken from the file itself.

    Args:
        cache_dir: Cache root, or ``None`` for the default location.

    Returns:
        One ``DiskCacheEntry`` per tile found; an empty list when the root
        does not exist.

    Raises:
        CacheError: If a sidecar cannot be read (see ``read_disk_metadata``)
            or a tile cannot be inspected.
    """
    root = disk_cache_root(cache_dir)
    if not root.exists():
        return []

    entries: list[DiskCacheEntry] = []
    for path in root.rglob("*"):
        if not path.is_file() or path.suffix == ".json":
            continue
        metadata_path = tile_metadata_path(path)
        metadata = read_disk_metadata(metadata_path)

        # Only consult the filesystem when the metadata has no recorded size.
        if not metadata.size_bytes:
            try:
                actual_size = path.stat().st_size
            except FileNotFoundError:
                # The tile was removed while scanning (e.g. by a concurrent
                # prune); it is no longer part of the cache.
                continue
            except OSError as exc:
                raise CacheError(f"could not stat cached tile: {path}") from exc
            if actual_size != metadata.size_bytes:
                metadata = DiskCacheMetadata(
                    **{**asdict(metadata), "size_bytes": actual_size}
                )

        entries.append(DiskCacheEntry(path, metadata_path, metadata))
    return entries
|
|
|
|
|
|
def disk_cache_size_bytes(cache_dir: str | Path | None) -> int:
    """Add up the sizes of all tiles found by ``scan_disk_cache``.

    Sizes are the ``size_bytes`` values of each scanned entry's metadata.
    """
    total = 0
    for item in scan_disk_cache(cache_dir):
        total += item.metadata.size_bytes
    return total
|
|
|
|
|
|
def plan_disk_prune(
    cache_dir: str | Path | None,
    max_bytes: int | None,
    *,
    protected_paths: set[Path] | None = None,
) -> list[Path]:
    """Choose which tiles to delete to bring the cache under *max_bytes*.

    Nothing is deleted here. Tiles are selected least-recently-accessed
    first until the running total fits the limit; tiles whose resolved path
    is in *protected_paths* are never selected.

    Args:
        cache_dir: Cache root, or ``None`` for the default location.
        max_bytes: Size limit; ``None`` disables pruning entirely.
        protected_paths: Tile paths that must be kept.

    Returns:
        Tile paths to remove, in eviction order (possibly empty).
    """
    if max_bytes is None:
        return []

    shielded = {keep.resolve() for keep in protected_paths or set()}
    scanned = scan_disk_cache(cache_dir)
    remaining = sum(item.metadata.size_bytes for item in scanned)
    if remaining <= max_bytes:
        return []

    # Oldest access time first; protected tiles are filtered out up front.
    evictable = sorted(
        (item for item in scanned if item.tile_path.resolve() not in shielded),
        key=lambda item: item.metadata.last_accessed_at,
    )

    doomed: list[Path] = []
    for item in evictable:
        if remaining <= max_bytes:
            break
        doomed.append(item.tile_path)
        remaining -= item.metadata.size_bytes
    return doomed
|
|
|
|
|
|
def prune_disk_cache(
    cache_dir: str | Path | None,
    max_bytes: int | None,
    *,
    protected_paths: set[Path] | None = None,
) -> list[Path]:
    """Delete the tiles selected by ``plan_disk_prune`` and their sidecars.

    Files that are already gone are ignored.

    Args:
        cache_dir: Cache root, or ``None`` for the default location.
        max_bytes: Size limit passed through to ``plan_disk_prune``.
        protected_paths: Tile paths that must be kept.

    Returns:
        The tile paths that were planned for removal.

    Raises:
        CacheError: If deleting a tile or its metadata fails with an
            ``OSError``; tiles processed before the failure stay deleted.
    """
    removed = plan_disk_prune(cache_dir, max_bytes, protected_paths=protected_paths)
    for tile in removed:
        sidecar = tile_metadata_path(tile)
        try:
            # Tile first, then its metadata file.
            for target in (tile, sidecar):
                target.unlink(missing_ok=True)
        except OSError as exc:
            raise CacheError(f"could not prune cached tile: {tile}") from exc
    return removed
|
|
|
|
|
|
def clear_disk_cache_path(cache_dir: str | Path | None) -> None:
    """Delete the whole cache root directory tree, if it exists.

    Args:
        cache_dir: Cache root, or ``None`` for the default location.

    Raises:
        CacheError: If removing the tree fails with an ``OSError``.
    """
    root = disk_cache_root(cache_dir)
    if root.exists():
        try:
            shutil.rmtree(root)
        except OSError as exc:
            raise CacheError(f"could not clear disk cache: {root}") from exc
|