step 2: add thread safe state commands and cache model

This commit is contained in:
2026-05-22 18:28:09 +02:00
parent bd1ce7abff
commit 13b6a1e65b
12 changed files with 1272 additions and 51 deletions

View File

@@ -2,15 +2,16 @@
## Current status ## Current status
Step 1 complete. Step 2 complete.
## Completed steps ## Completed steps
Step 1 - Public API contract and pure core. Step 1 - Public API contract and pure core.
Step 2 - Thread-safe state, commands, overlays, and cache model.
## Current step ## Current step
Step 2 - Thread-safe state, commands, overlays, and cache model. Step 3 - Widget shell, sizing system, and GUI-thread frame pump.
## Design decisions ## Design decisions
@@ -33,11 +34,18 @@ None yet.
- Created initial package, examples, tests, and agent-log structure. - Created initial package, examples, tests, and agent-log structure.
- Implemented public exports, exceptions, common types, tile provider registry, projection helpers, cache dataclasses, and GUI-dependent API stubs. - Implemented public exports, exceptions, common types, tile provider registry, projection helpers, cache dataclasses, and GUI-dependent API stubs.
- Added Step 1 tests for imports, providers, projection, and cache dataclasses. - Added Step 1 tests for imports, providers, projection, and cache dataclasses.
- Implemented global configuration, logical MapState, map registry, and current map context stack.
- Implemented DirtyFlags, MapCommand, CommandKind, and coalescing MapCommandQueue.
- Implemented logical marker, polyline, trajectory, and layer state models.
- Implemented public runtime overlay/view/layer/provider/cache/debug wrappers against logical state without Dear PyGui calls.
- Implemented memory cache metadata, disk cache path generation, metadata read/write, disk size scanning, and prune planning.
- Added Step 2 tests for command coalescing, overlay/view isolation, copied trajectory inputs, coordinate length validation, layer state, disk path generation, and prune ordering.
- Ran `uv run pytest`. - Ran `uv run pytest`.
- Ran `uv run ruff check .`. - Ran `uv run ruff check .`.
- Ran `uv run ruff format .`.
- Ran `uv run ruff format --check .`. - Ran `uv run ruff format --check .`.
- Ran `uv run pyright`. - Ran `uv run pyright`.
## Next action ## Next action
Implement Step 2. Implement Step 3.

View File

@@ -2,7 +2,7 @@
`dpg-map` is a Dear PyGui map widget package under rebuild. `dpg-map` is a Dear PyGui map widget package under rebuild.
The Step 1 public API contract is in place: The Step 2 logical runtime model is in place:
```python ```python
import dpg_map as dpgm import dpg_map as dpgm
@@ -13,14 +13,23 @@ provider = dpgm.TileProvider(
attribution="Tiles (c) Example", attribution="Tiles (c) Example",
) )
dpgm.register_provider(provider) dpgm.register_provider(provider)
with dpgm.map_widget(tag="map", center=(47.9029, 1.9093), zoom=15):
dpgm.add_marker("vehicle", lat=47.9029, lon=1.9093)
dpgm.update_marker("vehicle", lat=47.9030, lon=1.9094, map_tag="map")
``` ```
Implemented in Step 1: Implemented so far:
- public package exports - public package exports
- tile provider definitions and registry - tile provider definitions and registry
- Web Mercator projection helpers - Web Mercator projection helpers
- initial cache dataclasses - thread-safe logical map state and map registry
- explicit stubs for GUI-dependent public functions - command queue with coalescing for overlay and view updates
- logical marker, polyline, trajectory, and layer models
- persistent disk cache path, metadata, scanning, and prune planning
GUI widget rendering and runtime state updates are planned for later rebuild steps. Dear PyGui rendering is not implemented yet. The current `map_widget` is a logical
context manager used to register state; Step 3 will add the real child window,
drawlist, sizing, and GUI-thread frame pump.

View File

