"""Memory and disk cache helpers."""

from __future__ import annotations

import json
import shutil
from dataclasses import asdict, dataclass, field, replace
from pathlib import Path
from time import time
from typing import Any

from platformdirs import user_cache_dir

from .exceptions import CacheError


@dataclass(frozen=True, slots=True)
class CacheStats:
    """Public cache statistics snapshot.

    Immutable value object; every field defaults to an "empty cache" value.
    """

    memory_tiles: int = 0
    memory_max_tiles: int = 0
    memory_hits: int = 0
    memory_misses: int = 0
    disk_bytes: int = 0
    disk_max_bytes: int | None = None
    disk_hits: int = 0
    disk_misses: int = 0
    disk_path: Path | None = None


@dataclass(slots=True)
class MemoryCacheConfig:
    """Initial memory cache configuration."""

    max_tiles: int = 512


@dataclass(slots=True)
class DiskCacheConfig:
    """Initial persistent disk cache configuration."""

    path: Path | None = None
    max_bytes: int | None = 2_000_000_000


@dataclass(slots=True)
class MemoryCacheEntry:
    """Metadata for one in-memory tile."""

    tile_id: object
    size_bytes: int = 0
    # Defaults to the creation time; refreshed by MemoryCacheModel on put/access.
    last_accessed_at: float = field(default_factory=time)
    # Protected entries are never proposed by MemoryCacheModel.plan_evictions().
    protected: bool = False
    texture_tag: object | None = None


@dataclass(slots=True)
class MemoryCacheModel:
    """Small LRU metadata model for decoded/runtime tiles."""

    max_tiles: int = 512
    entries: dict[object, MemoryCacheEntry] = field(default_factory=dict)
    hits: int = 0
    misses: int = 0

    def record_access(self, tile_id: object) -> MemoryCacheEntry | None:
        """Mark an entry as recently used and return it if present.

        Increments ``hits`` when the tile is known and ``misses`` otherwise.

        Returns:
            The entry with a refreshed ``last_accessed_at``, or ``None`` when
            ``tile_id`` is not in the model.
        """
        entry = self.entries.get(tile_id)
        if entry is None:
            self.misses += 1
            return None
        self.hits += 1
        entry.last_accessed_at = time()
        return entry

    def put(self, entry: MemoryCacheEntry) -> None:
        """Insert or replace entry metadata, stamping it as just accessed."""
        entry.last_accessed_at = time()
        self.entries[entry.tile_id] = entry

    def plan_evictions(self) -> list[object]:
        """Return tile IDs that can be evicted without touching GUI resources.

        Only computes a plan; ``entries`` is not modified.

        Returns:
            IDs of the least recently accessed unprotected entries, at most as
            many as the model holds above ``max_tiles``. The list is shorter
            than the overflow when too few entries are unprotected, and empty
            when the model is within its limit.
        """
        overflow = len(self.entries) - self.max_tiles
        if overflow <= 0:
            return []
        candidates = [entry for entry in self.entries.values() if not entry.protected]
        candidates.sort(key=lambda entry: entry.last_accessed_at)
        return [entry.tile_id for entry in candidates[:overflow]]


@dataclass(frozen=True, slots=True)
class DiskCacheMetadata:
    """Persistent metadata stored next to a tile file."""

    url: str = ""
    etag: str | None = None
    last_modified: str | None = None
    expires: str | None = None
    downloaded_at: float = 0.0
    last_accessed_at: float = 0.0
    size_bytes: int = 0


@dataclass(frozen=True, slots=True)
class DiskCacheEntry:
    """Scanned disk cache file plus metadata."""

    tile_path: Path
    metadata_path: Path
    metadata: DiskCacheMetadata


def _safe_provider_name(provider_name: str) -> str:
    """Return ``provider_name`` with ``/`` replaced so it is one path component."""
    return provider_name.replace("/", "_")


def default_cache_dir() -> Path:
    """Return the default persistent cache directory."""
    return Path(user_cache_dir("dpg-map", appauthor=False))


def disk_cache_root(cache_dir: str | Path | None = None) -> Path:
    """Resolve the disk cache root path.

    Args:
        cache_dir: Explicit cache directory (``~`` is expanded), or ``None``
            to use :func:`default_cache_dir`.
    """
    if cache_dir is None:
        return default_cache_dir()
    return Path(cache_dir).expanduser()


def tile_cache_path(
    cache_dir: str | Path | None,
    provider_name: str,
    z: int,
    x: int,
    y: int,
    extension: str | None = None,
) -> Path:
    """Return the provider-namespaced persistent tile path.

    The layout is ``<root>/<provider>/<z>/<x>/<y>.<ext>``.

    Args:
        cache_dir: Cache root, or ``None`` for the default directory.
        provider_name: Provider identifier; ``/`` is replaced with ``_``.
        z: First numeric path component below the provider directory.
        x: Second numeric path component.
        y: Numeric stem of the tile file name.
        extension: File extension with or without leading dots; ``None`` or
            an empty string falls back to ``"png"``.
    """
    ext = (extension or "png").lstrip(".")
    provider_dir = disk_cache_root(cache_dir) / _safe_provider_name(provider_name)
    return provider_dir / str(z) / str(x) / f"{y}.{ext}"


def tile_metadata_path(tile_path: Path) -> Path:
    """Return the metadata path for a tile path (same stem, ``.json`` suffix)."""
    return tile_path.with_suffix(".json")


def read_disk_metadata(path: Path) -> DiskCacheMetadata:
    """Read a metadata JSON file, returning defaults for missing metadata.

    Args:
        path: Location of the metadata JSON file.

    Returns:
        The parsed metadata; keys absent from the file keep their defaults. A
        default :class:`DiskCacheMetadata` is returned when ``path`` does not
        exist.

    Raises:
        CacheError: If the file cannot be read, is not valid UTF-8 JSON, does
            not contain a JSON object, or holds numeric fields that cannot be
            converted.
    """
    if not path.exists():
        return DiskCacheMetadata()
    try:
        raw: Any = json.loads(path.read_text(encoding="utf-8"))
    except OSError as exc:
        raise CacheError(f"could not read cache metadata: {path}") from exc
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Undecodable bytes are as unusable as syntactically broken JSON.
        raise CacheError(f"invalid cache metadata JSON: {path}") from exc
    if not isinstance(raw, dict):
        # Valid JSON such as a list or a number has no fields to read.
        raise CacheError(f"invalid cache metadata JSON: {path}")
    try:
        return DiskCacheMetadata(
            url=str(raw.get("url", "")),
            etag=raw.get("etag"),
            last_modified=raw.get("last_modified"),
            expires=raw.get("expires"),
            downloaded_at=float(raw.get("downloaded_at", 0.0)),
            last_accessed_at=float(raw.get("last_accessed_at", 0.0)),
            size_bytes=int(raw.get("size_bytes", 0)),
        )
    except (TypeError, ValueError, OverflowError) as exc:
        # null, nested, non-numeric, or non-finite values in numeric fields.
        raise CacheError(f"invalid cache metadata values: {path}") from exc


def write_disk_metadata(path: Path, metadata: DiskCacheMetadata) -> None:
    """Write metadata JSON next to a tile file.

    Missing parent directories are created first.

    Raises:
        CacheError: If the directory or the file cannot be written.
    """
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(asdict(metadata), sort_keys=True), encoding="utf-8")
    except OSError as exc:
        raise CacheError(f"could not write cache metadata: {path}") from exc


def touch_disk_metadata(path: Path, *, accessed_at: float | None = None) -> None:
    """Update only the last access timestamp for a metadata file.

    Because missing metadata reads as defaults, calling this on a path with
    no metadata file creates one containing default values plus the timestamp.

    Args:
        path: Location of the metadata JSON file.
        accessed_at: Timestamp to store; ``None`` uses the current time.

    Raises:
        CacheError: If the metadata cannot be read or written.
    """
    metadata = read_disk_metadata(path)
    stamp = time() if accessed_at is None else accessed_at
    write_disk_metadata(path, replace(metadata, last_accessed_at=stamp))