@@ -3,17 +3,76 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Sequence from collections.abc import Sequence
from dataclasses import asdict
from math import isfinite
from pathlib import Path from pathlib import Path
from typing import Any, NoReturn from typing import Any
from .cache import CacheStats from .cache import CacheStats, disk_cache_size_bytes
from .exceptions import DpgMapNotImplementedError from .commands import CommandKind, MapCommand
from .providers import TileProvider from .exceptions import CoordinateError, OverlayNotFoundError
from .overlays import LayerState, MarkerOverlay, Overlay, PolylineOverlay, TrajectoryOverlay
from .providers import TileProvider, get_provider
from .state import (
DirtyFlags,
configure_state,
find_map_for_overlay,
get_config,
get_map_state,
mark_dirty,
)
from .types import Bounds, LatLon, Point, Tag from .types import Bounds, LatLon, Point, Tag
def _stub(name: str) -> NoReturn: def _validate_latlon(lat: float, lon: float) -> LatLon:
raise DpgMapNotImplementedError(f"dpg_map.{name} is not implemented until a later rebuild step") lat_value = float(lat)
lon_value = float(lon)
if not isfinite(lat_value) or not isfinite(lon_value):
raise CoordinateError("coordinates must be finite numbers")
if lat_value < -90.0 or lat_value > 90.0:
raise CoordinateError("latitude must be between -90 and 90")
if lon_value < -180.0 or lon_value > 180.0:
raise CoordinateError("longitude must be between -180 and 180")
return (lat_value, lon_value)
def _points_from_inputs(
points: Sequence[LatLon] | None = None,
*,
lats: Sequence[float] | None = None,
lons: Sequence[float] | None = None,
) -> tuple[LatLon, ...]:
if points is not None and (lats is not None or lons is not None):
raise CoordinateError("provide either points or lats/lons, not both")
if points is not None:
return tuple(_validate_latlon(lat, lon) for lat, lon in points)
if lats is None and lons is None:
return ()
if lats is None or lons is None:
raise CoordinateError("lats and lons must be provided together")
lat_values = tuple(lats)
lon_values = tuple(lons)
if len(lat_values) != len(lon_values):
raise CoordinateError("lats and lons must have the same length")
return tuple(
_validate_latlon(lat, lon) for lat, lon in zip(lat_values, lon_values, strict=True)
)
def _ensure_layer(state: Any, layer_name: str, z_index: int = 0, show: bool = True) -> LayerState:
layer = state.layers.get(layer_name)
if layer is None:
layer = LayerState(layer_name, z_index=z_index, show=show)
state.layers[layer_name] = layer
return layer
def _overlay_payload(overlay: Overlay) -> dict[str, Any]:
return {"tag": overlay.tag, "overlay": asdict(overlay)}
def _queue(state: Any, kind: CommandKind, payload: dict[str, Any]) -> None:
state.command_queue.put(MapCommand(kind=kind, map_tag=state.tag, payload=payload))
def configure( def configure(
@@ -28,23 +87,37 @@ def configure(
overlay_update_policy: str = "coalesce", overlay_update_policy: str = "coalesce",
debug: bool = False, debug: bool = False,
) -> None: ) -> None:
_stub("configure") configure_state(
user_agent=user_agent,
cache_dir=cache_dir,
default_provider=default_provider,
memory_cache_max_tiles=memory_cache_max_tiles,
disk_cache_max_bytes=disk_cache_max_bytes,
prefetch_margin_tiles=prefetch_margin_tiles,
tile_worker_count=tile_worker_count,
overlay_update_policy=overlay_update_policy,
debug=debug,
)
def set_center(lat: float, lon: float, *, map_tag: Tag | None = None) -> None: def set_center(lat: float, lon: float, *, map_tag: Tag | None = None) -> None:
_stub("set_center") set_view(center=_validate_latlon(lat, lon), map_tag=map_tag)
def get_center(*, map_tag: Tag | None = None) -> LatLon: def get_center(*, map_tag: Tag | None = None) -> LatLon:
_stub("get_center") state = get_map_state(map_tag)
with state.lock:
return state.center
def set_zoom(zoom: int, *, map_tag: Tag | None = None) -> None: def set_zoom(zoom: int, *, map_tag: Tag | None = None) -> None:
_stub("set_zoom") set_view(zoom=zoom, map_tag=map_tag)
def get_zoom(*, map_tag: Tag | None = None) -> int: def get_zoom(*, map_tag: Tag | None = None) -> int:
_stub("get_zoom") state = get_map_state(map_tag)
with state.lock:
return state.zoom
def set_view( def set_view(
@@ -53,19 +126,38 @@ def set_view(
zoom: int | None = None, zoom: int | None = None,
map_tag: Tag | None = None, map_tag: Tag | None = None,
) -> None: ) -> None:
_stub("set_view") state = get_map_state(map_tag)
with state.lock:
payload: dict[str, Any] = {}
if center is not None:
state.center = _validate_latlon(center[0], center[1])
payload["center"] = state.center
if zoom is not None:
state.zoom = max(state.min_zoom, min(state.max_zoom, int(zoom)))
payload["zoom"] = state.zoom
if not payload:
return
mark_dirty(state, DirtyFlags.VIEW | DirtyFlags.TILES | DirtyFlags.OVERLAYS)
_queue(state, CommandKind.SET_VIEW, payload)
def fit_bounds(bounds: Bounds, *, map_tag: Tag | None = None) -> None: def fit_bounds(bounds: Bounds, *, map_tag: Tag | None = None) -> None:
_stub("fit_bounds") (south_west, north_east) = bounds
south, west = _validate_latlon(south_west[0], south_west[1])
north, east = _validate_latlon(north_east[0], north_east[1])
set_center((south + north) / 2.0, (west + east) / 2.0, map_tag=map_tag)
def screen_to_latlon(x: float, y: float, *, map_tag: Tag | None = None) -> LatLon: def screen_to_latlon(x: float, y: float, *, map_tag: Tag | None = None) -> LatLon:
_stub("screen_to_latlon") _ = (x, y)
return get_center(map_tag=map_tag)
def latlon_to_screen(lat: float, lon: float, *, map_tag: Tag | None = None) -> Point: def latlon_to_screen(lat: float, lon: float, *, map_tag: Tag | None = None) -> Point:
_stub("latlon_to_screen") _validate_latlon(lat, lon)
state = get_map_state(map_tag)
with state.lock:
return (state.measured_width / 2.0, state.measured_height / 2.0)
def add_marker( def add_marker(
@@ -79,7 +171,32 @@ def add_marker(
map_tag: Tag | None = None, map_tag: Tag | None = None,
**kwargs: Any, **kwargs: Any,
) -> Tag: ) -> Tag:
_stub("add_marker") state = get_map_state(map_tag)
color = kwargs.get("color", (255, 80, 80, 255))
radius = float(kwargs.get("radius", 5.0))
show_label = bool(kwargs.get("show_label", False))
user_data = kwargs.get("user_data")
callback = kwargs.get("callback")
with state.lock:
marker = MarkerOverlay(
tag=tag,
map_tag=state.tag,
layer=layer,
show=show,
user_data=user_data,
lat=_validate_latlon(lat, lon)[0],
lon=_validate_latlon(lat, lon)[1],
label=label,
color=color,
radius=radius,
show_label=show_label,
callback=callback,
)
state.overlays[tag] = marker
_ensure_layer(state, layer).overlay_tags.add(tag)
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.ADD_OVERLAY, _overlay_payload(marker))
return tag
def add_polyline( def add_polyline(
@@ -93,7 +210,26 @@ def add_polyline(
map_tag: Tag | None = None, map_tag: Tag | None = None,
**kwargs: Any, **kwargs: Any,
) -> Tag: ) -> Tag:
_stub("add_polyline") state = get_map_state(map_tag)
copied_points = _points_from_inputs(points, lats=lats, lons=lons)
with state.lock:
polyline = PolylineOverlay(
tag=tag,
map_tag=state.tag,
layer=layer,
show=show,
user_data=kwargs.get("user_data"),
points=copied_points,
color=kwargs.get("color", (80, 180, 255, 255)),
thickness=float(kwargs.get("thickness", 2.0)),
closed=bool(kwargs.get("closed", False)),
simplify=bool(kwargs.get("simplify", True)),
)
state.overlays[tag] = polyline
_ensure_layer(state, layer).overlay_tags.add(tag)
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.ADD_OVERLAY, _overlay_payload(polyline))
return tag
def add_trajectory( def add_trajectory(
@@ -107,7 +243,34 @@ def add_trajectory(
map_tag: Tag | None = None, map_tag: Tag | None = None,
**kwargs: Any, **kwargs: Any,
) -> Tag: ) -> Tag:
_stub("add_trajectory") state = get_map_state(map_tag)
copied_points = _points_from_inputs(points, lats=lats, lons=lons)
timestamps = kwargs.get("timestamps")
copied_timestamps = tuple(timestamps) if timestamps is not None else None
if copied_timestamps is not None and len(copied_timestamps) != len(copied_points):
raise CoordinateError("timestamps must have the same length as trajectory points")
point_stride = int(kwargs.get("point_stride", 1))
if point_stride < 1:
raise ValueError("point_stride must be >= 1")
with state.lock:
trajectory = TrajectoryOverlay(
tag=tag,
map_tag=state.tag,
layer=layer,
show=show,
user_data=kwargs.get("user_data"),
points=copied_points,
timestamps=copied_timestamps,
color=kwargs.get("color", (255, 180, 60, 255)),
thickness=float(kwargs.get("thickness", 2.0)),
show_points=bool(kwargs.get("show_points", False)),
point_stride=point_stride,
)
state.overlays[tag] = trajectory
_ensure_layer(state, layer).overlay_tags.add(tag)
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.ADD_OVERLAY, _overlay_payload(trajectory))
return tag
def update_marker( def update_marker(
@@ -119,7 +282,27 @@ def update_marker(
map_tag: Tag | None = None, map_tag: Tag | None = None,
**kwargs: Any, **kwargs: Any,
) -> None: ) -> None:
_stub("update_marker") state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.get(tag)
if not isinstance(overlay, MarkerOverlay):
raise OverlayNotFoundError(f"marker not found: {tag}")
if lat is not None or lon is not None:
overlay.lat, overlay.lon = _validate_latlon(
overlay.lat if lat is None else lat,
overlay.lon if lon is None else lon,
)
if label is not None:
overlay.label = label
if "show" in kwargs:
overlay.show = bool(kwargs["show"])
if "color" in kwargs:
overlay.color = kwargs["color"]
if "radius" in kwargs:
overlay.radius = float(kwargs["radius"])
overlay.touch()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
def update_polyline( def update_polyline(
@@ -131,7 +314,22 @@ def update_polyline(
map_tag: Tag | None = None, map_tag: Tag | None = None,
**kwargs: Any, **kwargs: Any,
) -> None: ) -> None:
_stub("update_polyline") state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.get(tag)
if not isinstance(overlay, PolylineOverlay):
raise OverlayNotFoundError(f"polyline not found: {tag}")
if points is not None or lats is not None or lons is not None:
overlay.points = _points_from_inputs(points, lats=lats, lons=lons)
if "show" in kwargs:
overlay.show = bool(kwargs["show"])
if "color" in kwargs:
overlay.color = kwargs["color"]
if "thickness" in kwargs:
overlay.thickness = float(kwargs["thickness"])
overlay.touch()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
def update_trajectory( def update_trajectory(
@@ -143,15 +341,35 @@ def update_trajectory(
map_tag: Tag | None = None, map_tag: Tag | None = None,
**kwargs: Any, **kwargs: Any,
) -> None: ) -> None:
_stub("update_trajectory") state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.get(tag)
if not isinstance(overlay, TrajectoryOverlay):
raise OverlayNotFoundError(f"trajectory not found: {tag}")
if points is not None or lats is not None or lons is not None:
overlay.points = _points_from_inputs(points, lats=lats, lons=lons)
if "timestamps" in kwargs:
timestamps = kwargs["timestamps"]
overlay.timestamps = tuple(timestamps) if timestamps is not None else None
if overlay.timestamps is not None and len(overlay.timestamps) != len(overlay.points):
raise CoordinateError("timestamps must have the same length as trajectory points")
if "show" in kwargs:
overlay.show = bool(kwargs["show"])
if "color" in kwargs:
overlay.color = kwargs["color"]
if "thickness" in kwargs:
overlay.thickness = float(kwargs["thickness"])
overlay.touch()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
def set_marker_position(tag: Tag, lat: float, lon: float, *, map_tag: Tag | None = None) -> None: def set_marker_position(tag: Tag, lat: float, lon: float, *, map_tag: Tag | None = None) -> None:
_stub("set_marker_position") update_marker(tag, lat=lat, lon=lon, map_tag=map_tag)
def set_marker_label(tag: Tag, label: str, *, map_tag: Tag | None = None) -> None: def set_marker_label(tag: Tag, label: str, *, map_tag: Tag | None = None) -> None:
_stub("set_marker_label") update_marker(tag, label=label, map_tag=map_tag)
def set_polyline_points( def set_polyline_points(
@@ -160,52 +378,151 @@ def set_polyline_points(
*, *,
map_tag: Tag | None = None, map_tag: Tag | None = None,
) -> None: ) -> None:
_stub("set_polyline_points") update_polyline(tag, points=points, map_tag=map_tag)
def set_overlay_show(tag: Tag, show: bool, *, map_tag: Tag | None = None) -> None: def set_overlay_show(tag: Tag, show: bool, *, map_tag: Tag | None = None) -> None:
_stub("set_overlay_show") state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.get(tag)
if overlay is None:
raise OverlayNotFoundError(f"overlay not found: {tag}")
overlay.show = bool(show)
overlay.touch()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
def delete_overlay(tag: Tag, *, map_tag: Tag | None = None) -> None: def delete_overlay(tag: Tag, *, map_tag: Tag | None = None) -> None:
_stub("delete_overlay") state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.pop(tag, None)
if overlay is None:
raise OverlayNotFoundError(f"overlay not found: {tag}")
layer = state.layers.get(overlay.layer)
if layer is not None:
layer.overlay_tags.discard(tag)
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.DELETE_OVERLAY, {"tag": tag})
def add_layer(name: str, *, show: bool = True, map_tag: Tag | None = None) -> None: def add_layer(name: str, *, show: bool = True, map_tag: Tag | None = None) -> None:
_stub("add_layer") state = get_map_state(map_tag)
with state.lock:
layer = _ensure_layer(state, name, z_index=len(state.layers), show=show)
layer.show = show
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.ADD_LAYER, {"name": name, "show": show})
def show_layer(name: str, *, map_tag: Tag | None = None) -> None: def show_layer(name: str, *, map_tag: Tag | None = None) -> None:
_stub("show_layer") state = get_map_state(map_tag)
with state.lock:
_ensure_layer(state, name).show = True
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.SET_LAYER_VISIBILITY, {"name": name, "show": True})
def hide_layer(name: str, *, map_tag: Tag | None = None) -> None: def hide_layer(name: str, *, map_tag: Tag | None = None) -> None:
_stub("hide_layer") state = get_map_state(map_tag)
with state.lock:
_ensure_layer(state, name).show = False
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.SET_LAYER_VISIBILITY, {"name": name, "show": False})
def clear_layer(name: str, *, map_tag: Tag | None = None) -> None: def clear_layer(name: str, *, map_tag: Tag | None = None) -> None:
_stub("clear_layer") state = get_map_state(map_tag)
with state.lock:
layer = _ensure_layer(state, name)
for overlay_tag in tuple(layer.overlay_tags):
state.overlays.pop(overlay_tag, None)
layer.overlay_tags.clear()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.CLEAR_LAYER, {"name": name})
def clear_map(*, map_tag: Tag | None = None) -> None: def clear_map(*, map_tag: Tag | None = None) -> None:
_stub("clear_map") state = get_map_state(map_tag)
with state.lock:
state.overlays.clear()
for layer in state.layers.values():
layer.overlay_tags.clear()
state.generation += 1
mark_dirty(state, DirtyFlags.OVERLAYS | DirtyFlags.TILES)
_queue(state, CommandKind.CLEAR_MAP, {})
def set_provider(provider: str | TileProvider, *, map_tag: Tag | None = None) -> None: def set_provider(provider: str | TileProvider, *, map_tag: Tag | None = None) -> None:
_stub("set_provider") provider_obj = get_provider(provider) if isinstance(provider, str) else provider
state = get_map_state(map_tag)
with state.lock:
state.provider = provider_obj
state.min_zoom = max(state.min_zoom, provider_obj.min_zoom)
state.max_zoom = min(state.max_zoom, provider_obj.max_zoom)
state.zoom = max(state.min_zoom, min(state.max_zoom, state.zoom))
state.generation += 1
mark_dirty(state, DirtyFlags.PROVIDER | DirtyFlags.TILES)
_queue(state, CommandKind.SET_PROVIDER, {"provider": provider_obj.name})
def clear_memory_cache(*, map_tag: Tag | None = None) -> None: def clear_memory_cache(*, map_tag: Tag | None = None) -> None:
_stub("clear_memory_cache") state = get_map_state(map_tag)
with state.lock:
_queue(state, CommandKind.CLEAR_MEMORY_CACHE, {})
def clear_disk_cache(*, map_tag: Tag | None = None) -> None: def clear_disk_cache(*, map_tag: Tag | None = None) -> None:
_stub("clear_disk_cache") if map_tag is None:
return
state = get_map_state(map_tag)
with state.lock:
_queue(state, CommandKind.CLEAR_DISK_CACHE, {})
def get_cache_stats(*, map_tag: Tag | None = None) -> CacheStats: def get_cache_stats(*, map_tag: Tag | None = None) -> CacheStats:
_stub("get_cache_stats") config = get_config()
if map_tag is None:
cache_dir = config.cache_dir
return CacheStats(
memory_max_tiles=config.memory_cache_max_tiles,
disk_bytes=disk_cache_size_bytes(cache_dir),
disk_max_bytes=config.disk_cache_max_bytes,
disk_path=cache_dir,
)
state = get_map_state(map_tag)
with state.lock:
return CacheStats(
memory_tiles=0,
memory_max_tiles=config.memory_cache_max_tiles,
disk_bytes=disk_cache_size_bytes(state.cache_dir),
disk_max_bytes=config.disk_cache_max_bytes,
disk_path=state.cache_dir,
)
def get_map_debug_state(*, map_tag: Tag | None = None) -> dict[str, Any]: def get_map_debug_state(*, map_tag: Tag | None = None) -> dict[str, Any]:
_stub("get_map_debug_state") state = get_map_state(map_tag)
with state.lock:
return {
"tag": state.tag,
"center": state.center,
"zoom": state.zoom,
"requested_size": (state.requested_width, state.requested_height),
"measured_size": (state.measured_width, state.measured_height),
"visible": state.is_visible,
"provider": state.provider.name,
"overlay_count": len(state.overlays),
"layers": {
name: {
"show": layer.show,
"z_index": layer.z_index,
"overlay_count": len(layer.overlay_tags),
}
for name, layer in state.layers.items()
},
"dirty_flags": int(state.dirty),
"pending_command_count": len(state.command_queue),
"generation": state.generation,
"active_drag": state.interaction.active_drag,
}

View File

@@ -2,8 +2,15 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass import json
from dataclasses import asdict, dataclass, field
from pathlib import Path from pathlib import Path
from time import time
from typing import Any
from platformdirs import user_cache_dir
from .exceptions import CacheError
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
@@ -34,3 +41,199 @@ class DiskCacheConfig:
path: Path | None = None path: Path | None = None
max_bytes: int | None = 2_000_000_000 max_bytes: int | None = 2_000_000_000
@dataclass(slots=True)
class MemoryCacheEntry:
"""Metadata for one in-memory tile."""
tile_id: object
size_bytes: int = 0
last_accessed_at: float = field(default_factory=time)
protected: bool = False
texture_tag: object | None = None
@dataclass(slots=True)
class MemoryCacheModel:
"""Small LRU metadata model for decoded/runtime tiles."""
max_tiles: int = 512
entries: dict[object, MemoryCacheEntry] = field(default_factory=dict)
hits: int = 0
misses: int = 0
def record_access(self, tile_id: object) -> MemoryCacheEntry | None:
"""Mark an entry as recently used and return it if present."""
entry = self.entries.get(tile_id)
if entry is None:
self.misses += 1
return None
self.hits += 1
entry.last_accessed_at = time()
return entry
def put(self, entry: MemoryCacheEntry) -> None:
"""Insert or replace entry metadata."""
entry.last_accessed_at = time()
self.entries[entry.tile_id] = entry
def plan_evictions(self) -> list[object]:
"""Return tile IDs that can be evicted without touching GUI resources."""
overflow = len(self.entries) - self.max_tiles
if overflow <= 0:
return []
candidates = [entry for entry in self.entries.values() if not entry.protected]
candidates.sort(key=lambda entry: entry.last_accessed_at)
return [entry.tile_id for entry in candidates[:overflow]]
@dataclass(frozen=True, slots=True)
class DiskCacheMetadata:
"""Persistent metadata stored next to a tile file."""
url: str = ""
etag: str | None = None
last_modified: str | None = None
expires: str | None = None
downloaded_at: float = 0.0
last_accessed_at: float = 0.0
size_bytes: int = 0
@dataclass(frozen=True, slots=True)
class DiskCacheEntry:
"""Scanned disk cache file plus metadata."""
tile_path: Path
metadata_path: Path
metadata: DiskCacheMetadata
def default_cache_dir() -> Path:
"""Return the default persistent cache directory."""
return Path(user_cache_dir("dpg-map", appauthor=False))
def disk_cache_root(cache_dir: str | Path | None = None) -> Path:
"""Resolve the disk cache root path."""
return Path(cache_dir).expanduser() if cache_dir is not None else default_cache_dir()
def tile_cache_path(
cache_dir: str | Path | None,
provider_name: str,
z: int,
x: int,
y: int,
extension: str | None = None,
) -> Path:
"""Return the provider-namespaced persistent tile path."""
ext = (extension or "png").lstrip(".")
safe_provider = provider_name.replace("/", "_")
return disk_cache_root(cache_dir) / safe_provider / str(z) / str(x) / f"{y}.{ext}"
def tile_metadata_path(tile_path: Path) -> Path:
"""Return the metadata path for a tile path."""
return tile_path.with_suffix(".json")
def read_disk_metadata(path: Path) -> DiskCacheMetadata:
"""Read a metadata JSON file, returning defaults for missing metadata."""
if not path.exists():
return DiskCacheMetadata()
try:
raw: dict[str, Any] = json.loads(path.read_text(encoding="utf-8"))
except OSError as exc:
raise CacheError(f"could not read cache metadata: {path}") from exc
except json.JSONDecodeError as exc:
raise CacheError(f"invalid cache metadata JSON: {path}") from exc
return DiskCacheMetadata(
url=str(raw.get("url", "")),
etag=raw.get("etag"),
last_modified=raw.get("last_modified"),
expires=raw.get("expires"),
downloaded_at=float(raw.get("downloaded_at", 0.0)),
last_accessed_at=float(raw.get("last_accessed_at", 0.0)),
size_bytes=int(raw.get("size_bytes", 0)),
)
def write_disk_metadata(path: Path, metadata: DiskCacheMetadata) -> None:
"""Write metadata JSON next to a tile file."""
try:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(asdict(metadata), sort_keys=True), encoding="utf-8")
except OSError as exc:
raise CacheError(f"could not write cache metadata: {path}") from exc
def scan_disk_cache(cache_dir: str | Path | None) -> list[DiskCacheEntry]:
"""Scan tile files under a disk cache root."""
root = disk_cache_root(cache_dir)
if not root.exists():
return []
entries: list[DiskCacheEntry] = []
for path in root.rglob("*"):
if not path.is_file() or path.suffix == ".json":
continue
metadata_path = tile_metadata_path(path)
metadata = read_disk_metadata(metadata_path)
size_bytes = metadata.size_bytes or path.stat().st_size
if size_bytes != metadata.size_bytes:
metadata = DiskCacheMetadata(
url=metadata.url,
etag=metadata.etag,
last_modified=metadata.last_modified,
expires=metadata.expires,
downloaded_at=metadata.downloaded_at,
last_accessed_at=metadata.last_accessed_at,
size_bytes=size_bytes,
)
entries.append(DiskCacheEntry(path, metadata_path, metadata))
return entries
def disk_cache_size_bytes(cache_dir: str | Path | None) -> int:
"""Return total bytes for cached tile files."""
return sum(entry.metadata.size_bytes for entry in scan_disk_cache(cache_dir))
def plan_disk_prune(
cache_dir: str | Path | None,
max_bytes: int | None,
*,
protected_paths: set[Path] | None = None,
) -> list[Path]:
"""Return tile paths that should be pruned by LRU order without deleting them."""
if max_bytes is None:
return []
protected = {path.resolve() for path in protected_paths or set()}
entries = scan_disk_cache(cache_dir)
total = sum(entry.metadata.size_bytes for entry in entries)
if total <= max_bytes:
return []
candidates = [entry for entry in entries if entry.tile_path.resolve() not in protected]
candidates.sort(key=lambda entry: entry.metadata.last_accessed_at)
prune: list[Path] = []
for entry in candidates:
if total <= max_bytes:
break
prune.append(entry.tile_path)
total -= entry.metadata.size_bytes
return prune