def scan_disk_cache(cache_dir: str | Path | None) -> list[DiskCacheEntry]:
    """Scan tile files under a disk cache root.

    Every regular file below the root that does not end in ``.json`` is
    treated as a tile. When the metadata records no size, the size of the
    file on disk is used instead.

    Returns:
        One entry per tile file, or an empty list when the root is missing.

    Raises:
        CacheError: If a metadata file exists but cannot be read or parsed.
    """
    root = disk_cache_root(cache_dir)
    if not root.exists():
        return []
    entries: list[DiskCacheEntry] = []
    for path in root.rglob("*"):
        if not path.is_file() or path.suffix == ".json":
            continue
        metadata_path = tile_metadata_path(path)
        metadata = read_disk_metadata(metadata_path)
        # A recorded size of 0 means "unknown": fall back to the real file size.
        size_bytes = metadata.size_bytes or path.stat().st_size
        if size_bytes != metadata.size_bytes:
            metadata = replace(metadata, size_bytes=size_bytes)
        entries.append(DiskCacheEntry(path, metadata_path, metadata))
    return entries


def disk_cache_size_bytes(
    cache_dir: str | Path | None,
    *,
    provider: str | None = None,
) -> int:
    """Return total bytes for cached tile files, optionally scoped to one provider.

    Args:
        cache_dir: Cache root, or ``None`` for the default directory.
        provider: When given, only that provider's subdirectory is counted.

    Returns:
        The summed entry sizes; ``0`` when the scanned directory is missing.
    """
    if provider is None:
        return sum(entry.metadata.size_bytes for entry in scan_disk_cache(cache_dir))
    provider_root = disk_cache_root(cache_dir) / _safe_provider_name(provider)
    if not provider_root.exists():
        return 0
    return sum(entry.metadata.size_bytes for entry in scan_disk_cache(provider_root))


def plan_disk_prune(
    cache_dir: str | Path | None,
    max_bytes: int | None,
    *,
    protected_paths: set[Path] | None = None,
) -> list[Path]:
    """Return tile paths that should be pruned by LRU order without deleting them.

    Args:
        cache_dir: Cache root, or ``None`` for the default directory.
        max_bytes: Size limit in bytes; ``None`` disables pruning.
        protected_paths: Tile paths that must never be selected.

    Returns:
        Tile paths ordered from least to most recently accessed, covering just
        enough bytes to reach the limit. Protected tiles count toward the
        total but are never selected, so the plan may not reach the limit.
    """
    if max_bytes is None:
        return []
    # Resolve both sides so relative or symlinked spellings still match.
    protected = {path.resolve() for path in protected_paths or set()}
    entries = scan_disk_cache(cache_dir)
    total = sum(entry.metadata.size_bytes for entry in entries)
    if total <= max_bytes:
        return []
    candidates = [entry for entry in entries if entry.tile_path.resolve() not in protected]
    candidates.sort(key=lambda entry: entry.metadata.last_accessed_at)
    prune: list[Path] = []
    for entry in candidates:
        if total <= max_bytes:
            break
        prune.append(entry.tile_path)
        total -= entry.metadata.size_bytes
    return prune


def prune_disk_cache(
    cache_dir: str | Path | None,
    max_bytes: int | None,
    *,
    protected_paths: set[Path] | None = None,
) -> list[Path]:
    """Delete LRU tile files until the cache fits the configured limit.

    Each planned tile is removed together with its metadata file.

    Returns:
        The tile paths selected by :func:`plan_disk_prune`.

    Raises:
        CacheError: If a file cannot be removed; tiles deleted before the
            failure stay deleted.
    """
    planned = plan_disk_prune(cache_dir, max_bytes, protected_paths=protected_paths)
    for path in planned:
        metadata_path = tile_metadata_path(path)
        try:
            path.unlink(missing_ok=True)
            metadata_path.unlink(missing_ok=True)
        except OSError as exc:
            raise CacheError(f"could not prune cached tile: {path}") from exc
    return planned


def clear_disk_cache_path(cache_dir: str | Path | None, *, provider: str | None = None) -> None:
    """Remove persistent tile cache files under a cache root.

    Args:
        cache_dir: Cache root, or ``None`` for the default directory.
        provider: When given, only that provider's subdirectory is removed;
            otherwise the whole root directory is removed.

    Raises:
        CacheError: If the directory tree cannot be removed.
    """
    root = disk_cache_root(cache_dir)
    if provider is not None:
        root = root / _safe_provider_name(provider)
    if not root.exists():
        return
    try:
        shutil.rmtree(root)
    except OSError as exc:
        raise CacheError(f"could not clear disk cache: {root}") from exc