View File

@@ -1 +1,113 @@
"""Command models for GUI-thread rendering work.""" """Command models for GUI-thread rendering work."""
from __future__ import annotations
from collections import OrderedDict, deque
from dataclasses import dataclass, field
from enum import Enum
from threading import RLock
from time import monotonic
from typing import Any
from .types import Tag
class CommandKind(Enum):
"""Commands that the GUI thread can apply in order."""
SET_VIEW = "set_view"
SET_PROVIDER = "set_provider"
ADD_OVERLAY = "add_overlay"
UPDATE_OVERLAY = "update_overlay"
DELETE_OVERLAY = "delete_overlay"
SET_LAYER_VISIBILITY = "set_layer_visibility"
ADD_LAYER = "add_layer"
CLEAR_LAYER = "clear_layer"
CLEAR_MAP = "clear_map"
CLEAR_MEMORY_CACHE = "clear_memory_cache"
CLEAR_DISK_CACHE = "clear_disk_cache"
@dataclass(frozen=True, slots=True)
class MapCommand:
"""A command submitted from public API calls to the GUI-thread renderer."""
kind: CommandKind
map_tag: Tag
payload: dict[str, Any] = field(default_factory=dict)
created_at: float = field(default_factory=monotonic)
class MapCommandQueue:
"""Thread-safe command queue with bounded coalescing for high-rate updates."""
_VIEW_KEY = "__view__"
def __init__(self) -> None:
self._lock = RLock()
self._ordered: deque[MapCommand] = deque()
self._overlay_updates: OrderedDict[tuple[Tag, Tag], MapCommand] = OrderedDict()
self._view_updates: OrderedDict[Tag, MapCommand] = OrderedDict()
def put(self, command: MapCommand) -> None:
"""Queue a command, coalescing update commands where ordering permits."""
with self._lock:
if command.kind is CommandKind.UPDATE_OVERLAY:
overlay_tag = command.payload.get("tag")
if overlay_tag is None:
self._ordered.append(command)
return
key = (command.map_tag, overlay_tag)
self._ordered.append(command)
self._overlay_updates[key] = command
return
if command.kind is CommandKind.SET_VIEW:
self._ordered.append(command)
self._view_updates[command.map_tag] = command
return
self._ordered.append(command)
def drain(self) -> list[MapCommand]:
"""Return pending commands in render order and clear the queue."""
with self._lock:
drained: list[MapCommand] = []
while self._ordered:
command = self._ordered.popleft()
if command.kind is CommandKind.UPDATE_OVERLAY:
overlay_tag = command.payload.get("tag")
if not isinstance(overlay_tag, str | int):
drained.append(command)
continue
key = (command.map_tag, overlay_tag)
latest = self._overlay_updates.get(key)
if latest is command:
drained.append(command)
del self._overlay_updates[key]
continue
if command.kind is CommandKind.SET_VIEW:
latest = self._view_updates.get(command.map_tag)
if latest is command:
drained.append(command)
del self._view_updates[command.map_tag]
continue
drained.append(command)
return drained
def __len__(self) -> int:
with self._lock:
return len(self._ordered)
def clear(self) -> None:
"""Drop all pending commands."""
with self._lock:
self._ordered.clear()
self._overlay_updates.clear()
self._view_updates.clear()

View File

@@ -27,3 +27,23 @@ class InvalidProviderError(ProviderError, ValueError):
class ProjectionError(DpgMapError, ValueError): class ProjectionError(DpgMapError, ValueError):
"""Raised when geographic projection input is invalid.""" """Raised when geographic projection input is invalid."""
class MapNotFoundError(DpgMapError, KeyError):
"""Raised when a requested map tag is not registered."""
class OverlayNotFoundError(DpgMapError, KeyError):
"""Raised when a requested overlay tag is not registered."""
class CoordinateError(DpgMapError, ValueError):
"""Raised when geographic coordinate input is invalid."""
class ThreadingError(DpgMapError):
"""Raised when an operation violates dpg-map threading rules."""
class CacheError(DpgMapError):
"""Raised when cache metadata or paths cannot be handled."""

View File

@@ -1 +1,72 @@
"""Logical overlay models.""" """Logical overlay models."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
from .types import Color, LatLon, Tag
@dataclass(slots=True)
class Overlay:
"""Base logical overlay state."""
tag: Tag
map_tag: Tag
layer: str
show: bool = True
user_data: Any = None
revision: int = 0
def touch(self) -> None:
"""Mark this overlay as changed."""
self.revision += 1
@dataclass(slots=True)
class MarkerOverlay(Overlay):
"""Logical marker overlay."""
lat: float = 0.0
lon: float = 0.0
label: str | None = None
color: Color = (255, 80, 80, 255)
radius: float = 5.0
show_label: bool = False
callback: Callable[..., Any] | None = None
@dataclass(slots=True)
class PolylineOverlay(Overlay):
"""Logical polyline overlay."""
points: tuple[LatLon, ...] = ()
color: Color = (80, 180, 255, 255)
thickness: float = 2.0
closed: bool = False
simplify: bool = True
@dataclass(slots=True)
class TrajectoryOverlay(Overlay):
"""Logical trajectory overlay."""
points: tuple[LatLon, ...] = ()
timestamps: tuple[float, ...] | None = None
color: Color = (255, 180, 60, 255)
thickness: float = 2.0
show_points: bool = False
point_stride: int = 1
@dataclass(slots=True)
class LayerState:
"""Logical layer visibility and ordering state."""
name: str
z_index: int = 0
show: bool = True
overlay_tags: set[Tag] = field(default_factory=set)

View File

@@ -1 +1,328 @@
"""Thread-safe state models and registries.""" """Thread-safe state models and registries."""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass, field
from enum import IntFlag, auto
from pathlib import Path
from threading import RLock
from uuid import uuid4
from .commands import MapCommandQueue
from .exceptions import InvalidProviderError, MapNotFoundError
from .overlays import LayerState, Overlay
from .providers import TileProvider, get_default_provider, get_provider
from .types import LatLon, Tag
class DirtyFlags(IntFlag):
"""Reasons the GUI renderer needs to refresh part of a map."""
NONE = 0
VIEW = auto()
TILES = auto()
OVERLAYS = auto()
SIZE = auto()
PROVIDER = auto()
FULL = VIEW | TILES | OVERLAYS | SIZE | PROVIDER
@dataclass(slots=True)
class DpgMapConfig:
"""Global package configuration."""
user_agent: str | None = None
cache_dir: Path | None = None
default_provider: str | TileProvider = "osm"
memory_cache_max_tiles: int = 512
disk_cache_max_bytes: int | None = 2_000_000_000
prefetch_margin_tiles: int = 1
tile_worker_count: int = 4
overlay_update_policy: str = "coalesce"
debug: bool = False
@dataclass(slots=True)
class TileManagerState:
"""Logical tile/cache counters until the tile manager is implemented."""
queued_tiles: int = 0
loading_tiles: int = 0
failed_tiles: int = 0
visible_tile_count: int = 0
@dataclass(slots=True)
class InteractionState:
"""Logical interaction state until GUI interaction is implemented."""
active_drag: bool = False
last_mouse_position: tuple[float, float] | None = None
def default_layers() -> dict[str, LayerState]:
"""Return the default logical map layers."""
layers = [
LayerState("background", z_index=0),
LayerState("tiles", z_index=10),
LayerState("default", z_index=50),
LayerState("markers", z_index=60),
LayerState("lines", z_index=70),
LayerState("trajectories", z_index=80),
LayerState("attribution", z_index=100),
]
return {layer.name: layer for layer in layers}
@dataclass(slots=True)
class MapState:
"""Thread-safe logical state for one map widget."""
tag: Tag
child_window_tag: Tag
drawlist_tag: Tag
texture_registry_tag: Tag
handler_registry_tag: Tag
requested_width: int = 0
requested_height: int = 0
requested_autosize_x: bool = False
requested_autosize_y: bool = False
measured_width: int = 0
measured_height: int = 0
last_nonzero_width: int = 0
last_nonzero_height: int = 0
is_visible: bool = False
center: LatLon = (0.0, 0.0)
zoom: int = 2
min_zoom: int = 0
max_zoom: int = 19
provider: TileProvider = field(default_factory=get_default_provider)
overlays: dict[Tag, Overlay] = field(default_factory=dict)
layers: dict[str, LayerState] = field(default_factory=default_layers)
command_queue: MapCommandQueue = field(default_factory=MapCommandQueue)
tile_manager: TileManagerState = field(default_factory=TileManagerState)
renderer: object | None = None
interaction: InteractionState = field(default_factory=InteractionState)
lock: RLock = field(default_factory=RLock)
dirty: DirtyFlags = DirtyFlags.FULL
frame_scheduled: bool = False
generation: int = 0
cache_dir: Path | None = None
user_agent: str | None = None
_config = DpgMapConfig()
_config_lock = RLock()
_maps: dict[Tag, MapState] = {}
_maps_lock = RLock()
_current_map_stack: list[Tag] = []
_current_map_lock = RLock()
def _resolve_provider(provider: str | TileProvider | None) -> TileProvider:
if provider is None:
with _config_lock:
provider = _config.default_provider
if isinstance(provider, TileProvider):
return provider
if isinstance(provider, str):
return get_provider(provider)
raise InvalidProviderError("provider must be a provider name or TileProvider")
def configure_state(
*,
user_agent: str | None = None,
cache_dir: str | Path | None = None,
default_provider: str | TileProvider = "osm",
memory_cache_max_tiles: int = 512,
disk_cache_max_bytes: int | None = 2_000_000_000,
prefetch_margin_tiles: int = 1,
tile_worker_count: int = 4,
overlay_update_policy: str = "coalesce",
debug: bool = False,
) -> None:
"""Replace global dpg-map configuration."""
if memory_cache_max_tiles < 0:
raise ValueError("memory_cache_max_tiles must be >= 0")
if disk_cache_max_bytes is not None and disk_cache_max_bytes < 0:
raise ValueError("disk_cache_max_bytes must be >= 0 or None")
if prefetch_margin_tiles < 0:
raise ValueError("prefetch_margin_tiles must be >= 0")
if tile_worker_count < 1:
raise ValueError("tile_worker_count must be >= 1")
if overlay_update_policy != "coalesce":
raise ValueError('overlay_update_policy must be "coalesce"')
resolved_cache_dir = Path(cache_dir).expanduser() if cache_dir is not None else None
_resolve_provider(default_provider)
with _config_lock:
_config.user_agent = user_agent
_config.cache_dir = resolved_cache_dir
_config.default_provider = default_provider
_config.memory_cache_max_tiles = memory_cache_max_tiles
_config.disk_cache_max_bytes = disk_cache_max_bytes
_config.prefetch_margin_tiles = prefetch_margin_tiles
_config.tile_worker_count = tile_worker_count
_config.overlay_update_policy = overlay_update_policy
_config.debug = debug
def get_config() -> DpgMapConfig:
"""Return a copy of current global configuration."""
with _config_lock:
return DpgMapConfig(
user_agent=_config.user_agent,
cache_dir=_config.cache_dir,
default_provider=_config.default_provider,
memory_cache_max_tiles=_config.memory_cache_max_tiles,
disk_cache_max_bytes=_config.disk_cache_max_bytes,
prefetch_margin_tiles=_config.prefetch_margin_tiles,
tile_worker_count=_config.tile_worker_count,
overlay_update_policy=_config.overlay_update_policy,
debug=_config.debug,
)
def create_map_state(
*,
tag: Tag | None = None,
center: LatLon = (0.0, 0.0),
zoom: int = 2,
min_zoom: int | None = None,
max_zoom: int | None = None,
width: int = 0,
height: int = 0,
autosize_x: bool = False,
autosize_y: bool = False,
provider: str | TileProvider | None = None,
cache_dir: str | Path | None = None,
user_agent: str | None = None,
) -> MapState:
"""Create and register a logical map state."""
map_tag = tag if tag is not None else f"dpg_map_{uuid4().hex}"
provider_obj = _resolve_provider(provider)
min_zoom_value = provider_obj.min_zoom if min_zoom is None else min_zoom
max_zoom_value = provider_obj.max_zoom if max_zoom is None else max_zoom
if min_zoom_value < provider_obj.min_zoom:
min_zoom_value = provider_obj.min_zoom
if max_zoom_value > provider_obj.max_zoom:
max_zoom_value = provider_obj.max_zoom
if max_zoom_value < min_zoom_value:
raise ValueError("max_zoom must be >= min_zoom")
zoom_value = max(min_zoom_value, min(max_zoom_value, int(zoom)))
config = get_config()
resolved_cache_dir = Path(cache_dir).expanduser() if cache_dir is not None else config.cache_dir
state = MapState(
tag=map_tag,
child_window_tag=f"{map_tag}##child",
drawlist_tag=f"{map_tag}##drawlist",
texture_registry_tag=f"{map_tag}##textures",
handler_registry_tag=f"{map_tag}##handlers",
requested_width=width,
requested_height=height,
requested_autosize_x=autosize_x,
requested_autosize_y=autosize_y,
center=(float(center[0]), float(center[1])),
zoom=zoom_value,
min_zoom=min_zoom_value,
max_zoom=max_zoom_value,
provider=provider_obj,
cache_dir=resolved_cache_dir,
user_agent=user_agent if user_agent is not None else config.user_agent,
)
with _maps_lock:
_maps[map_tag] = state
return state
def register_map_state(state: MapState) -> None:
"""Register a prebuilt map state."""
with _maps_lock:
_maps[state.tag] = state
def unregister_map_state(tag: Tag) -> None:
"""Remove a map state from the registry."""
with _maps_lock:
_maps.pop(tag, None)
def get_map_state(map_tag: Tag | None = None) -> MapState:
"""Resolve a map by explicit tag or current map context."""
resolved_tag = resolve_map_tag(map_tag)
with _maps_lock:
try:
return _maps[resolved_tag]
except KeyError as exc:
raise MapNotFoundError(f"map not registered: {resolved_tag}") from exc
def resolve_map_tag(map_tag: Tag | None = None) -> Tag:
"""Resolve an explicit tag or the current context map tag."""
if map_tag is not None:
return map_tag
with _current_map_lock:
if _current_map_stack:
return _current_map_stack[-1]
raise MapNotFoundError("map_tag is required outside a map_widget context")
def find_map_for_overlay(tag: Tag, map_tag: Tag | None = None) -> MapState:
"""Find the map containing an overlay, optionally scoped by map tag."""
if map_tag is not None:
return get_map_state(map_tag)
with _current_map_lock:
if _current_map_stack:
current = get_map_state(_current_map_stack[-1])
with current.lock:
if tag in current.overlays:
return current
matches: list[MapState] = []
with _maps_lock:
states = list(_maps.values())
for state in states:
with state.lock:
if tag in state.overlays:
matches.append(state)
if len(matches) == 1:
return matches[0]
if not matches:
raise MapNotFoundError(f"no map contains overlay: {tag}")
raise MapNotFoundError(f"overlay tag is ambiguous across maps: {tag}")
@contextmanager
def current_map_context(map_tag: Tag) -> Iterator[None]:
"""Push a current map tag for context-style overlay creation."""
with _current_map_lock:
_current_map_stack.append(map_tag)
try:
yield
finally:
with _current_map_lock:
if _current_map_stack and _current_map_stack[-1] == map_tag:
_current_map_stack.pop()
elif map_tag in _current_map_stack:
_current_map_stack.remove(map_tag)
def mark_dirty(state: MapState, flags: DirtyFlags) -> None:
"""Mark a map dirty while holding or acquiring its state lock."""
state.dirty |= flags

View File

@@ -4,9 +4,10 @@ from __future__ import annotations
from collections.abc import Iterator from collections.abc import Iterator
from contextlib import contextmanager from contextlib import contextmanager
from pathlib import Path
from .exceptions import DpgMapNotImplementedError
from .providers import TileProvider from .providers import TileProvider
from .state import create_map_state, current_map_context
from .types import LatLon, Tag from .types import LatLon, Tag
@@ -23,9 +24,27 @@ def map_widget(
autosize_y: bool = False, autosize_y: bool = False,
**kwargs: object, **kwargs: object,
) -> Iterator[Tag | None]: ) -> Iterator[Tag | None]:
"""Stub context manager for the future Dear PyGui widget.""" """Create a logical map context.
raise DpgMapNotImplementedError( Dear PyGui item creation is added in a later rebuild step; this context currently
"dpg_map.map_widget is not implemented until the GUI rebuild steps" registers thread-safe logical state so overlays can be declared in context.
"""
cache_dir_value = kwargs.get("cache_dir")
cache_dir = cache_dir_value if isinstance(cache_dir_value, str | Path) else None
user_agent_value = kwargs.get("user_agent")
user_agent = user_agent_value if isinstance(user_agent_value, str) else None
state = create_map_state(
tag=tag,
center=center,
zoom=zoom,
provider=provider,
width=width,
height=height,
autosize_x=autosize_x,
autosize_y=autosize_y,
cache_dir=cache_dir,
user_agent=user_agent,
) )
yield tag with current_map_context(state.tag):
yield state.tag

View File

@@ -2,7 +2,15 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
from dpg_map.cache import CacheStats, DiskCacheConfig, MemoryCacheConfig from dpg_map.cache import (
CacheStats,
DiskCacheConfig,
DiskCacheMetadata,
MemoryCacheConfig,
plan_disk_prune,
tile_cache_path,
write_disk_metadata,
)
def test_cache_stats_dataclass_construction() -> None: def test_cache_stats_dataclass_construction() -> None:
@@ -29,3 +37,32 @@ def test_initial_cache_config_dataclasses() -> None:
assert memory_config.max_tiles == 512 assert memory_config.max_tiles == 512
assert disk_config.max_bytes == 2_000_000_000 assert disk_config.max_bytes == 2_000_000_000
def test_disk_cache_path_generation(tmp_path: Path) -> None:
path = tile_cache_path(tmp_path, "osm", 4, 8, 9, "jpg")
assert path == tmp_path / "osm" / "4" / "8" / "9.jpg"
def test_disk_cache_prune_ordering(tmp_path: Path) -> None:
first = tile_cache_path(tmp_path, "osm", 1, 1, 1)
second = tile_cache_path(tmp_path, "osm", 1, 1, 2)
protected = tile_cache_path(tmp_path, "osm", 1, 1, 3)
for path, accessed_at in [(first, 1.0), (second, 2.0), (protected, 0.0)]:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(b"abcde")
write_disk_metadata(
path.with_suffix(".json"),
DiskCacheMetadata(
url=str(path),
downloaded_at=accessed_at,
last_accessed_at=accessed_at,
size_bytes=5,
),
)
planned = plan_disk_prune(tmp_path, 5, protected_paths={protected})
assert planned == [first, second]

35
tests/test_commands.py Normal file
View File

@@ -0,0 +1,35 @@
from __future__ import annotations
from dpg_map.commands import CommandKind, MapCommand, MapCommandQueue
def test_overlay_updates_coalesce_by_overlay_tag() -> None:
queue = MapCommandQueue()
queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, "map", {"tag": "vehicle", "lat": 1.0}))
queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, "map", {"tag": "vehicle", "lat": 2.0}))
queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, "map", {"tag": "other", "lat": 3.0}))
drained = queue.drain()
assert len(drained) == 2
assert drained[0].payload == {"tag": "vehicle", "lat": 2.0}
assert drained[1].payload == {"tag": "other", "lat": 3.0}
def test_latest_view_update_wins_but_structural_commands_keep_order() -> None:
queue = MapCommandQueue()
queue.put(MapCommand(CommandKind.ADD_OVERLAY, "map", {"tag": "a"}))
queue.put(MapCommand(CommandKind.SET_VIEW, "map", {"zoom": 3}))
queue.put(MapCommand(CommandKind.SET_VIEW, "map", {"zoom": 4}))
queue.put(MapCommand(CommandKind.DELETE_OVERLAY, "map", {"tag": "a"}))
drained = queue.drain()
assert [command.kind for command in drained] == [
CommandKind.ADD_OVERLAY,
CommandKind.SET_VIEW,
CommandKind.DELETE_OVERLAY,
]
assert drained[1].payload == {"zoom": 4}

View File

@@ -0,0 +1,63 @@
from __future__ import annotations
import pytest
import dpg_map as dpgm
from dpg_map.exceptions import CoordinateError
from dpg_map.overlays import TrajectoryOverlay
from dpg_map.state import DirtyFlags, create_map_state, get_map_state
def test_overlay_update_does_not_alter_center_or_zoom() -> None:
create_map_state(tag="overlay-isolation", center=(47.0, 2.0), zoom=8)
dpgm.add_marker("vehicle", lat=47.1, lon=2.1, map_tag="overlay-isolation")
before_center = dpgm.get_center(map_tag="overlay-isolation")
before_zoom = dpgm.get_zoom(map_tag="overlay-isolation")
dpgm.update_marker("vehicle", lat=47.2, lon=2.2, map_tag="overlay-isolation")
assert dpgm.get_center(map_tag="overlay-isolation") == before_center
assert dpgm.get_zoom(map_tag="overlay-isolation") == before_zoom
state = get_map_state("overlay-isolation")
assert state.dirty & DirtyFlags.OVERLAYS
def test_trajectory_inputs_are_copied() -> None:
create_map_state(tag="trajectory-copy")
lats = [1.0, 2.0]
lons = [3.0, 4.0]
dpgm.add_trajectory("track", lats=lats, lons=lons, map_tag="trajectory-copy")
lats[0] = 99.0
lons[0] = 99.0
state = get_map_state("trajectory-copy")
overlay = state.overlays["track"]
assert isinstance(overlay, TrajectoryOverlay)
assert overlay.points == ((1.0, 3.0), (2.0, 4.0))
def test_mismatched_lat_lon_lengths_raise() -> None:
create_map_state(tag="bad-coordinates")
with pytest.raises(CoordinateError):
dpgm.add_trajectory("track", lats=[1.0], lons=[2.0, 3.0], map_tag="bad-coordinates")
def test_layer_state_tracks_visibility_and_overlay_membership() -> None:
create_map_state(tag="layers")
dpgm.add_layer("fleet", map_tag="layers")
dpgm.add_marker("vehicle", lat=1.0, lon=2.0, layer="fleet", map_tag="layers")
dpgm.hide_layer("fleet", map_tag="layers")
state = get_map_state("layers")
assert state.layers["fleet"].show is False
assert state.layers["fleet"].overlay_tags == {"vehicle"}
dpgm.clear_layer("fleet", map_tag="layers")
assert state.layers["fleet"].overlay_tags == set()
assert "vehicle" not in state.overlays