Compare commits
5 Commits
main
...
563ddd962b
| Author | SHA1 | Date | |
|---|---|---|---|
| 563ddd962b | |||
| 743a82f796 | |||
| 13b6a1e65b | |||
| bd1ce7abff | |||
| 11fc1bb9bd |
74
AGENTS.md
Normal file
74
AGENTS.md
Normal file
@@ -0,0 +1,74 @@
|
||||
# AGENTS.md
|
||||
|
||||
## Current status
|
||||
|
||||
Step 4 complete.
|
||||
|
||||
## Completed steps
|
||||
|
||||
Step 1 - Public API contract and pure core.
|
||||
Step 2 - Thread-safe state, commands, overlays, and cache model.
|
||||
Step 3 - Widget shell, sizing system, and GUI-thread frame pump.
|
||||
Step 4 - Tile manager, persistent cache, and asynchronous loading.
|
||||
|
||||
## Current step
|
||||
|
||||
Step 5 - Interaction: pan, zoom, and view commands.
|
||||
|
||||
## Design decisions
|
||||
|
||||
- Package is managed with uv.
|
||||
- Public import is `import dpg_map as dpgm`.
|
||||
- Dear PyGui calls are GUI-thread-only.
|
||||
- Runtime public calls enqueue commands or update logical state.
|
||||
- Overlay updates must not reset center/zoom.
|
||||
- The widget uses child_window + measured-size drawlist.
|
||||
- Tiles use a memory cache and persistent disk cache.
|
||||
- Tile providers are interchangeable.
|
||||
|
||||
## Known issues
|
||||
|
||||
None yet.
|
||||
|
||||
## Commands used
|
||||
|
||||
- Read `STEPS.md`, `FEATURES.md`, and `ARCHITECTURE.md`.
|
||||
- Created initial package, examples, tests, and agent-log structure.
|
||||
- Implemented public exports, exceptions, common types, tile provider registry, projection helpers, cache dataclasses, and GUI-dependent API stubs.
|
||||
- Added Step 1 tests for imports, providers, projection, and cache dataclasses.
|
||||
- Implemented global configuration, logical MapState, map registry, and current map context stack.
|
||||
- Implemented DirtyFlags, MapCommand, CommandKind, and coalescing MapCommandQueue.
|
||||
- Implemented logical marker, polyline, trajectory, and layer state models.
|
||||
- Implemented public runtime overlay/view/layer/provider/cache/debug wrappers against logical state without Dear PyGui calls.
|
||||
- Implemented memory cache metadata, disk cache path generation, metadata read/write, disk size scanning, and prune planning.
|
||||
- Added Step 2 tests for command coalescing, overlay/view isolation, copied trajectory inputs, coordinate length validation, layer state, disk path generation, and prune ordering.
|
||||
- Implemented `map_widget(...)` as a Dear PyGui child-window plus drawlist shell.
|
||||
- Implemented GUI-thread renderer frame pump that schedules frame callbacks, drains command queues, measures size, resizes the drawlist, and draws a placeholder background/attribution.
|
||||
- Implemented sizing helpers for measured size, last non-zero size preservation, visibility transitions, effective draw size, and resize dirty flags.
|
||||
- Implemented map interaction hit-rectangle calculation.
|
||||
- Implemented TileID, TileStatus, Tile, visible tile calculation, and TileManager.
|
||||
- Implemented asynchronous tile worker queue for disk reads, HTTP fetches, and image decoding.
|
||||
- Implemented provider-namespaced persistent cache writes, access metadata updates, clearing, and LRU pruning.
|
||||
- Implemented memory tile cache with visible-tile protection and deferred GUI-thread texture deletion.
|
||||
- Integrated tile result processing, stale generation/provider rejection, texture creation, and tile drawing into the GUI-thread renderer.
|
||||
- Added OpenStreetMap User-Agent warning/fallback and configured examples with example user agents.
|
||||
- Added Step 3 examples for basic map, window sizing, child-window sizing, table sizing, and hidden-tab sizing.
|
||||
- Added Step 4 cache stress example.
|
||||
- Added Step 3 tests for sizing transitions, zero-size preservation, resize dirty flags, command drain ordering, and hit rectangles.
|
||||
- Added Step 4 tests for visible tile calculation, stale result rejection, protected memory eviction, and tile image decoding.
|
||||
- Ran a Dear PyGui context smoke check for `map_widget` child-window/drawlist creation.
|
||||
- Ran `uv run pytest`.
|
||||
- Ran `uv run ruff check .`.
|
||||
- Ran `uv run ruff format .`.
|
||||
- Ran `uv run ruff format --check .`.
|
||||
- Ran `uv run pyright`.
|
||||
- Ran `uv run pytest`.
|
||||
- Ran `uv run ruff check .`.
|
||||
- Ran `uv run ruff format .`.
|
||||
- Ran `uv run ruff format --check .`.
|
||||
- Ran `uv run pyright`.
|
||||
- Ran a Dear PyGui context smoke check for `map_widget` child-window/drawlist/texture-registry creation.
|
||||
|
||||
## Next action
|
||||
|
||||
Implement Step 5.
|
||||
51
README.md
51
README.md
@@ -0,0 +1,51 @@
|
||||
# dpg-map
|
||||
|
||||
`dpg-map` is a Dear PyGui map widget package under rebuild.
|
||||
|
||||
The Step 4 tile manager is in place:
|
||||
|
||||
```python
|
||||
import dpg_map as dpgm
|
||||
|
||||
provider = dpgm.TileProvider(
|
||||
name="custom",
|
||||
url_template="https://example.com/{z}/{x}/{y}.png",
|
||||
attribution="Tiles (c) Example",
|
||||
)
|
||||
dpgm.register_provider(provider)
|
||||
|
||||
with dpgm.map_widget(tag="map", center=(47.9029, 1.9093), zoom=15):
|
||||
dpgm.add_marker("vehicle", lat=47.9029, lon=1.9093)
|
||||
|
||||
dpgm.update_marker("vehicle", lat=47.9030, lon=1.9094, map_tag="map")
|
||||
```
|
||||
|
||||
Implemented so far:
|
||||
|
||||
- public package exports
|
||||
- tile provider definitions and registry
|
||||
- Web Mercator projection helpers
|
||||
- thread-safe logical map state and map registry
|
||||
- command queue with coalescing for overlay and view updates
|
||||
- logical marker, polyline, trajectory, and layer models
|
||||
- persistent disk cache paths, metadata, scanning, pruning, and clearing
|
||||
- Dear PyGui `child_window` + measured-size `drawlist` widget shell
|
||||
- GUI-thread frame pump that drains commands, manages textures, and draws raster tiles
|
||||
- sizing helpers that preserve the last non-zero size across hidden layouts
|
||||
- interaction hit-rectangle calculation for the measured map area
|
||||
- asynchronous tile workers that read disk cache, fetch HTTP, and decode images
|
||||
- memory cache with visible-tile protection and GUI-thread texture deletion
|
||||
|
||||
Overlay drawing is not implemented yet. Step 5 will add pan/zoom interaction and
|
||||
view command projection.
|
||||
|
||||
Examples:
|
||||
|
||||
```bash
|
||||
uv run python examples/basic_map.py
|
||||
uv run python examples/sizing_window.py
|
||||
uv run python examples/sizing_child.py
|
||||
uv run python examples/sizing_table.py
|
||||
uv run python examples/hidden_tab.py
|
||||
uv run python examples/cache_stress.py
|
||||
```
|
||||
|
||||
1
examples/.gitkeep
Normal file
1
examples/.gitkeep
Normal file
@@ -0,0 +1 @@
|
||||
|
||||
29
examples/basic_map.py
Normal file
29
examples/basic_map.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from typing import Any
|
||||
|
||||
import dearpygui.dearpygui as _dpg
|
||||
|
||||
import dpg_map as dpgm
|
||||
|
||||
dpg: Any = _dpg
|
||||
|
||||
|
||||
def main() -> None:
|
||||
dpgm.configure(user_agent="dpg-map basic example")
|
||||
|
||||
dpg.create_context()
|
||||
dpg.create_viewport(title="dpg-map basic", width=900, height=600)
|
||||
|
||||
with (
|
||||
dpg.window(label="Map", width=-1, height=-1),
|
||||
dpgm.map_widget(tag="map", center=(47.9029, 1.9093), zoom=15, width=-1, height=-1),
|
||||
):
|
||||
dpgm.add_marker("vehicle", lat=47.9029, lon=1.9093)
|
||||
|
||||
dpg.setup_dearpygui()
|
||||
dpg.show_viewport()
|
||||
dpg.start_dearpygui()
|
||||
dpg.destroy_context()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
43
examples/cache_stress.py
Normal file
43
examples/cache_stress.py
Normal file
@@ -0,0 +1,43 @@
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import dearpygui.dearpygui as _dpg
|
||||
|
||||
import dpg_map as dpgm
|
||||
|
||||
dpg: Any = _dpg
|
||||
|
||||
|
||||
def main() -> None:
|
||||
cache_dir = Path(__file__).resolve().parent / ".tile-cache"
|
||||
dpgm.configure(
|
||||
cache_dir=cache_dir,
|
||||
memory_cache_max_tiles=32,
|
||||
disk_cache_max_bytes=30_000_000,
|
||||
prefetch_margin_tiles=1,
|
||||
user_agent="dpg-map cache_stress example",
|
||||
)
|
||||
|
||||
dpg.create_context()
|
||||
dpg.create_viewport(title="dpg-map cache stress", width=1000, height=700)
|
||||
|
||||
with (
|
||||
dpg.window(label="Cache Stress", width=-1, height=-1),
|
||||
dpgm.map_widget(
|
||||
tag="cache-map",
|
||||
center=(47.9029, 1.9093),
|
||||
zoom=14,
|
||||
width=-1,
|
||||
height=-1,
|
||||
),
|
||||
):
|
||||
dpgm.add_marker("start", lat=47.9029, lon=1.9093, label="Orleans")
|
||||
|
||||
dpg.setup_dearpygui()
|
||||
dpg.show_viewport()
|
||||
dpg.start_dearpygui()
|
||||
dpg.destroy_context()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
27
examples/hidden_tab.py
Normal file
27
examples/hidden_tab.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from typing import Any
|
||||
|
||||
import dearpygui.dearpygui as _dpg
|
||||
|
||||
import dpg_map as dpgm
|
||||
|
||||
dpg: Any = _dpg
|
||||
|
||||
|
||||
def main() -> None:
|
||||
dpg.create_context()
|
||||
dpg.create_viewport(title="dpg-map hidden tab", width=900, height=600)
|
||||
|
||||
with dpg.window(label="Hidden Tab Sizing", width=-1, height=-1), dpg.tab_bar():
|
||||
with dpg.tab(label="First"):
|
||||
dpg.add_text("Switch to the map tab.")
|
||||
with dpg.tab(label="Map"), dpgm.map_widget(tag="map-hidden-tab", width=-1, height=500):
|
||||
dpgm.add_marker("tab-marker", lat=35.0, lon=139.0)
|
||||
|
||||
dpg.setup_dearpygui()
|
||||
dpg.show_viewport()
|
||||
dpg.start_dearpygui()
|
||||
dpg.destroy_context()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
28
examples/sizing_child.py
Normal file
28
examples/sizing_child.py
Normal file
@@ -0,0 +1,28 @@
|
||||
from typing import Any
|
||||
|
||||
import dearpygui.dearpygui as _dpg
|
||||
|
||||
import dpg_map as dpgm
|
||||
|
||||
dpg: Any = _dpg
|
||||
|
||||
|
||||
def main() -> None:
|
||||
dpg.create_context()
|
||||
dpg.create_viewport(title="dpg-map child sizing", width=900, height=600)
|
||||
|
||||
with (
|
||||
dpg.window(label="Nested Child", width=-1, height=-1),
|
||||
dpg.child_window(width=-1, height=420),
|
||||
dpgm.map_widget(tag="map-child", width=-1, height=-1),
|
||||
):
|
||||
dpgm.add_marker("inside-child", lat=47.0, lon=2.0)
|
||||
|
||||
dpg.setup_dearpygui()
|
||||
dpg.show_viewport()
|
||||
dpg.start_dearpygui()
|
||||
dpg.destroy_context()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
33
examples/sizing_table.py
Normal file
33
examples/sizing_table.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from typing import Any
|
||||
|
||||
import dearpygui.dearpygui as _dpg
|
||||
|
||||
import dpg_map as dpgm
|
||||
|
||||
dpg: Any = _dpg
|
||||
|
||||
|
||||
def main() -> None:
|
||||
dpg.create_context()
|
||||
dpg.create_viewport(title="dpg-map table sizing", width=1000, height=600)
|
||||
|
||||
with (
|
||||
dpg.window(label="Table Layout", width=-1, height=-1),
|
||||
dpg.table(header_row=True, resizable=True, policy=dpg.mvTable_SizingStretchProp),
|
||||
):
|
||||
dpg.add_table_column(label="Map")
|
||||
dpg.add_table_column(label="Controls")
|
||||
with dpg.table_row():
|
||||
with dpg.table_cell(), dpgm.map_widget(tag="map-table", width=-1, height=500):
|
||||
dpgm.add_marker("table-marker", lat=51.5, lon=-0.1)
|
||||
with dpg.table_cell():
|
||||
dpg.add_text("Resize the window and table columns.")
|
||||
|
||||
dpg.setup_dearpygui()
|
||||
dpg.show_viewport()
|
||||
dpg.start_dearpygui()
|
||||
dpg.destroy_context()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
27
examples/sizing_window.py
Normal file
27
examples/sizing_window.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from typing import Any
|
||||
|
||||
import dearpygui.dearpygui as _dpg
|
||||
|
||||
import dpg_map as dpgm
|
||||
|
||||
dpg: Any = _dpg
|
||||
|
||||
|
||||
def main() -> None:
|
||||
dpg.create_context()
|
||||
dpg.create_viewport(title="dpg-map window sizing", width=900, height=600)
|
||||
|
||||
with (
|
||||
dpg.window(label="Fill Window", width=-1, height=-1),
|
||||
dpgm.map_widget(tag="map-window", width=-1, height=-1),
|
||||
):
|
||||
dpgm.add_marker("center", lat=0.0, lon=0.0)
|
||||
|
||||
dpg.setup_dearpygui()
|
||||
dpg.show_viewport()
|
||||
dpg.start_dearpygui()
|
||||
dpg.destroy_context()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -27,3 +27,13 @@ dev = [
|
||||
"pytest>=9.0.3",
|
||||
"ruff>=0.15.14",
|
||||
]
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 100
|
||||
target-version = "py311"
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["E", "F", "I", "UP", "B", "SIM"]
|
||||
|
||||
[tool.pyright]
|
||||
typeCheckingMode = "basic"
|
||||
|
||||
@@ -1,2 +1,87 @@
|
||||
"""Public API for dpg-map."""
|
||||
|
||||
from .api import (
|
||||
add_layer,
|
||||
add_marker,
|
||||
add_polyline,
|
||||
add_trajectory,
|
||||
clear_disk_cache,
|
||||
clear_layer,
|
||||
clear_map,
|
||||
clear_memory_cache,
|
||||
configure,
|
||||
delete_overlay,
|
||||
fit_bounds,
|
||||
get_cache_stats,
|
||||
get_center,
|
||||
get_map_debug_state,
|
||||
get_zoom,
|
||||
hide_layer,
|
||||
latlon_to_screen,
|
||||
screen_to_latlon,
|
||||
set_center,
|
||||
set_marker_label,
|
||||
set_marker_position,
|
||||
set_overlay_show,
|
||||
set_polyline_points,
|
||||
set_provider,
|
||||
set_view,
|
||||
set_zoom,
|
||||
show_layer,
|
||||
update_marker,
|
||||
update_polyline,
|
||||
update_trajectory,
|
||||
)
|
||||
from .providers import (
|
||||
TileProvider,
|
||||
get_provider,
|
||||
list_providers,
|
||||
register_provider,
|
||||
unregister_provider,
|
||||
)
|
||||
from .widget import map_widget
|
||||
|
||||
__all__ = [
|
||||
"TileProvider",
|
||||
"add_layer",
|
||||
"add_marker",
|
||||
"add_polyline",
|
||||
"add_trajectory",
|
||||
"clear_disk_cache",
|
||||
"clear_layer",
|
||||
"clear_map",
|
||||
"clear_memory_cache",
|
||||
"configure",
|
||||
"delete_overlay",
|
||||
"fit_bounds",
|
||||
"get_cache_stats",
|
||||
"get_center",
|
||||
"get_map_debug_state",
|
||||
"get_provider",
|
||||
"get_zoom",
|
||||
"hide_layer",
|
||||
"latlon_to_screen",
|
||||
"list_providers",
|
||||
"map_widget",
|
||||
"register_provider",
|
||||
"screen_to_latlon",
|
||||
"set_center",
|
||||
"set_marker_label",
|
||||
"set_marker_position",
|
||||
"set_overlay_show",
|
||||
"set_polyline_points",
|
||||
"set_provider",
|
||||
"set_view",
|
||||
"set_zoom",
|
||||
"show_layer",
|
||||
"unregister_provider",
|
||||
"update_marker",
|
||||
"update_polyline",
|
||||
"update_trajectory",
|
||||
]
|
||||
|
||||
|
||||
def main() -> None:
|
||||
print("Hello from dpg-map!")
|
||||
"""Console entry point placeholder."""
|
||||
|
||||
print("dpg-map")
|
||||
|
||||
537
src/dpg_map/api.py
Normal file
537
src/dpg_map/api.py
Normal file
@@ -0,0 +1,537 @@
|
||||
"""Public API wrappers for dpg-map."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Sequence
|
||||
from dataclasses import asdict
|
||||
from math import isfinite
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from .cache import CacheStats, clear_disk_cache_path, disk_cache_root, disk_cache_size_bytes
|
||||
from .commands import CommandKind, MapCommand
|
||||
from .exceptions import CoordinateError, OverlayNotFoundError
|
||||
from .overlays import LayerState, MarkerOverlay, Overlay, PolylineOverlay, TrajectoryOverlay
|
||||
from .providers import TileProvider, get_provider
|
||||
from .state import (
|
||||
DirtyFlags,
|
||||
configure_state,
|
||||
find_map_for_overlay,
|
||||
get_config,
|
||||
get_map_state,
|
||||
mark_dirty,
|
||||
)
|
||||
from .types import Bounds, LatLon, Point, Tag
|
||||
|
||||
|
||||
def _validate_latlon(lat: float, lon: float) -> LatLon:
|
||||
lat_value = float(lat)
|
||||
lon_value = float(lon)
|
||||
if not isfinite(lat_value) or not isfinite(lon_value):
|
||||
raise CoordinateError("coordinates must be finite numbers")
|
||||
if lat_value < -90.0 or lat_value > 90.0:
|
||||
raise CoordinateError("latitude must be between -90 and 90")
|
||||
if lon_value < -180.0 or lon_value > 180.0:
|
||||
raise CoordinateError("longitude must be between -180 and 180")
|
||||
return (lat_value, lon_value)
|
||||
|
||||
|
||||
def _points_from_inputs(
|
||||
points: Sequence[LatLon] | None = None,
|
||||
*,
|
||||
lats: Sequence[float] | None = None,
|
||||
lons: Sequence[float] | None = None,
|
||||
) -> tuple[LatLon, ...]:
|
||||
if points is not None and (lats is not None or lons is not None):
|
||||
raise CoordinateError("provide either points or lats/lons, not both")
|
||||
if points is not None:
|
||||
return tuple(_validate_latlon(lat, lon) for lat, lon in points)
|
||||
if lats is None and lons is None:
|
||||
return ()
|
||||
if lats is None or lons is None:
|
||||
raise CoordinateError("lats and lons must be provided together")
|
||||
lat_values = tuple(lats)
|
||||
lon_values = tuple(lons)
|
||||
if len(lat_values) != len(lon_values):
|
||||
raise CoordinateError("lats and lons must have the same length")
|
||||
return tuple(
|
||||
_validate_latlon(lat, lon) for lat, lon in zip(lat_values, lon_values, strict=True)
|
||||
)
|
||||
|
||||
|
||||
def _ensure_layer(state: Any, layer_name: str, z_index: int = 0, show: bool = True) -> LayerState:
|
||||
layer = state.layers.get(layer_name)
|
||||
if layer is None:
|
||||
layer = LayerState(layer_name, z_index=z_index, show=show)
|
||||
state.layers[layer_name] = layer
|
||||
return layer
|
||||
|
||||
|
||||
def _overlay_payload(overlay: Overlay) -> dict[str, Any]:
|
||||
return {"tag": overlay.tag, "overlay": asdict(overlay)}
|
||||
|
||||
|
||||
def _queue(state: Any, kind: CommandKind, payload: dict[str, Any]) -> None:
|
||||
state.command_queue.put(MapCommand(kind=kind, map_tag=state.tag, payload=payload))
|
||||
|
||||
|
||||
def configure(
|
||||
*,
|
||||
user_agent: str | None = None,
|
||||
cache_dir: str | Path | None = None,
|
||||
default_provider: str | TileProvider = "osm",
|
||||
memory_cache_max_tiles: int = 512,
|
||||
disk_cache_max_bytes: int | None = 2_000_000_000,
|
||||
prefetch_margin_tiles: int = 1,
|
||||
tile_worker_count: int = 4,
|
||||
overlay_update_policy: str = "coalesce",
|
||||
debug: bool = False,
|
||||
) -> None:
|
||||
configure_state(
|
||||
user_agent=user_agent,
|
||||
cache_dir=cache_dir,
|
||||
default_provider=default_provider,
|
||||
memory_cache_max_tiles=memory_cache_max_tiles,
|
||||
disk_cache_max_bytes=disk_cache_max_bytes,
|
||||
prefetch_margin_tiles=prefetch_margin_tiles,
|
||||
tile_worker_count=tile_worker_count,
|
||||
overlay_update_policy=overlay_update_policy,
|
||||
debug=debug,
|
||||
)
|
||||
|
||||
|
||||
def set_center(lat: float, lon: float, *, map_tag: Tag | None = None) -> None:
|
||||
set_view(center=_validate_latlon(lat, lon), map_tag=map_tag)
|
||||
|
||||
|
||||
def get_center(*, map_tag: Tag | None = None) -> LatLon:
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
return state.center
|
||||
|
||||
|
||||
def set_zoom(zoom: int, *, map_tag: Tag | None = None) -> None:
|
||||
set_view(zoom=zoom, map_tag=map_tag)
|
||||
|
||||
|
||||
def get_zoom(*, map_tag: Tag | None = None) -> int:
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
return state.zoom
|
||||
|
||||
|
||||
def set_view(
|
||||
*,
|
||||
center: LatLon | None = None,
|
||||
zoom: int | None = None,
|
||||
map_tag: Tag | None = None,
|
||||
) -> None:
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
payload: dict[str, Any] = {}
|
||||
if center is not None:
|
||||
state.center = _validate_latlon(center[0], center[1])
|
||||
payload["center"] = state.center
|
||||
if zoom is not None:
|
||||
state.zoom = max(state.min_zoom, min(state.max_zoom, int(zoom)))
|
||||
payload["zoom"] = state.zoom
|
||||
if not payload:
|
||||
return
|
||||
mark_dirty(state, DirtyFlags.VIEW | DirtyFlags.TILES | DirtyFlags.OVERLAYS)
|
||||
_queue(state, CommandKind.SET_VIEW, payload)
|
||||
|
||||
|
||||
def fit_bounds(bounds: Bounds, *, map_tag: Tag | None = None) -> None:
|
||||
(south_west, north_east) = bounds
|
||||
south, west = _validate_latlon(south_west[0], south_west[1])
|
||||
north, east = _validate_latlon(north_east[0], north_east[1])
|
||||
set_center((south + north) / 2.0, (west + east) / 2.0, map_tag=map_tag)
|
||||
|
||||
|
||||
def screen_to_latlon(x: float, y: float, *, map_tag: Tag | None = None) -> LatLon:
|
||||
_ = (x, y)
|
||||
return get_center(map_tag=map_tag)
|
||||
|
||||
|
||||
def latlon_to_screen(lat: float, lon: float, *, map_tag: Tag | None = None) -> Point:
|
||||
_validate_latlon(lat, lon)
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
return (state.measured_width / 2.0, state.measured_height / 2.0)
|
||||
|
||||
|
||||
def add_marker(
|
||||
tag: Tag,
|
||||
*,
|
||||
lat: float,
|
||||
lon: float,
|
||||
label: str | None = None,
|
||||
layer: str = "default",
|
||||
show: bool = True,
|
||||
map_tag: Tag | None = None,
|
||||
**kwargs: Any,
|
||||
) -> Tag:
|
||||
state = get_map_state(map_tag)
|
||||
color = kwargs.get("color", (255, 80, 80, 255))
|
||||
radius = float(kwargs.get("radius", 5.0))
|
||||
show_label = bool(kwargs.get("show_label", False))
|
||||
user_data = kwargs.get("user_data")
|
||||
callback = kwargs.get("callback")
|
||||
with state.lock:
|
||||
marker = MarkerOverlay(
|
||||
tag=tag,
|
||||
map_tag=state.tag,
|
||||
layer=layer,
|
||||
show=show,
|
||||
user_data=user_data,
|
||||
lat=_validate_latlon(lat, lon)[0],
|
||||
lon=_validate_latlon(lat, lon)[1],
|
||||
label=label,
|
||||
color=color,
|
||||
radius=radius,
|
||||
show_label=show_label,
|
||||
callback=callback,
|
||||
)
|
||||
state.overlays[tag] = marker
|
||||
_ensure_layer(state, layer).overlay_tags.add(tag)
|
||||
mark_dirty(state, DirtyFlags.OVERLAYS)
|
||||
_queue(state, CommandKind.ADD_OVERLAY, _overlay_payload(marker))
|
||||
return tag
|
||||
|
||||
|
||||
def add_polyline(
|
||||
tag: Tag,
|
||||
*,
|
||||
points: Sequence[LatLon] | None = None,
|
||||
lats: Sequence[float] | None = None,
|
||||
lons: Sequence[float] | None = None,
|
||||
layer: str = "default",
|
||||
show: bool = True,
|
||||
map_tag: Tag | None = None,
|
||||
**kwargs: Any,
|
||||
) -> Tag:
|
||||
state = get_map_state(map_tag)
|
||||
copied_points = _points_from_inputs(points, lats=lats, lons=lons)
|
||||
with state.lock:
|
||||
polyline = PolylineOverlay(
|
||||
tag=tag,
|
||||
map_tag=state.tag,
|
||||
layer=layer,
|
||||
show=show,
|
||||
user_data=kwargs.get("user_data"),
|
||||
points=copied_points,
|
||||
color=kwargs.get("color", (80, 180, 255, 255)),
|
||||
thickness=float(kwargs.get("thickness", 2.0)),
|
||||
closed=bool(kwargs.get("closed", False)),
|
||||
simplify=bool(kwargs.get("simplify", True)),
|
||||
)
|
||||
state.overlays[tag] = polyline
|
||||
_ensure_layer(state, layer).overlay_tags.add(tag)
|
||||
mark_dirty(state, DirtyFlags.OVERLAYS)
|
||||
_queue(state, CommandKind.ADD_OVERLAY, _overlay_payload(polyline))
|
||||
return tag
|
||||
|
||||
|
||||
def add_trajectory(
|
||||
tag: Tag,
|
||||
*,
|
||||
points: Sequence[LatLon] | None = None,
|
||||
lats: Sequence[float] | None = None,
|
||||
lons: Sequence[float] | None = None,
|
||||
layer: str = "default",
|
||||
show: bool = True,
|
||||
map_tag: Tag | None = None,
|
||||
**kwargs: Any,
|
||||
) -> Tag:
|
||||
state = get_map_state(map_tag)
|
||||
copied_points = _points_from_inputs(points, lats=lats, lons=lons)
|
||||
timestamps = kwargs.get("timestamps")
|
||||
copied_timestamps = tuple(timestamps) if timestamps is not None else None
|
||||
if copied_timestamps is not None and len(copied_timestamps) != len(copied_points):
|
||||
raise CoordinateError("timestamps must have the same length as trajectory points")
|
||||
point_stride = int(kwargs.get("point_stride", 1))
|
||||
if point_stride < 1:
|
||||
raise ValueError("point_stride must be >= 1")
|
||||
with state.lock:
|
||||
trajectory = TrajectoryOverlay(
|
||||
tag=tag,
|
||||
map_tag=state.tag,
|
||||
layer=layer,
|
||||
show=show,
|
||||
user_data=kwargs.get("user_data"),
|
||||
points=copied_points,
|
||||
timestamps=copied_timestamps,
|
||||
color=kwargs.get("color", (255, 180, 60, 255)),
|
||||
thickness=float(kwargs.get("thickness", 2.0)),
|
||||
show_points=bool(kwargs.get("show_points", False)),
|
||||
point_stride=point_stride,
|
||||
)
|
||||
state.overlays[tag] = trajectory
|
||||
_ensure_layer(state, layer).overlay_tags.add(tag)
|
||||
mark_dirty(state, DirtyFlags.OVERLAYS)
|
||||
_queue(state, CommandKind.ADD_OVERLAY, _overlay_payload(trajectory))
|
||||
return tag
|
||||
|
||||
|
||||
def update_marker(
|
||||
tag: Tag,
|
||||
*,
|
||||
lat: float | None = None,
|
||||
lon: float | None = None,
|
||||
label: str | None = None,
|
||||
map_tag: Tag | None = None,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
state = find_map_for_overlay(tag, map_tag)
|
||||
with state.lock:
|
||||
overlay = state.overlays.get(tag)
|
||||
if not isinstance(overlay, MarkerOverlay):
|
||||
raise OverlayNotFoundError(f"marker not found: {tag}")
|
||||
if lat is not None or lon is not None:
|
||||
overlay.lat, overlay.lon = _validate_latlon(
|
||||
overlay.lat if lat is None else lat,
|
||||
overlay.lon if lon is None else lon,
|
||||
)
|
||||
if label is not None:
|
||||
overlay.label = label
|
||||
if "show" in kwargs:
|
||||
overlay.show = bool(kwargs["show"])
|
||||
if "color" in kwargs:
|
||||
overlay.color = kwargs["color"]
|
||||
if "radius" in kwargs:
|
||||
overlay.radius = float(kwargs["radius"])
|
||||
overlay.touch()
|
||||
mark_dirty(state, DirtyFlags.OVERLAYS)
|
||||
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
|
||||
|
||||
|
||||
def update_polyline(
|
||||
tag: Tag,
|
||||
*,
|
||||
points: Sequence[LatLon] | None = None,
|
||||
lats: Sequence[float] | None = None,
|
||||
lons: Sequence[float] | None = None,
|
||||
map_tag: Tag | None = None,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
state = find_map_for_overlay(tag, map_tag)
|
||||
with state.lock:
|
||||
overlay = state.overlays.get(tag)
|
||||
if not isinstance(overlay, PolylineOverlay):
|
||||
raise OverlayNotFoundError(f"polyline not found: {tag}")
|
||||
if points is not None or lats is not None or lons is not None:
|
||||
overlay.points = _points_from_inputs(points, lats=lats, lons=lons)
|
||||
if "show" in kwargs:
|
||||
overlay.show = bool(kwargs["show"])
|
||||
if "color" in kwargs:
|
||||
overlay.color = kwargs["color"]
|
||||
if "thickness" in kwargs:
|
||||
overlay.thickness = float(kwargs["thickness"])
|
||||
overlay.touch()
|
||||
mark_dirty(state, DirtyFlags.OVERLAYS)
|
||||
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
|
||||
|
||||
|
||||
def update_trajectory(
|
||||
tag: Tag,
|
||||
*,
|
||||
points: Sequence[LatLon] | None = None,
|
||||
lats: Sequence[float] | None = None,
|
||||
lons: Sequence[float] | None = None,
|
||||
map_tag: Tag | None = None,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
state = find_map_for_overlay(tag, map_tag)
|
||||
with state.lock:
|
||||
overlay = state.overlays.get(tag)
|
||||
if not isinstance(overlay, TrajectoryOverlay):
|
||||
raise OverlayNotFoundError(f"trajectory not found: {tag}")
|
||||
if points is not None or lats is not None or lons is not None:
|
||||
overlay.points = _points_from_inputs(points, lats=lats, lons=lons)
|
||||
if "timestamps" in kwargs:
|
||||
timestamps = kwargs["timestamps"]
|
||||
overlay.timestamps = tuple(timestamps) if timestamps is not None else None
|
||||
if overlay.timestamps is not None and len(overlay.timestamps) != len(overlay.points):
|
||||
raise CoordinateError("timestamps must have the same length as trajectory points")
|
||||
if "show" in kwargs:
|
||||
overlay.show = bool(kwargs["show"])
|
||||
if "color" in kwargs:
|
||||
overlay.color = kwargs["color"]
|
||||
if "thickness" in kwargs:
|
||||
overlay.thickness = float(kwargs["thickness"])
|
||||
overlay.touch()
|
||||
mark_dirty(state, DirtyFlags.OVERLAYS)
|
||||
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
|
||||
|
||||
|
||||
def set_marker_position(tag: Tag, lat: float, lon: float, *, map_tag: Tag | None = None) -> None:
|
||||
update_marker(tag, lat=lat, lon=lon, map_tag=map_tag)
|
||||
|
||||
|
||||
def set_marker_label(tag: Tag, label: str, *, map_tag: Tag | None = None) -> None:
|
||||
update_marker(tag, label=label, map_tag=map_tag)
|
||||
|
||||
|
||||
def set_polyline_points(
|
||||
tag: Tag,
|
||||
points: Sequence[LatLon],
|
||||
*,
|
||||
map_tag: Tag | None = None,
|
||||
) -> None:
|
||||
update_polyline(tag, points=points, map_tag=map_tag)
|
||||
|
||||
|
||||
def set_overlay_show(tag: Tag, show: bool, *, map_tag: Tag | None = None) -> None:
|
||||
state = find_map_for_overlay(tag, map_tag)
|
||||
with state.lock:
|
||||
overlay = state.overlays.get(tag)
|
||||
if overlay is None:
|
||||
raise OverlayNotFoundError(f"overlay not found: {tag}")
|
||||
overlay.show = bool(show)
|
||||
overlay.touch()
|
||||
mark_dirty(state, DirtyFlags.OVERLAYS)
|
||||
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
|
||||
|
||||
|
||||
def delete_overlay(tag: Tag, *, map_tag: Tag | None = None) -> None:
|
||||
state = find_map_for_overlay(tag, map_tag)
|
||||
with state.lock:
|
||||
overlay = state.overlays.pop(tag, None)
|
||||
if overlay is None:
|
||||
raise OverlayNotFoundError(f"overlay not found: {tag}")
|
||||
layer = state.layers.get(overlay.layer)
|
||||
if layer is not None:
|
||||
layer.overlay_tags.discard(tag)
|
||||
mark_dirty(state, DirtyFlags.OVERLAYS)
|
||||
_queue(state, CommandKind.DELETE_OVERLAY, {"tag": tag})
|
||||
|
||||
|
||||
def add_layer(name: str, *, show: bool = True, map_tag: Tag | None = None) -> None:
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
layer = _ensure_layer(state, name, z_index=len(state.layers), show=show)
|
||||
layer.show = show
|
||||
mark_dirty(state, DirtyFlags.OVERLAYS)
|
||||
_queue(state, CommandKind.ADD_LAYER, {"name": name, "show": show})
|
||||
|
||||
|
||||
def show_layer(name: str, *, map_tag: Tag | None = None) -> None:
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
_ensure_layer(state, name).show = True
|
||||
mark_dirty(state, DirtyFlags.OVERLAYS)
|
||||
_queue(state, CommandKind.SET_LAYER_VISIBILITY, {"name": name, "show": True})
|
||||
|
||||
|
||||
def hide_layer(name: str, *, map_tag: Tag | None = None) -> None:
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
_ensure_layer(state, name).show = False
|
||||
mark_dirty(state, DirtyFlags.OVERLAYS)
|
||||
_queue(state, CommandKind.SET_LAYER_VISIBILITY, {"name": name, "show": False})
|
||||
|
||||
|
||||
def clear_layer(name: str, *, map_tag: Tag | None = None) -> None:
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
layer = _ensure_layer(state, name)
|
||||
for overlay_tag in tuple(layer.overlay_tags):
|
||||
state.overlays.pop(overlay_tag, None)
|
||||
layer.overlay_tags.clear()
|
||||
mark_dirty(state, DirtyFlags.OVERLAYS)
|
||||
_queue(state, CommandKind.CLEAR_LAYER, {"name": name})
|
||||
|
||||
|
||||
def clear_map(*, map_tag: Tag | None = None) -> None:
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
state.overlays.clear()
|
||||
for layer in state.layers.values():
|
||||
layer.overlay_tags.clear()
|
||||
state.generation += 1
|
||||
mark_dirty(state, DirtyFlags.OVERLAYS | DirtyFlags.TILES)
|
||||
_queue(state, CommandKind.CLEAR_MAP, {})
|
||||
|
||||
|
||||
def set_provider(provider: str | TileProvider, *, map_tag: Tag | None = None) -> None:
|
||||
provider_obj = get_provider(provider) if isinstance(provider, str) else provider
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
state.provider = provider_obj
|
||||
state.min_zoom = max(state.min_zoom, provider_obj.min_zoom)
|
||||
state.max_zoom = min(state.max_zoom, provider_obj.max_zoom)
|
||||
state.zoom = max(state.min_zoom, min(state.max_zoom, state.zoom))
|
||||
state.generation += 1
|
||||
mark_dirty(state, DirtyFlags.PROVIDER | DirtyFlags.TILES)
|
||||
_queue(state, CommandKind.SET_PROVIDER, {"provider": provider_obj.name})
|
||||
|
||||
|
||||
def clear_memory_cache(*, map_tag: Tag | None = None) -> None:
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
mark_dirty(state, DirtyFlags.TILES)
|
||||
_queue(state, CommandKind.CLEAR_MEMORY_CACHE, {})
|
||||
|
||||
|
||||
def clear_disk_cache(*, map_tag: Tag | None = None) -> None:
|
||||
if map_tag is None:
|
||||
clear_disk_cache_path(get_config().cache_dir)
|
||||
return
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
mark_dirty(state, DirtyFlags.TILES)
|
||||
_queue(state, CommandKind.CLEAR_DISK_CACHE, {})
|
||||
|
||||
|
||||
def get_cache_stats(*, map_tag: Tag | None = None) -> CacheStats:
|
||||
config = get_config()
|
||||
if map_tag is None:
|
||||
cache_dir = config.cache_dir
|
||||
return CacheStats(
|
||||
memory_max_tiles=config.memory_cache_max_tiles,
|
||||
disk_bytes=disk_cache_size_bytes(cache_dir),
|
||||
disk_max_bytes=config.disk_cache_max_bytes,
|
||||
disk_path=disk_cache_root(cache_dir),
|
||||
)
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
tile_snapshot = state.tile_manager.snapshot()
|
||||
return CacheStats(
|
||||
memory_tiles=tile_snapshot.memory_tiles,
|
||||
memory_max_tiles=config.memory_cache_max_tiles,
|
||||
memory_hits=tile_snapshot.memory_hits,
|
||||
memory_misses=tile_snapshot.memory_misses,
|
||||
disk_bytes=disk_cache_size_bytes(state.cache_dir),
|
||||
disk_max_bytes=config.disk_cache_max_bytes,
|
||||
disk_hits=tile_snapshot.disk_hits,
|
||||
disk_misses=tile_snapshot.disk_misses,
|
||||
disk_path=disk_cache_root(state.cache_dir),
|
||||
)
|
||||
|
||||
|
||||
def get_map_debug_state(*, map_tag: Tag | None = None) -> dict[str, Any]:
|
||||
state = get_map_state(map_tag)
|
||||
with state.lock:
|
||||
return {
|
||||
"tag": state.tag,
|
||||
"center": state.center,
|
||||
"zoom": state.zoom,
|
||||
"requested_size": (state.requested_width, state.requested_height),
|
||||
"measured_size": (state.measured_width, state.measured_height),
|
||||
"visible": state.is_visible,
|
||||
"provider": state.provider.name,
|
||||
"overlay_count": len(state.overlays),
|
||||
"layers": {
|
||||
name: {
|
||||
"show": layer.show,
|
||||
"z_index": layer.z_index,
|
||||
"overlay_count": len(layer.overlay_tags),
|
||||
}
|
||||
for name, layer in state.layers.items()
|
||||
},
|
||||
"dirty_flags": int(state.dirty),
|
||||
"pending_command_count": len(state.command_queue),
|
||||
"generation": state.generation,
|
||||
"active_drag": state.interaction.active_drag,
|
||||
"tiles": asdict(state.tile_manager.snapshot()),
|
||||
}
|
||||
289
src/dpg_map/cache.py
Normal file
289
src/dpg_map/cache.py
Normal file
@@ -0,0 +1,289 @@
|
||||
"""Memory and disk cache helpers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import shutil
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from pathlib import Path
|
||||
from time import time
|
||||
from typing import Any
|
||||
|
||||
from platformdirs import user_cache_dir
|
||||
|
||||
from .exceptions import CacheError
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class CacheStats:
|
||||
"""Public cache statistics snapshot."""
|
||||
|
||||
memory_tiles: int = 0
|
||||
memory_max_tiles: int = 0
|
||||
memory_hits: int = 0
|
||||
memory_misses: int = 0
|
||||
disk_bytes: int = 0
|
||||
disk_max_bytes: int | None = None
|
||||
disk_hits: int = 0
|
||||
disk_misses: int = 0
|
||||
disk_path: Path | None = None
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class MemoryCacheConfig:
|
||||
"""Initial memory cache configuration."""
|
||||
|
||||
max_tiles: int = 512
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class DiskCacheConfig:
|
||||
"""Initial persistent disk cache configuration."""
|
||||
|
||||
path: Path | None = None
|
||||
max_bytes: int | None = 2_000_000_000
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class MemoryCacheEntry:
|
||||
"""Metadata for one in-memory tile."""
|
||||
|
||||
tile_id: object
|
||||
size_bytes: int = 0
|
||||
last_accessed_at: float = field(default_factory=time)
|
||||
protected: bool = False
|
||||
texture_tag: object | None = None
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class MemoryCacheModel:
|
||||
"""Small LRU metadata model for decoded/runtime tiles."""
|
||||
|
||||
max_tiles: int = 512
|
||||
entries: dict[object, MemoryCacheEntry] = field(default_factory=dict)
|
||||
hits: int = 0
|
||||
misses: int = 0
|
||||
|
||||
def record_access(self, tile_id: object) -> MemoryCacheEntry | None:
|
||||
"""Mark an entry as recently used and return it if present."""
|
||||
|
||||
entry = self.entries.get(tile_id)
|
||||
if entry is None:
|
||||
self.misses += 1
|
||||
return None
|
||||
self.hits += 1
|
||||
entry.last_accessed_at = time()
|
||||
return entry
|
||||
|
||||
def put(self, entry: MemoryCacheEntry) -> None:
|
||||
"""Insert or replace entry metadata."""
|
||||
|
||||
entry.last_accessed_at = time()
|
||||
self.entries[entry.tile_id] = entry
|
||||
|
||||
def plan_evictions(self) -> list[object]:
|
||||
"""Return tile IDs that can be evicted without touching GUI resources."""
|
||||
|
||||
overflow = len(self.entries) - self.max_tiles
|
||||
if overflow <= 0:
|
||||
return []
|
||||
candidates = [entry for entry in self.entries.values() if not entry.protected]
|
||||
candidates.sort(key=lambda entry: entry.last_accessed_at)
|
||||
return [entry.tile_id for entry in candidates[:overflow]]
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class DiskCacheMetadata:
|
||||
"""Persistent metadata stored next to a tile file."""
|
||||
|
||||
url: str = ""
|
||||
etag: str | None = None
|
||||
last_modified: str | None = None
|
||||
expires: str | None = None
|
||||
downloaded_at: float = 0.0
|
||||
last_accessed_at: float = 0.0
|
||||
size_bytes: int = 0
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class DiskCacheEntry:
|
||||
"""Scanned disk cache file plus metadata."""
|
||||
|
||||
tile_path: Path
|
||||
metadata_path: Path
|
||||
metadata: DiskCacheMetadata
|
||||
|
||||
|
||||
def default_cache_dir() -> Path:
|
||||
"""Return the default persistent cache directory."""
|
||||
|
||||
return Path(user_cache_dir("dpg-map", appauthor=False))
|
||||
|
||||
|
||||
def disk_cache_root(cache_dir: str | Path | None = None) -> Path:
|
||||
"""Resolve the disk cache root path."""
|
||||
|
||||
return Path(cache_dir).expanduser() if cache_dir is not None else default_cache_dir()
|
||||
|
||||
|
||||
def tile_cache_path(
|
||||
cache_dir: str | Path | None,
|
||||
provider_name: str,
|
||||
z: int,
|
||||
x: int,
|
||||
y: int,
|
||||
extension: str | None = None,
|
||||
) -> Path:
|
||||
"""Return the provider-namespaced persistent tile path."""
|
||||
|
||||
ext = (extension or "png").lstrip(".")
|
||||
safe_provider = provider_name.replace("/", "_")
|
||||
return disk_cache_root(cache_dir) / safe_provider / str(z) / str(x) / f"{y}.{ext}"
|
||||
|
||||
|
||||
def tile_metadata_path(tile_path: Path) -> Path:
|
||||
"""Return the metadata path for a tile path."""
|
||||
|
||||
return tile_path.with_suffix(".json")
|
||||
|
||||
|
||||
def read_disk_metadata(path: Path) -> DiskCacheMetadata:
|
||||
"""Read a metadata JSON file, returning defaults for missing metadata."""
|
||||
|
||||
if not path.exists():
|
||||
return DiskCacheMetadata()
|
||||
try:
|
||||
raw: dict[str, Any] = json.loads(path.read_text(encoding="utf-8"))
|
||||
except OSError as exc:
|
||||
raise CacheError(f"could not read cache metadata: {path}") from exc
|
||||
except json.JSONDecodeError as exc:
|
||||
raise CacheError(f"invalid cache metadata JSON: {path}") from exc
|
||||
|
||||
return DiskCacheMetadata(
|
||||
url=str(raw.get("url", "")),
|
||||
etag=raw.get("etag"),
|
||||
last_modified=raw.get("last_modified"),
|
||||
expires=raw.get("expires"),
|
||||
downloaded_at=float(raw.get("downloaded_at", 0.0)),
|
||||
last_accessed_at=float(raw.get("last_accessed_at", 0.0)),
|
||||
size_bytes=int(raw.get("size_bytes", 0)),
|
||||
)
|
||||
|
||||
|
||||
def write_disk_metadata(path: Path, metadata: DiskCacheMetadata) -> None:
|
||||
"""Write metadata JSON next to a tile file."""
|
||||
|
||||
try:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(json.dumps(asdict(metadata), sort_keys=True), encoding="utf-8")
|
||||
except OSError as exc:
|
||||
raise CacheError(f"could not write cache metadata: {path}") from exc
|
||||
|
||||
|
||||
def touch_disk_metadata(path: Path, *, accessed_at: float | None = None) -> None:
|
||||
"""Update only the last access timestamp for a metadata file."""
|
||||
|
||||
metadata = read_disk_metadata(path)
|
||||
write_disk_metadata(
|
||||
path,
|
||||
DiskCacheMetadata(
|
||||
url=metadata.url,
|
||||
etag=metadata.etag,
|
||||
last_modified=metadata.last_modified,
|
||||
expires=metadata.expires,
|
||||
downloaded_at=metadata.downloaded_at,
|
||||
last_accessed_at=time() if accessed_at is None else accessed_at,
|
||||
size_bytes=metadata.size_bytes,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def scan_disk_cache(cache_dir: str | Path | None) -> list[DiskCacheEntry]:
|
||||
"""Scan tile files under a disk cache root."""
|
||||
|
||||
root = disk_cache_root(cache_dir)
|
||||
if not root.exists():
|
||||
return []
|
||||
entries: list[DiskCacheEntry] = []
|
||||
for path in root.rglob("*"):
|
||||
if not path.is_file() or path.suffix == ".json":
|
||||
continue
|
||||
metadata_path = tile_metadata_path(path)
|
||||
metadata = read_disk_metadata(metadata_path)
|
||||
size_bytes = metadata.size_bytes or path.stat().st_size
|
||||
if size_bytes != metadata.size_bytes:
|
||||
metadata = DiskCacheMetadata(
|
||||
url=metadata.url,
|
||||
etag=metadata.etag,
|
||||
last_modified=metadata.last_modified,
|
||||
expires=metadata.expires,
|
||||
downloaded_at=metadata.downloaded_at,
|
||||
last_accessed_at=metadata.last_accessed_at,
|
||||
size_bytes=size_bytes,
|
||||
)
|
||||
entries.append(DiskCacheEntry(path, metadata_path, metadata))
|
||||
return entries
|
||||
|
||||
|
||||
def disk_cache_size_bytes(cache_dir: str | Path | None) -> int:
|
||||
"""Return total bytes for cached tile files."""
|
||||
|
||||
return sum(entry.metadata.size_bytes for entry in scan_disk_cache(cache_dir))
|
||||
|
||||
|
||||
def plan_disk_prune(
|
||||
cache_dir: str | Path | None,
|
||||
max_bytes: int | None,
|
||||
*,
|
||||
protected_paths: set[Path] | None = None,
|
||||
) -> list[Path]:
|
||||
"""Return tile paths that should be pruned by LRU order without deleting them."""
|
||||
|
||||
if max_bytes is None:
|
||||
return []
|
||||
protected = {path.resolve() for path in protected_paths or set()}
|
||||
entries = scan_disk_cache(cache_dir)
|
||||
total = sum(entry.metadata.size_bytes for entry in entries)
|
||||
if total <= max_bytes:
|
||||
return []
|
||||
|
||||
candidates = [entry for entry in entries if entry.tile_path.resolve() not in protected]
|
||||
candidates.sort(key=lambda entry: entry.metadata.last_accessed_at)
|
||||
prune: list[Path] = []
|
||||
for entry in candidates:
|
||||
if total <= max_bytes:
|
||||
break
|
||||
prune.append(entry.tile_path)
|
||||
total -= entry.metadata.size_bytes
|
||||
return prune
|
||||
|
||||
|
||||
def prune_disk_cache(
|
||||
cache_dir: str | Path | None,
|
||||
max_bytes: int | None,
|
||||
*,
|
||||
protected_paths: set[Path] | None = None,
|
||||
) -> list[Path]:
|
||||
"""Delete LRU tile files until the cache fits the configured limit."""
|
||||
|
||||
planned = plan_disk_prune(cache_dir, max_bytes, protected_paths=protected_paths)
|
||||
for path in planned:
|
||||
metadata_path = tile_metadata_path(path)
|
||||
try:
|
||||
path.unlink(missing_ok=True)
|
||||
metadata_path.unlink(missing_ok=True)
|
||||
except OSError as exc:
|
||||
raise CacheError(f"could not prune cached tile: {path}") from exc
|
||||
return planned
|
||||
|
||||
|
||||
def clear_disk_cache_path(cache_dir: str | Path | None) -> None:
|
||||
"""Remove all persistent tile cache files under a cache root."""
|
||||
|
||||
root = disk_cache_root(cache_dir)
|
||||
if not root.exists():
|
||||
return
|
||||
try:
|
||||
shutil.rmtree(root)
|
||||
except OSError as exc:
|
||||
raise CacheError(f"could not clear disk cache: {root}") from exc
|
||||
113
src/dpg_map/commands.py
Normal file
113
src/dpg_map/commands.py
Normal file
@@ -0,0 +1,113 @@
|
||||
"""Command models for GUI-thread rendering work."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import OrderedDict, deque
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from threading import RLock
|
||||
from time import monotonic
|
||||
from typing import Any
|
||||
|
||||
from .types import Tag
|
||||
|
||||
|
||||
class CommandKind(Enum):
|
||||
"""Commands that the GUI thread can apply in order."""
|
||||
|
||||
SET_VIEW = "set_view"
|
||||
SET_PROVIDER = "set_provider"
|
||||
ADD_OVERLAY = "add_overlay"
|
||||
UPDATE_OVERLAY = "update_overlay"
|
||||
DELETE_OVERLAY = "delete_overlay"
|
||||
SET_LAYER_VISIBILITY = "set_layer_visibility"
|
||||
ADD_LAYER = "add_layer"
|
||||
CLEAR_LAYER = "clear_layer"
|
||||
CLEAR_MAP = "clear_map"
|
||||
CLEAR_MEMORY_CACHE = "clear_memory_cache"
|
||||
CLEAR_DISK_CACHE = "clear_disk_cache"
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class MapCommand:
|
||||
"""A command submitted from public API calls to the GUI-thread renderer."""
|
||||
|
||||
kind: CommandKind
|
||||
map_tag: Tag
|
||||
payload: dict[str, Any] = field(default_factory=dict)
|
||||
created_at: float = field(default_factory=monotonic)
|
||||
|
||||
|
||||
class MapCommandQueue:
|
||||
"""Thread-safe command queue with bounded coalescing for high-rate updates."""
|
||||
|
||||
_VIEW_KEY = "__view__"
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._lock = RLock()
|
||||
self._ordered: deque[MapCommand] = deque()
|
||||
self._overlay_updates: OrderedDict[tuple[Tag, Tag], MapCommand] = OrderedDict()
|
||||
self._view_updates: OrderedDict[Tag, MapCommand] = OrderedDict()
|
||||
|
||||
def put(self, command: MapCommand) -> None:
|
||||
"""Queue a command, coalescing update commands where ordering permits."""
|
||||
|
||||
with self._lock:
|
||||
if command.kind is CommandKind.UPDATE_OVERLAY:
|
||||
overlay_tag = command.payload.get("tag")
|
||||
if overlay_tag is None:
|
||||
self._ordered.append(command)
|
||||
return
|
||||
key = (command.map_tag, overlay_tag)
|
||||
self._ordered.append(command)
|
||||
self._overlay_updates[key] = command
|
||||
return
|
||||
|
||||
if command.kind is CommandKind.SET_VIEW:
|
||||
self._ordered.append(command)
|
||||
self._view_updates[command.map_tag] = command
|
||||
return
|
||||
|
||||
self._ordered.append(command)
|
||||
|
||||
def drain(self) -> list[MapCommand]:
|
||||
"""Return pending commands in render order and clear the queue."""
|
||||
|
||||
with self._lock:
|
||||
drained: list[MapCommand] = []
|
||||
while self._ordered:
|
||||
command = self._ordered.popleft()
|
||||
if command.kind is CommandKind.UPDATE_OVERLAY:
|
||||
overlay_tag = command.payload.get("tag")
|
||||
if not isinstance(overlay_tag, str | int):
|
||||
drained.append(command)
|
||||
continue
|
||||
key = (command.map_tag, overlay_tag)
|
||||
latest = self._overlay_updates.get(key)
|
||||
if latest is command:
|
||||
drained.append(command)
|
||||
del self._overlay_updates[key]
|
||||
continue
|
||||
|
||||
if command.kind is CommandKind.SET_VIEW:
|
||||
latest = self._view_updates.get(command.map_tag)
|
||||
if latest is command:
|
||||
drained.append(command)
|
||||
del self._view_updates[command.map_tag]
|
||||
continue
|
||||
|
||||
drained.append(command)
|
||||
|
||||
return drained
|
||||
|
||||
def __len__(self) -> int:
|
||||
with self._lock:
|
||||
return len(self._ordered)
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Drop all pending commands."""
|
||||
|
||||
with self._lock:
|
||||
self._ordered.clear()
|
||||
self._overlay_updates.clear()
|
||||
self._view_updates.clear()
|
||||
1
src/dpg_map/diagnostics.py
Normal file
1
src/dpg_map/diagnostics.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Diagnostics and debug state helpers."""
|
||||
1
src/dpg_map/draw_layers.py
Normal file
1
src/dpg_map/draw_layers.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Draw layer bookkeeping helpers."""
|
||||
49
src/dpg_map/exceptions.py
Normal file
49
src/dpg_map/exceptions.py
Normal file
@@ -0,0 +1,49 @@
|
||||
"""Public exception types for dpg-map."""
|
||||
|
||||
|
||||
class DpgMapError(Exception):
|
||||
"""Base exception for all public dpg-map errors."""
|
||||
|
||||
|
||||
class DpgMapNotImplementedError(DpgMapError, NotImplementedError):
|
||||
"""Raised by public APIs that are intentionally stubbed during the rebuild."""
|
||||
|
||||
|
||||
class ProviderError(DpgMapError):
|
||||
"""Base exception for tile provider errors."""
|
||||
|
||||
|
||||
class ProviderExistsError(ProviderError):
|
||||
"""Raised when registering a provider name that already exists."""
|
||||
|
||||
|
||||
class ProviderNotFoundError(ProviderError):
|
||||
"""Raised when a requested tile provider is not registered."""
|
||||
|
||||
|
||||
class InvalidProviderError(ProviderError, ValueError):
|
||||
"""Raised when a tile provider definition is invalid."""
|
||||
|
||||
|
||||
class ProjectionError(DpgMapError, ValueError):
|
||||
"""Raised when geographic projection input is invalid."""
|
||||
|
||||
|
||||
class MapNotFoundError(DpgMapError, KeyError):
|
||||
"""Raised when a requested map tag is not registered."""
|
||||
|
||||
|
||||
class OverlayNotFoundError(DpgMapError, KeyError):
|
||||
"""Raised when a requested overlay tag is not registered."""
|
||||
|
||||
|
||||
class CoordinateError(DpgMapError, ValueError):
|
||||
"""Raised when geographic coordinate input is invalid."""
|
||||
|
||||
|
||||
class ThreadingError(DpgMapError):
|
||||
"""Raised when an operation violates dpg-map threading rules."""
|
||||
|
||||
|
||||
class CacheError(DpgMapError):
|
||||
"""Raised when cache metadata or paths cannot be handled."""
|
||||
36
src/dpg_map/interaction.py
Normal file
36
src/dpg_map/interaction.py
Normal file
@@ -0,0 +1,36 @@
|
||||
"""Map interaction state and handlers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from .sizing import effective_draw_size
|
||||
from .state import MapState
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class HitRect:
|
||||
"""Screen-space rectangle used for map interaction tests."""
|
||||
|
||||
x: float
|
||||
y: float
|
||||
width: float
|
||||
height: float
|
||||
|
||||
@property
|
||||
def right(self) -> float:
|
||||
return self.x + self.width
|
||||
|
||||
@property
|
||||
def bottom(self) -> float:
|
||||
return self.y + self.height
|
||||
|
||||
def contains(self, x: float, y: float) -> bool:
|
||||
return self.x <= x <= self.right and self.y <= y <= self.bottom
|
||||
|
||||
|
||||
def calculate_hit_rect(state: MapState, drawlist_pos: tuple[float, float]) -> HitRect:
|
||||
"""Return the map interaction rectangle for a drawlist position."""
|
||||
|
||||
width, height = effective_draw_size(state)
|
||||
return HitRect(float(drawlist_pos[0]), float(drawlist_pos[1]), float(width), float(height))
|
||||
72
src/dpg_map/overlays.py
Normal file
72
src/dpg_map/overlays.py
Normal file
@@ -0,0 +1,72 @@
|
||||
"""Logical overlay models."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from .types import Color, LatLon, Tag
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class Overlay:
|
||||
"""Base logical overlay state."""
|
||||
|
||||
tag: Tag
|
||||
map_tag: Tag
|
||||
layer: str
|
||||
show: bool = True
|
||||
user_data: Any = None
|
||||
revision: int = 0
|
||||
|
||||
def touch(self) -> None:
|
||||
"""Mark this overlay as changed."""
|
||||
|
||||
self.revision += 1
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class MarkerOverlay(Overlay):
|
||||
"""Logical marker overlay."""
|
||||
|
||||
lat: float = 0.0
|
||||
lon: float = 0.0
|
||||
label: str | None = None
|
||||
color: Color = (255, 80, 80, 255)
|
||||
radius: float = 5.0
|
||||
show_label: bool = False
|
||||
callback: Callable[..., Any] | None = None
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class PolylineOverlay(Overlay):
|
||||
"""Logical polyline overlay."""
|
||||
|
||||
points: tuple[LatLon, ...] = ()
|
||||
color: Color = (80, 180, 255, 255)
|
||||
thickness: float = 2.0
|
||||
closed: bool = False
|
||||
simplify: bool = True
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class TrajectoryOverlay(Overlay):
|
||||
"""Logical trajectory overlay."""
|
||||
|
||||
points: tuple[LatLon, ...] = ()
|
||||
timestamps: tuple[float, ...] | None = None
|
||||
color: Color = (255, 180, 60, 255)
|
||||
thickness: float = 2.0
|
||||
show_points: bool = False
|
||||
point_stride: int = 1
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class LayerState:
|
||||
"""Logical layer visibility and ordering state."""
|
||||
|
||||
name: str
|
||||
z_index: int = 0
|
||||
show: bool = True
|
||||
overlay_tags: set[Tag] = field(default_factory=set)
|
||||
92
src/dpg_map/projection.py
Normal file
92
src/dpg_map/projection.py
Normal file
@@ -0,0 +1,92 @@
|
||||
"""Web Mercator projection helpers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
|
||||
from .exceptions import ProjectionError
|
||||
from .types import LatLon, Point
|
||||
|
||||
WEB_MERCATOR_MAX_LAT = 85.05112878
|
||||
DEFAULT_TILE_SIZE = 256
|
||||
|
||||
|
||||
def clamp_latitude(lat: float) -> float:
|
||||
"""Clamp latitude to the Web Mercator representable range."""
|
||||
|
||||
return min(max(lat, -WEB_MERCATOR_MAX_LAT), WEB_MERCATOR_MAX_LAT)
|
||||
|
||||
|
||||
def map_size(zoom: int, tile_size: int = DEFAULT_TILE_SIZE) -> int:
|
||||
"""Return square pixel size of the full world at a zoom level."""
|
||||
|
||||
if zoom < 0:
|
||||
raise ProjectionError("zoom must be >= 0")
|
||||
if tile_size <= 0:
|
||||
raise ProjectionError("tile_size must be > 0")
|
||||
return tile_size * (2**zoom)
|
||||
|
||||
|
||||
def latlon_to_world(lat: float, lon: float, zoom: int, tile_size: int = DEFAULT_TILE_SIZE) -> Point:
|
||||
"""Project latitude/longitude to world pixel coordinates."""
|
||||
|
||||
size = map_size(zoom, tile_size)
|
||||
lat = clamp_latitude(lat)
|
||||
lon = ((lon + 180.0) % 360.0) - 180.0
|
||||
|
||||
sin_lat = math.sin(math.radians(lat))
|
||||
x = (lon + 180.0) / 360.0 * size
|
||||
y = (0.5 - math.log((1.0 + sin_lat) / (1.0 - sin_lat)) / (4.0 * math.pi)) * size
|
||||
return (x, y)
|
||||
|
||||
|
||||
def world_to_latlon(x: float, y: float, zoom: int, tile_size: int = DEFAULT_TILE_SIZE) -> LatLon:
|
||||
"""Unproject world pixel coordinates to latitude/longitude."""
|
||||
|
||||
size = map_size(zoom, tile_size)
|
||||
lon = x / size * 360.0 - 180.0
|
||||
n = math.pi - 2.0 * math.pi * y / size
|
||||
lat = math.degrees(math.atan(math.sinh(n)))
|
||||
return (lat, lon)
|
||||
|
||||
|
||||
def latlon_to_tile(lat: float, lon: float, zoom: int) -> tuple[int, int, int]:
|
||||
"""Return the XYZ tile coordinate containing a latitude/longitude point."""
|
||||
|
||||
x, y = latlon_to_world(lat, lon, zoom)
|
||||
scale = 2**zoom
|
||||
tile_x = min(max(int(x // DEFAULT_TILE_SIZE), 0), scale - 1)
|
||||
tile_y = min(max(int(y // DEFAULT_TILE_SIZE), 0), scale - 1)
|
||||
return (tile_x, tile_y, zoom)
|
||||
|
||||
|
||||
def world_to_screen(
|
||||
world_x: float,
|
||||
world_y: float,
|
||||
*,
|
||||
center: LatLon,
|
||||
zoom: int,
|
||||
width: int,
|
||||
height: int,
|
||||
tile_size: int = DEFAULT_TILE_SIZE,
|
||||
) -> Point:
|
||||
"""Convert world pixels to screen pixels for a centered viewport."""
|
||||
|
||||
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
|
||||
return (world_x - center_x + width / 2.0, world_y - center_y + height / 2.0)
|
||||
|
||||
|
||||
def screen_to_world(
|
||||
screen_x: float,
|
||||
screen_y: float,
|
||||
*,
|
||||
center: LatLon,
|
||||
zoom: int,
|
||||
width: int,
|
||||
height: int,
|
||||
tile_size: int = DEFAULT_TILE_SIZE,
|
||||
) -> Point:
|
||||
"""Convert screen pixels to world pixels for a centered viewport."""
|
||||
|
||||
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
|
||||
return (screen_x + center_x - width / 2.0, screen_y + center_y - height / 2.0)
|
||||
145
src/dpg_map/providers.py
Normal file
145
src/dpg_map/providers.py
Normal file
@@ -0,0 +1,145 @@
|
||||
"""Tile provider definitions and registry."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from string import Formatter
|
||||
from threading import RLock
|
||||
|
||||
from .exceptions import InvalidProviderError, ProviderExistsError, ProviderNotFoundError
|
||||
|
||||
_REQUIRED_TEMPLATE_FIELDS = frozenset({"x", "y", "z"})
|
||||
_OPTIONAL_TEMPLATE_FIELDS = frozenset({"s", "r", "ext"})
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class TileProvider:
|
||||
"""XYZ raster tile provider definition."""
|
||||
|
||||
name: str
|
||||
url_template: str
|
||||
min_zoom: int = 0
|
||||
max_zoom: int = 19
|
||||
tile_size: int = 256
|
||||
attribution: str = ""
|
||||
headers: dict[str, str] = field(default_factory=dict)
|
||||
subdomains: tuple[str, ...] = ()
|
||||
retina: bool = False
|
||||
file_extension: str | None = None
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
name = self.name.strip()
|
||||
if not name:
|
||||
raise InvalidProviderError("provider name must not be empty")
|
||||
if name != self.name:
|
||||
object.__setattr__(self, "name", name)
|
||||
|
||||
if not self.url_template.strip():
|
||||
raise InvalidProviderError("provider url_template must not be empty")
|
||||
|
||||
fields = {
|
||||
field_name
|
||||
for _, field_name, _, _ in Formatter().parse(self.url_template)
|
||||
if field_name is not None and field_name != ""
|
||||
}
|
||||
missing = _REQUIRED_TEMPLATE_FIELDS - fields
|
||||
if missing:
|
||||
missing_text = ", ".join(sorted(missing))
|
||||
raise InvalidProviderError(f"provider url_template missing field(s): {missing_text}")
|
||||
|
||||
unknown = fields - _REQUIRED_TEMPLATE_FIELDS - _OPTIONAL_TEMPLATE_FIELDS
|
||||
if unknown:
|
||||
unknown_text = ", ".join(sorted(unknown))
|
||||
raise InvalidProviderError(
|
||||
f"provider url_template contains unknown field(s): {unknown_text}"
|
||||
)
|
||||
|
||||
if self.min_zoom < 0:
|
||||
raise InvalidProviderError("provider min_zoom must be >= 0")
|
||||
if self.max_zoom < self.min_zoom:
|
||||
raise InvalidProviderError("provider max_zoom must be >= min_zoom")
|
||||
if self.tile_size <= 0:
|
||||
raise InvalidProviderError("provider tile_size must be > 0")
|
||||
|
||||
object.__setattr__(self, "headers", dict(self.headers))
|
||||
object.__setattr__(self, "subdomains", tuple(self.subdomains))
|
||||
|
||||
def build_url(self, *, x: int, y: int, z: int) -> str:
|
||||
"""Build a concrete tile URL for an XYZ tile coordinate."""
|
||||
|
||||
if z < self.min_zoom or z > self.max_zoom:
|
||||
raise InvalidProviderError(
|
||||
f"zoom {z} is outside provider range {self.min_zoom}-{self.max_zoom}"
|
||||
)
|
||||
|
||||
subdomain = ""
|
||||
if self.subdomains:
|
||||
subdomain = self.subdomains[(x + y + z) % len(self.subdomains)]
|
||||
|
||||
retina_suffix = "@2x" if self.retina else ""
|
||||
extension = self.file_extension or ""
|
||||
return self.url_template.format(
|
||||
x=x,
|
||||
y=y,
|
||||
z=z,
|
||||
s=subdomain,
|
||||
r=retina_suffix,
|
||||
ext=extension,
|
||||
)
|
||||
|
||||
|
||||
OSM = TileProvider(
|
||||
name="osm",
|
||||
url_template="https://tile.openstreetmap.org/{z}/{x}/{y}.png",
|
||||
min_zoom=0,
|
||||
max_zoom=19,
|
||||
tile_size=256,
|
||||
attribution="\u00a9 OpenStreetMap contributors",
|
||||
)
|
||||
|
||||
_providers: dict[str, TileProvider] = {OSM.name: OSM}
|
||||
_providers_lock = RLock()
|
||||
|
||||
|
||||
def register_provider(provider: TileProvider, *, replace: bool = False) -> None:
|
||||
"""Register a tile provider by name."""
|
||||
|
||||
if not isinstance(provider, TileProvider):
|
||||
raise InvalidProviderError("provider must be a TileProvider")
|
||||
|
||||
with _providers_lock:
|
||||
if provider.name in _providers and not replace:
|
||||
raise ProviderExistsError(f"provider already registered: {provider.name}")
|
||||
_providers[provider.name] = provider
|
||||
|
||||
|
||||
def unregister_provider(name: str) -> None:
|
||||
"""Remove a registered tile provider."""
|
||||
|
||||
with _providers_lock:
|
||||
if name not in _providers:
|
||||
raise ProviderNotFoundError(f"provider not registered: {name}")
|
||||
del _providers[name]
|
||||
|
||||
|
||||
def get_provider(name: str) -> TileProvider:
|
||||
"""Return a registered tile provider."""
|
||||
|
||||
with _providers_lock:
|
||||
try:
|
||||
return _providers[name]
|
||||
except KeyError as exc:
|
||||
raise ProviderNotFoundError(f"provider not registered: {name}") from exc
|
||||
|
||||
|
||||
def get_default_provider() -> TileProvider:
|
||||
"""Return the default OpenStreetMap provider."""
|
||||
|
||||
return get_provider(OSM.name)
|
||||
|
||||
|
||||
def list_providers() -> list[str]:
|
||||
"""List registered provider names in stable sorted order."""
|
||||
|
||||
with _providers_lock:
|
||||
return sorted(_providers)
|
||||
231
src/dpg_map/renderer.py
Normal file
231
src/dpg_map/renderer.py
Normal file
@@ -0,0 +1,231 @@
|
||||
"""GUI-thread renderer implementation."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
|
||||
from .commands import CommandKind, MapCommand
|
||||
from .interaction import HitRect, calculate_hit_rect
|
||||
from .sizing import SizeMeasurement, apply_size_measurement
|
||||
from .state import DirtyFlags, MapState
|
||||
from .tiles import Tile, VisibleTile
|
||||
|
||||
|
||||
class MapRenderer:
|
||||
"""GUI-thread renderer for the map widget shell and tile layer."""
|
||||
|
||||
def __init__(self, state: MapState, dpg: Any) -> None:
|
||||
self.state = state
|
||||
self._dpg = dpg
|
||||
self._background_tag = f"{state.tag}##background"
|
||||
self._attribution_tag = f"{state.tag}##attribution"
|
||||
self.last_drained_commands: tuple[MapCommand, ...] = ()
|
||||
self.last_hit_rect: HitRect | None = None
|
||||
|
||||
def schedule_next_frame(self) -> None:
|
||||
"""Schedule this renderer to run on the next Dear PyGui frame."""
|
||||
|
||||
with self.state.lock:
|
||||
if self.state.frame_scheduled:
|
||||
return
|
||||
self.state.frame_scheduled = True
|
||||
frame = self._dpg.get_frame_count() + 1
|
||||
self._dpg.set_frame_callback(frame, self._frame_callback)
|
||||
|
||||
def _frame_callback(self, sender: Any | None = None, app_data: Any | None = None) -> None:
|
||||
_ = (sender, app_data)
|
||||
with self.state.lock:
|
||||
self.state.frame_scheduled = False
|
||||
if not self._dpg.does_item_exist(self.state.drawlist_tag):
|
||||
return
|
||||
self.render_frame()
|
||||
self.schedule_next_frame()
|
||||
|
||||
def render_frame(self) -> None:
|
||||
"""Drain pending commands, refresh size, process tiles, and redraw."""
|
||||
|
||||
commands = drain_renderer_commands(self.state)
|
||||
self.last_drained_commands = tuple(commands)
|
||||
self._update_size_from_dpg()
|
||||
|
||||
with self.state.lock:
|
||||
dirty = self.state.dirty
|
||||
should_draw = bool(dirty & (DirtyFlags.FULL | DirtyFlags.SIZE | DirtyFlags.TILES))
|
||||
visible = self.state.is_visible
|
||||
width = self.state.measured_width or self.state.last_nonzero_width
|
||||
height = self.state.measured_height or self.state.last_nonzero_height
|
||||
provider_attribution = self.state.provider.attribution
|
||||
provider = self.state.provider
|
||||
center = self.state.center
|
||||
zoom = self.state.zoom
|
||||
generation = self.state.generation
|
||||
cache_dir = self.state.cache_dir
|
||||
self.state.dirty = DirtyFlags.NONE
|
||||
|
||||
accepted_tiles = self.state.tile_manager.drain_results(
|
||||
generation=generation,
|
||||
provider_name=provider.name,
|
||||
)
|
||||
self._delete_evicted_textures()
|
||||
for tile in accepted_tiles:
|
||||
self._ensure_texture(tile)
|
||||
|
||||
visible_tiles: list[VisibleTile] = []
|
||||
if visible and width > 0 and height > 0:
|
||||
visible_tiles = self.state.tile_manager.request_visible_tiles(
|
||||
center=center,
|
||||
zoom=zoom,
|
||||
width=width,
|
||||
height=height,
|
||||
provider=provider,
|
||||
generation=generation,
|
||||
cache_dir=cache_dir,
|
||||
margin=self._prefetch_margin(),
|
||||
)
|
||||
|
||||
if visible and (should_draw or accepted_tiles):
|
||||
self._draw_tile_layer(
|
||||
visible_tiles=visible_tiles,
|
||||
width=width,
|
||||
height=height,
|
||||
attribution=provider_attribution,
|
||||
tile_size=provider.tile_size,
|
||||
)
|
||||
|
||||
def _update_size_from_dpg(self) -> None:
|
||||
width, height = self._measure_child_content()
|
||||
visible = bool(self._dpg.is_item_shown(self.state.child_window_tag))
|
||||
with self.state.lock:
|
||||
update = apply_size_measurement(
|
||||
self.state,
|
||||
SizeMeasurement(width=width, height=height, visible=visible),
|
||||
)
|
||||
draw_width = update.effective_width
|
||||
draw_height = update.effective_height
|
||||
self._dpg.configure_item(self.state.drawlist_tag, width=draw_width, height=draw_height)
|
||||
draw_pos = tuple(float(value) for value in self._dpg.get_item_pos(self.state.drawlist_tag))
|
||||
with self.state.lock:
|
||||
self.last_hit_rect = calculate_hit_rect(self.state, (draw_pos[0], draw_pos[1]))
|
||||
|
||||
def _measure_child_content(self) -> tuple[int, int]:
|
||||
try:
|
||||
width, height = self._dpg.get_item_rect_size(self.state.child_window_tag)
|
||||
except Exception:
|
||||
return (0, 0)
|
||||
return (max(0, int(width)), max(0, int(height)))
|
||||
|
||||
def _draw_tile_layer(
|
||||
self,
|
||||
*,
|
||||
visible_tiles: list[VisibleTile],
|
||||
width: int,
|
||||
height: int,
|
||||
attribution: str,
|
||||
tile_size: int,
|
||||
) -> None:
|
||||
width = max(1, int(width))
|
||||
height = max(1, int(height))
|
||||
self._dpg.delete_item(self.state.drawlist_tag, children_only=True)
|
||||
self._dpg.draw_rectangle(
|
||||
(0, 0),
|
||||
(width, height),
|
||||
parent=self.state.drawlist_tag,
|
||||
tag=self._background_tag,
|
||||
color=(54, 68, 78, 255),
|
||||
fill=(29, 38, 45, 255),
|
||||
)
|
||||
for visible_tile in visible_tiles:
|
||||
tile = self.state.tile_manager.get_ready_tile(visible_tile.tile_id)
|
||||
if tile is None or tile.texture_tag is None:
|
||||
continue
|
||||
screen_x = visible_tile.screen_x
|
||||
screen_y = visible_tile.screen_y
|
||||
self._dpg.draw_image(
|
||||
tile.texture_tag,
|
||||
(screen_x, screen_y),
|
||||
(screen_x + tile_size, screen_y + tile_size),
|
||||
parent=self.state.drawlist_tag,
|
||||
)
|
||||
label = attribution or "Map tiles"
|
||||
text_y = max(28, height - 24)
|
||||
self._dpg.draw_text(
|
||||
(12, text_y),
|
||||
label,
|
||||
parent=self.state.drawlist_tag,
|
||||
tag=self._attribution_tag,
|
||||
color=(172, 184, 192, 255),
|
||||
size=12,
|
||||
)
|
||||
|
||||
def _ensure_texture(self, tile: Tile) -> None:
|
||||
if tile.texture_tag is not None:
|
||||
return
|
||||
texture_tag = (
|
||||
f"{self.state.tag}##tile-{tile.tile_id.provider_name}-"
|
||||
f"{tile.tile_id.z}-{tile.tile_id.x}-{tile.tile_id.y}"
|
||||
)
|
||||
if not self._dpg.does_item_exist(texture_tag):
|
||||
self._dpg.add_static_texture(
|
||||
tile.width,
|
||||
tile.height,
|
||||
tile.pixels,
|
||||
tag=texture_tag,
|
||||
parent=self.state.texture_registry_tag,
|
||||
)
|
||||
self.state.tile_manager.set_texture_tag(tile.tile_id, texture_tag)
|
||||
|
||||
def _delete_evicted_textures(self) -> None:
|
||||
for texture_tag in self.state.tile_manager.take_texture_deletions():
|
||||
if self._dpg.does_item_exist(texture_tag):
|
||||
self._dpg.delete_item(texture_tag)
|
||||
|
||||
def _prefetch_margin(self) -> int:
|
||||
from .state import get_config
|
||||
|
||||
return get_config().prefetch_margin_tiles
|
||||
|
||||
|
||||
def drain_renderer_commands(state: MapState) -> list[MapCommand]:
|
||||
"""Drain and apply GUI-thread command side effects."""
|
||||
|
||||
commands = state.command_queue.drain()
|
||||
if not commands:
|
||||
return []
|
||||
|
||||
with state.lock:
|
||||
for command in commands:
|
||||
if command.kind is CommandKind.SET_VIEW:
|
||||
state.dirty |= DirtyFlags.VIEW | DirtyFlags.TILES | DirtyFlags.OVERLAYS
|
||||
elif command.kind is CommandKind.SET_PROVIDER:
|
||||
state.tile_manager.clear_memory_cache()
|
||||
state.dirty |= DirtyFlags.PROVIDER | DirtyFlags.TILES
|
||||
elif command.kind in {
|
||||
CommandKind.ADD_OVERLAY,
|
||||
CommandKind.UPDATE_OVERLAY,
|
||||
CommandKind.DELETE_OVERLAY,
|
||||
CommandKind.SET_LAYER_VISIBILITY,
|
||||
CommandKind.ADD_LAYER,
|
||||
CommandKind.CLEAR_LAYER,
|
||||
}:
|
||||
state.dirty |= DirtyFlags.OVERLAYS
|
||||
elif command.kind is CommandKind.CLEAR_MAP:
|
||||
state.tile_manager.clear_memory_cache()
|
||||
state.dirty |= DirtyFlags.TILES | DirtyFlags.OVERLAYS
|
||||
elif command.kind is CommandKind.CLEAR_MEMORY_CACHE:
|
||||
state.tile_manager.clear_memory_cache()
|
||||
state.dirty |= DirtyFlags.TILES
|
||||
elif command.kind is CommandKind.CLEAR_DISK_CACHE:
|
||||
state.tile_manager.clear_disk_cache(state.cache_dir)
|
||||
state.dirty |= DirtyFlags.TILES
|
||||
return commands
|
||||
|
||||
|
||||
def make_frame_pump(state: MapState, dpg: Any) -> Callable[[], None]:
|
||||
"""Create and attach a renderer frame pump for a map state."""
|
||||
|
||||
renderer = MapRenderer(state, dpg)
|
||||
with state.lock:
|
||||
state.renderer = renderer
|
||||
renderer.schedule_next_frame()
|
||||
return renderer.render_frame
|
||||
87
src/dpg_map/sizing.py
Normal file
87
src/dpg_map/sizing.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""Map sizing measurement helpers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from .state import DirtyFlags, MapState, mark_dirty
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class SizeMeasurement:
|
||||
"""Concrete size and visibility reported by the GUI thread."""
|
||||
|
||||
width: int
|
||||
height: int
|
||||
visible: bool
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class SizeUpdate:
|
||||
"""Result of applying one GUI size measurement."""
|
||||
|
||||
changed: bool
|
||||
became_visible: bool
|
||||
became_hidden: bool
|
||||
effective_width: int
|
||||
effective_height: int
|
||||
|
||||
|
||||
def normalize_dimension(value: int | float | None) -> int:
|
||||
"""Convert Dear PyGui size values to stable integer pixels."""
|
||||
|
||||
if value is None:
|
||||
return 0
|
||||
return max(0, int(value))
|
||||
|
||||
|
||||
def effective_draw_size(state: MapState) -> tuple[int, int]:
|
||||
"""Return the size the drawlist should use for the current frame."""
|
||||
|
||||
width = state.measured_width or state.last_nonzero_width or 1
|
||||
height = state.measured_height or state.last_nonzero_height or 1
|
||||
return (max(1, width), max(1, height))
|
||||
|
||||
|
||||
def apply_size_measurement(
|
||||
state: MapState,
|
||||
measurement: SizeMeasurement,
|
||||
*,
|
||||
mark: bool = True,
|
||||
) -> SizeUpdate:
|
||||
"""Apply a GUI-thread size measurement to logical state.
|
||||
|
||||
A zero measurement is preserved as the current measured size, but it does not
|
||||
erase the last non-zero size. This keeps hidden maps from permanently
|
||||
collapsing when Dear PyGui reports a temporary zero content region.
|
||||
"""
|
||||
|
||||
width = normalize_dimension(measurement.width)
|
||||
height = normalize_dimension(measurement.height)
|
||||
visible = bool(measurement.visible and width > 0 and height > 0)
|
||||
|
||||
previous_width = state.measured_width
|
||||
previous_height = state.measured_height
|
||||
previous_visible = state.is_visible
|
||||
|
||||
changed = previous_width != width or previous_height != height or previous_visible != visible
|
||||
|
||||
state.measured_width = width
|
||||
state.measured_height = height
|
||||
state.is_visible = visible
|
||||
if width > 0:
|
||||
state.last_nonzero_width = width
|
||||
if height > 0:
|
||||
state.last_nonzero_height = height
|
||||
|
||||
if changed and mark:
|
||||
mark_dirty(state, DirtyFlags.SIZE | DirtyFlags.TILES | DirtyFlags.OVERLAYS)
|
||||
|
||||
effective_width, effective_height = effective_draw_size(state)
|
||||
return SizeUpdate(
|
||||
changed=changed,
|
||||
became_visible=visible and not previous_visible,
|
||||
became_hidden=previous_visible and not visible,
|
||||
effective_width=effective_width,
|
||||
effective_height=effective_height,
|
||||
)
|
||||
325
src/dpg_map/state.py
Normal file
325
src/dpg_map/state.py
Normal file
@@ -0,0 +1,325 @@
|
||||
"""Thread-safe state models and registries."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass, field
|
||||
from enum import IntFlag, auto
|
||||
from pathlib import Path
|
||||
from threading import RLock
|
||||
from uuid import uuid4
|
||||
|
||||
from .commands import MapCommandQueue
|
||||
from .exceptions import InvalidProviderError, MapNotFoundError
|
||||
from .overlays import LayerState, Overlay
|
||||
from .providers import TileProvider, get_default_provider, get_provider
|
||||
from .tiles import TileManager
|
||||
from .types import LatLon, Tag
|
||||
|
||||
|
||||
class DirtyFlags(IntFlag):
|
||||
"""Reasons the GUI renderer needs to refresh part of a map."""
|
||||
|
||||
NONE = 0
|
||||
VIEW = auto()
|
||||
TILES = auto()
|
||||
OVERLAYS = auto()
|
||||
SIZE = auto()
|
||||
PROVIDER = auto()
|
||||
FULL = VIEW | TILES | OVERLAYS | SIZE | PROVIDER
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class DpgMapConfig:
|
||||
"""Global package configuration."""
|
||||
|
||||
user_agent: str | None = None
|
||||
cache_dir: Path | None = None
|
||||
default_provider: str | TileProvider = "osm"
|
||||
memory_cache_max_tiles: int = 512
|
||||
disk_cache_max_bytes: int | None = 2_000_000_000
|
||||
prefetch_margin_tiles: int = 1
|
||||
tile_worker_count: int = 4
|
||||
overlay_update_policy: str = "coalesce"
|
||||
debug: bool = False
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class InteractionState:
|
||||
"""Logical interaction state until GUI interaction is implemented."""
|
||||
|
||||
active_drag: bool = False
|
||||
last_mouse_position: tuple[float, float] | None = None
|
||||
|
||||
|
||||
def default_layers() -> dict[str, LayerState]:
|
||||
"""Return the default logical map layers."""
|
||||
|
||||
layers = [
|
||||
LayerState("background", z_index=0),
|
||||
LayerState("tiles", z_index=10),
|
||||
LayerState("default", z_index=50),
|
||||
LayerState("markers", z_index=60),
|
||||
LayerState("lines", z_index=70),
|
||||
LayerState("trajectories", z_index=80),
|
||||
LayerState("attribution", z_index=100),
|
||||
]
|
||||
return {layer.name: layer for layer in layers}
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class MapState:
|
||||
"""Thread-safe logical state for one map widget."""
|
||||
|
||||
tag: Tag
|
||||
child_window_tag: Tag
|
||||
drawlist_tag: Tag
|
||||
texture_registry_tag: Tag
|
||||
handler_registry_tag: Tag
|
||||
requested_width: int = 0
|
||||
requested_height: int = 0
|
||||
requested_autosize_x: bool = False
|
||||
requested_autosize_y: bool = False
|
||||
measured_width: int = 0
|
||||
measured_height: int = 0
|
||||
last_nonzero_width: int = 0
|
||||
last_nonzero_height: int = 0
|
||||
is_visible: bool = False
|
||||
center: LatLon = (0.0, 0.0)
|
||||
zoom: int = 2
|
||||
min_zoom: int = 0
|
||||
max_zoom: int = 19
|
||||
provider: TileProvider = field(default_factory=get_default_provider)
|
||||
overlays: dict[Tag, Overlay] = field(default_factory=dict)
|
||||
layers: dict[str, LayerState] = field(default_factory=default_layers)
|
||||
command_queue: MapCommandQueue = field(default_factory=MapCommandQueue)
|
||||
tile_manager: TileManager = field(default_factory=TileManager)
|
||||
renderer: object | None = None
|
||||
interaction: InteractionState = field(default_factory=InteractionState)
|
||||
lock: RLock = field(default_factory=RLock)
|
||||
dirty: DirtyFlags = DirtyFlags.FULL
|
||||
frame_scheduled: bool = False
|
||||
generation: int = 0
|
||||
cache_dir: Path | None = None
|
||||
user_agent: str | None = None
|
||||
|
||||
|
||||
_config = DpgMapConfig()
|
||||
_config_lock = RLock()
|
||||
_maps: dict[Tag, MapState] = {}
|
||||
_maps_lock = RLock()
|
||||
_current_map_stack: list[Tag] = []
|
||||
_current_map_lock = RLock()
|
||||
|
||||
|
||||
def _resolve_provider(provider: str | TileProvider | None) -> TileProvider:
|
||||
if provider is None:
|
||||
with _config_lock:
|
||||
provider = _config.default_provider
|
||||
if isinstance(provider, TileProvider):
|
||||
return provider
|
||||
if isinstance(provider, str):
|
||||
return get_provider(provider)
|
||||
raise InvalidProviderError("provider must be a provider name or TileProvider")
|
||||
|
||||
|
||||
def configure_state(
|
||||
*,
|
||||
user_agent: str | None = None,
|
||||
cache_dir: str | Path | None = None,
|
||||
default_provider: str | TileProvider = "osm",
|
||||
memory_cache_max_tiles: int = 512,
|
||||
disk_cache_max_bytes: int | None = 2_000_000_000,
|
||||
prefetch_margin_tiles: int = 1,
|
||||
tile_worker_count: int = 4,
|
||||
overlay_update_policy: str = "coalesce",
|
||||
debug: bool = False,
|
||||
) -> None:
|
||||
"""Replace global dpg-map configuration."""
|
||||
|
||||
if memory_cache_max_tiles < 0:
|
||||
raise ValueError("memory_cache_max_tiles must be >= 0")
|
||||
if disk_cache_max_bytes is not None and disk_cache_max_bytes < 0:
|
||||
raise ValueError("disk_cache_max_bytes must be >= 0 or None")
|
||||
if prefetch_margin_tiles < 0:
|
||||
raise ValueError("prefetch_margin_tiles must be >= 0")
|
||||
if tile_worker_count < 1:
|
||||
raise ValueError("tile_worker_count must be >= 1")
|
||||
if overlay_update_policy != "coalesce":
|
||||
raise ValueError('overlay_update_policy must be "coalesce"')
|
||||
|
||||
resolved_cache_dir = Path(cache_dir).expanduser() if cache_dir is not None else None
|
||||
_resolve_provider(default_provider)
|
||||
with _config_lock:
|
||||
_config.user_agent = user_agent
|
||||
_config.cache_dir = resolved_cache_dir
|
||||
_config.default_provider = default_provider
|
||||
_config.memory_cache_max_tiles = memory_cache_max_tiles
|
||||
_config.disk_cache_max_bytes = disk_cache_max_bytes
|
||||
_config.prefetch_margin_tiles = prefetch_margin_tiles
|
||||
_config.tile_worker_count = tile_worker_count
|
||||
_config.overlay_update_policy = overlay_update_policy
|
||||
_config.debug = debug
|
||||
|
||||
|
||||
def get_config() -> DpgMapConfig:
|
||||
"""Return a copy of current global configuration."""
|
||||
|
||||
with _config_lock:
|
||||
return DpgMapConfig(
|
||||
user_agent=_config.user_agent,
|
||||
cache_dir=_config.cache_dir,
|
||||
default_provider=_config.default_provider,
|
||||
memory_cache_max_tiles=_config.memory_cache_max_tiles,
|
||||
disk_cache_max_bytes=_config.disk_cache_max_bytes,
|
||||
prefetch_margin_tiles=_config.prefetch_margin_tiles,
|
||||
tile_worker_count=_config.tile_worker_count,
|
||||
overlay_update_policy=_config.overlay_update_policy,
|
||||
debug=_config.debug,
|
||||
)
|
||||
|
||||
|
||||
def create_map_state(
|
||||
*,
|
||||
tag: Tag | None = None,
|
||||
center: LatLon = (0.0, 0.0),
|
||||
zoom: int = 2,
|
||||
min_zoom: int | None = None,
|
||||
max_zoom: int | None = None,
|
||||
width: int = 0,
|
||||
height: int = 0,
|
||||
autosize_x: bool = False,
|
||||
autosize_y: bool = False,
|
||||
provider: str | TileProvider | None = None,
|
||||
cache_dir: str | Path | None = None,
|
||||
user_agent: str | None = None,
|
||||
) -> MapState:
|
||||
"""Create and register a logical map state."""
|
||||
|
||||
map_tag = tag if tag is not None else f"dpg_map_{uuid4().hex}"
|
||||
provider_obj = _resolve_provider(provider)
|
||||
min_zoom_value = provider_obj.min_zoom if min_zoom is None else min_zoom
|
||||
max_zoom_value = provider_obj.max_zoom if max_zoom is None else max_zoom
|
||||
if min_zoom_value < provider_obj.min_zoom:
|
||||
min_zoom_value = provider_obj.min_zoom
|
||||
if max_zoom_value > provider_obj.max_zoom:
|
||||
max_zoom_value = provider_obj.max_zoom
|
||||
if max_zoom_value < min_zoom_value:
|
||||
raise ValueError("max_zoom must be >= min_zoom")
|
||||
|
||||
zoom_value = max(min_zoom_value, min(max_zoom_value, int(zoom)))
|
||||
config = get_config()
|
||||
resolved_cache_dir = Path(cache_dir).expanduser() if cache_dir is not None else config.cache_dir
|
||||
state = MapState(
|
||||
tag=map_tag,
|
||||
child_window_tag=f"{map_tag}##child",
|
||||
drawlist_tag=f"{map_tag}##drawlist",
|
||||
texture_registry_tag=f"{map_tag}##textures",
|
||||
handler_registry_tag=f"{map_tag}##handlers",
|
||||
requested_width=width,
|
||||
requested_height=height,
|
||||
requested_autosize_x=autosize_x,
|
||||
requested_autosize_y=autosize_y,
|
||||
center=(float(center[0]), float(center[1])),
|
||||
zoom=zoom_value,
|
||||
min_zoom=min_zoom_value,
|
||||
max_zoom=max_zoom_value,
|
||||
provider=provider_obj,
|
||||
cache_dir=resolved_cache_dir,
|
||||
user_agent=user_agent if user_agent is not None else config.user_agent,
|
||||
)
|
||||
state.tile_manager = TileManager(
|
||||
memory_cache_max_tiles=config.memory_cache_max_tiles,
|
||||
disk_cache_max_bytes=config.disk_cache_max_bytes,
|
||||
worker_count=config.tile_worker_count,
|
||||
user_agent=state.user_agent,
|
||||
)
|
||||
with _maps_lock:
|
||||
_maps[map_tag] = state
|
||||
return state
|
||||
|
||||
|
||||
def register_map_state(state: MapState) -> None:
|
||||
"""Register a prebuilt map state."""
|
||||
|
||||
with _maps_lock:
|
||||
_maps[state.tag] = state
|
||||
|
||||
|
||||
def unregister_map_state(tag: Tag) -> None:
|
||||
"""Remove a map state from the registry."""
|
||||
|
||||
with _maps_lock:
|
||||
_maps.pop(tag, None)
|
||||
|
||||
|
||||
def get_map_state(map_tag: Tag | None = None) -> MapState:
|
||||
"""Resolve a map by explicit tag or current map context."""
|
||||
|
||||
resolved_tag = resolve_map_tag(map_tag)
|
||||
with _maps_lock:
|
||||
try:
|
||||
return _maps[resolved_tag]
|
||||
except KeyError as exc:
|
||||
raise MapNotFoundError(f"map not registered: {resolved_tag}") from exc
|
||||
|
||||
|
||||
def resolve_map_tag(map_tag: Tag | None = None) -> Tag:
|
||||
"""Resolve an explicit tag or the current context map tag."""
|
||||
|
||||
if map_tag is not None:
|
||||
return map_tag
|
||||
with _current_map_lock:
|
||||
if _current_map_stack:
|
||||
return _current_map_stack[-1]
|
||||
raise MapNotFoundError("map_tag is required outside a map_widget context")
|
||||
|
||||
|
||||
def find_map_for_overlay(tag: Tag, map_tag: Tag | None = None) -> MapState:
|
||||
"""Find the map containing an overlay, optionally scoped by map tag."""
|
||||
|
||||
if map_tag is not None:
|
||||
return get_map_state(map_tag)
|
||||
|
||||
with _current_map_lock:
|
||||
if _current_map_stack:
|
||||
current = get_map_state(_current_map_stack[-1])
|
||||
with current.lock:
|
||||
if tag in current.overlays:
|
||||
return current
|
||||
|
||||
matches: list[MapState] = []
|
||||
with _maps_lock:
|
||||
states = list(_maps.values())
|
||||
for state in states:
|
||||
with state.lock:
|
||||
if tag in state.overlays:
|
||||
matches.append(state)
|
||||
if len(matches) == 1:
|
||||
return matches[0]
|
||||
if not matches:
|
||||
raise MapNotFoundError(f"no map contains overlay: {tag}")
|
||||
raise MapNotFoundError(f"overlay tag is ambiguous across maps: {tag}")
|
||||
|
||||
|
||||
@contextmanager
|
||||
def current_map_context(map_tag: Tag) -> Iterator[None]:
|
||||
"""Push a current map tag for context-style overlay creation."""
|
||||
|
||||
with _current_map_lock:
|
||||
_current_map_stack.append(map_tag)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
with _current_map_lock:
|
||||
if _current_map_stack and _current_map_stack[-1] == map_tag:
|
||||
_current_map_stack.pop()
|
||||
elif map_tag in _current_map_stack:
|
||||
_current_map_stack.remove(map_tag)
|
||||
|
||||
|
||||
def mark_dirty(state: MapState, flags: DirtyFlags) -> None:
|
||||
"""Mark a map dirty while holding or acquiring its state lock."""
|
||||
|
||||
state.dirty |= flags
|
||||
555
src/dpg_map/tiles.py
Normal file
555
src/dpg_map/tiles.py
Normal file
@@ -0,0 +1,555 @@
|
||||
"""Tile identity, lifecycle, cache, and worker coordination."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import warnings
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from queue import Empty, Queue
|
||||
from threading import Lock, Thread
|
||||
from time import time
|
||||
from typing import Literal, cast
|
||||
|
||||
import requests
|
||||
from PIL import Image, UnidentifiedImageError
|
||||
|
||||
from .cache import (
|
||||
DiskCacheMetadata,
|
||||
MemoryCacheEntry,
|
||||
MemoryCacheModel,
|
||||
clear_disk_cache_path,
|
||||
prune_disk_cache,
|
||||
tile_cache_path,
|
||||
tile_metadata_path,
|
||||
touch_disk_metadata,
|
||||
write_disk_metadata,
|
||||
)
|
||||
from .exceptions import CacheError
|
||||
from .projection import latlon_to_world, map_size
|
||||
from .providers import TileProvider
|
||||
from .types import LatLon
|
||||
|
||||
|
||||
class TileStatus(Enum):
|
||||
"""Lifecycle status for a tile."""
|
||||
|
||||
QUEUED = "queued"
|
||||
LOADING = "loading"
|
||||
READY = "ready"
|
||||
FAILED = "failed"
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class TileID:
|
||||
"""Provider-namespaced XYZ tile identity."""
|
||||
|
||||
provider_name: str
|
||||
z: int
|
||||
x: int
|
||||
y: int
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class Tile:
|
||||
"""Decoded tile data plus GUI-thread texture metadata."""
|
||||
|
||||
tile_id: TileID
|
||||
status: TileStatus
|
||||
generation: int
|
||||
width: int = 0
|
||||
height: int = 0
|
||||
pixels: tuple[float, ...] = ()
|
||||
texture_tag: object | None = None
|
||||
source: Literal["memory", "disk", "network"] | None = None
|
||||
error: str | None = None
|
||||
last_accessed_at: float = field(default_factory=time)
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class VisibleTile:
|
||||
"""A visible tile and its screen-space top-left point."""
|
||||
|
||||
tile_id: TileID
|
||||
screen_x: float
|
||||
screen_y: float
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class TileRequest:
|
||||
"""Worker-thread tile request."""
|
||||
|
||||
tile_id: TileID
|
||||
generation: int
|
||||
url: str
|
||||
path: Path
|
||||
headers: dict[str, str]
|
||||
disk_cache_max_bytes: int | None
|
||||
protected_paths: frozenset[Path] = frozenset()
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class TileResult:
|
||||
"""Worker-thread tile result consumed by the GUI thread."""
|
||||
|
||||
tile_id: TileID
|
||||
generation: int
|
||||
status: TileStatus
|
||||
width: int = 0
|
||||
height: int = 0
|
||||
pixels: tuple[float, ...] = ()
|
||||
source: Literal["disk", "network"] | None = None
|
||||
error: str | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class TileManagerSnapshot:
|
||||
"""Thread-safe counters for diagnostics."""
|
||||
|
||||
queued_tiles: int
|
||||
loading_tiles: int
|
||||
failed_tiles: int
|
||||
visible_tile_count: int
|
||||
memory_tiles: int
|
||||
memory_hits: int
|
||||
memory_misses: int
|
||||
disk_hits: int
|
||||
disk_misses: int
|
||||
stale_results: int
|
||||
|
||||
|
||||
def calculate_visible_tiles(
|
||||
*,
|
||||
center: LatLon,
|
||||
zoom: int,
|
||||
width: int,
|
||||
height: int,
|
||||
provider: TileProvider,
|
||||
margin: int = 0,
|
||||
) -> list[VisibleTile]:
|
||||
"""Return visible XYZ tiles plus a margin, with wrapped X and clamped Y."""
|
||||
|
||||
if width <= 0 or height <= 0:
|
||||
return []
|
||||
tile_size = provider.tile_size
|
||||
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
|
||||
left = center_x - width / 2.0
|
||||
top = center_y - height / 2.0
|
||||
right = center_x + width / 2.0
|
||||
bottom = center_y + height / 2.0
|
||||
|
||||
max_tile = (2**zoom) - 1
|
||||
start_x = int(left // tile_size) - margin
|
||||
end_x = int(right // tile_size) + margin
|
||||
start_y = max(0, int(top // tile_size) - margin)
|
||||
end_y = min(max_tile, int(bottom // tile_size) + margin)
|
||||
world_size = map_size(zoom, tile_size)
|
||||
|
||||
tiles: list[VisibleTile] = []
|
||||
seen: set[TileID] = set()
|
||||
for y in range(start_y, end_y + 1):
|
||||
for raw_x in range(start_x, end_x + 1):
|
||||
x = raw_x % (max_tile + 1)
|
||||
tile_id = TileID(provider.name, zoom, x, y)
|
||||
if tile_id in seen:
|
||||
continue
|
||||
seen.add(tile_id)
|
||||
tile_left = raw_x * tile_size
|
||||
if tile_left < -tile_size:
|
||||
tile_left += world_size
|
||||
screen_x = tile_left - left
|
||||
screen_y = y * tile_size - top
|
||||
tiles.append(VisibleTile(tile_id, screen_x, screen_y))
|
||||
return tiles
|
||||
|
||||
|
||||
class TileManager:
|
||||
"""Asynchronous tile loader with memory and persistent disk caches."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
memory_cache_max_tiles: int = 512,
|
||||
disk_cache_max_bytes: int | None = 2_000_000_000,
|
||||
worker_count: int = 4,
|
||||
user_agent: str | None = None,
|
||||
) -> None:
|
||||
self.memory = MemoryCacheModel(max_tiles=memory_cache_max_tiles)
|
||||
self.disk_cache_max_bytes = disk_cache_max_bytes
|
||||
self.worker_count = worker_count
|
||||
self.user_agent = user_agent
|
||||
self._tiles: dict[TileID, Tile] = {}
|
||||
self._visible_tile_ids: set[TileID] = set()
|
||||
self._queued: set[TileID] = set()
|
||||
self._loading: set[TileID] = set()
|
||||
self._failed: set[TileID] = set()
|
||||
self._request_queue: Queue[TileRequest | None] = Queue()
|
||||
self._result_queue: Queue[TileResult] = Queue()
|
||||
self._threads: list[Thread] = []
|
||||
self._lock = Lock()
|
||||
self._delete_texture_tags: list[object] = []
|
||||
self._disk_hits = 0
|
||||
self._disk_misses = 0
|
||||
self._stale_results = 0
|
||||
self._warned_osm_user_agent = False
|
||||
|
||||
def start(self) -> None:
|
||||
"""Start worker threads once."""
|
||||
|
||||
with self._lock:
|
||||
if self._threads:
|
||||
return
|
||||
count = max(1, self.worker_count)
|
||||
for index in range(count):
|
||||
thread = Thread(
|
||||
target=self._worker_loop,
|
||||
name=f"dpg-map-tile-worker-{index + 1}",
|
||||
daemon=True,
|
||||
)
|
||||
self._threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Ask workers to exit."""
|
||||
|
||||
with self._lock:
|
||||
threads = list(self._threads)
|
||||
self._threads.clear()
|
||||
for _ in threads:
|
||||
self._request_queue.put(None)
|
||||
|
||||
def request_visible_tiles(
|
||||
self,
|
||||
*,
|
||||
center: LatLon,
|
||||
zoom: int,
|
||||
width: int,
|
||||
height: int,
|
||||
provider: TileProvider,
|
||||
generation: int,
|
||||
cache_dir: str | Path | None,
|
||||
margin: int,
|
||||
) -> list[VisibleTile]:
|
||||
"""Queue missing visible tiles and return their screen positions."""
|
||||
|
||||
visible = calculate_visible_tiles(
|
||||
center=center,
|
||||
zoom=zoom,
|
||||
width=width,
|
||||
height=height,
|
||||
provider=provider,
|
||||
margin=margin,
|
||||
)
|
||||
visible_ids = {tile.tile_id for tile in visible}
|
||||
with self._lock:
|
||||
self._visible_tile_ids = visible_ids
|
||||
for entry in self.memory.entries.values():
|
||||
entry.protected = entry.tile_id in visible_ids
|
||||
|
||||
self.start()
|
||||
for visible_tile in visible:
|
||||
self._queue_tile(
|
||||
visible_tile.tile_id,
|
||||
provider=provider,
|
||||
generation=generation,
|
||||
cache_dir=cache_dir,
|
||||
)
|
||||
return visible
|
||||
|
||||
def _queue_tile(
|
||||
self,
|
||||
tile_id: TileID,
|
||||
*,
|
||||
provider: TileProvider,
|
||||
generation: int,
|
||||
cache_dir: str | Path | None,
|
||||
) -> None:
|
||||
with self._lock:
|
||||
tile = self._tiles.get(tile_id)
|
||||
if tile is not None and tile.status is TileStatus.READY:
|
||||
self.memory.record_access(tile_id)
|
||||
tile.last_accessed_at = time()
|
||||
return
|
||||
entry = self.memory.record_access(tile_id)
|
||||
if entry is not None:
|
||||
return
|
||||
if tile_id in self._queued or tile_id in self._loading:
|
||||
return
|
||||
self._queued.add(tile_id)
|
||||
self._tiles[tile_id] = Tile(tile_id, TileStatus.QUEUED, generation=generation)
|
||||
|
||||
headers = dict(provider.headers)
|
||||
if provider.name == "osm":
|
||||
headers = self._headers_with_osm_user_agent(headers)
|
||||
path = tile_cache_path(
|
||||
cache_dir,
|
||||
provider.name,
|
||||
tile_id.z,
|
||||
tile_id.x,
|
||||
tile_id.y,
|
||||
provider.file_extension or "png",
|
||||
)
|
||||
request = TileRequest(
|
||||
tile_id=tile_id,
|
||||
generation=generation,
|
||||
url=provider.build_url(x=tile_id.x, y=tile_id.y, z=tile_id.z),
|
||||
path=path,
|
||||
headers=headers,
|
||||
disk_cache_max_bytes=self.disk_cache_max_bytes,
|
||||
protected_paths=frozenset(path for path in self._visible_disk_paths(cache_dir)),
|
||||
)
|
||||
self._request_queue.put(request)
|
||||
|
||||
def _headers_with_osm_user_agent(self, headers: dict[str, str]) -> dict[str, str]:
|
||||
if any(key.lower() == "user-agent" for key in headers):
|
||||
return headers
|
||||
if self.user_agent:
|
||||
headers["User-Agent"] = self.user_agent
|
||||
return headers
|
||||
with self._lock:
|
||||
should_warn = not self._warned_osm_user_agent
|
||||
self._warned_osm_user_agent = True
|
||||
if should_warn:
|
||||
warnings.warn(
|
||||
"OpenStreetMap tile usage should configure an application-specific user_agent",
|
||||
RuntimeWarning,
|
||||
stacklevel=3,
|
||||
)
|
||||
headers["User-Agent"] = "dpg-map/0.1"
|
||||
return headers
|
||||
|
||||
def _visible_disk_paths(self, cache_dir: str | Path | None) -> list[Path]:
|
||||
paths: list[Path] = []
|
||||
with self._lock:
|
||||
visible = tuple(self._visible_tile_ids)
|
||||
for tile_id in visible:
|
||||
paths.append(
|
||||
tile_cache_path(
|
||||
cache_dir,
|
||||
tile_id.provider_name,
|
||||
tile_id.z,
|
||||
tile_id.x,
|
||||
tile_id.y,
|
||||
)
|
||||
)
|
||||
return paths
|
||||
|
||||
def get_ready_tile(self, tile_id: TileID) -> Tile | None:
|
||||
"""Return a ready tile, updating memory LRU metadata."""
|
||||
|
||||
with self._lock:
|
||||
tile = self._tiles.get(tile_id)
|
||||
if tile is None or tile.status is not TileStatus.READY:
|
||||
return None
|
||||
tile.last_accessed_at = time()
|
||||
self.memory.record_access(tile_id)
|
||||
return tile
|
||||
|
||||
def drain_results(self, *, generation: int, provider_name: str) -> list[Tile]:
|
||||
"""Accept current-generation results and return ready tiles."""
|
||||
|
||||
accepted: list[Tile] = []
|
||||
while True:
|
||||
try:
|
||||
result = self._result_queue.get_nowait()
|
||||
except Empty:
|
||||
break
|
||||
with self._lock:
|
||||
self._queued.discard(result.tile_id)
|
||||
self._loading.discard(result.tile_id)
|
||||
if result.generation != generation or result.tile_id.provider_name != provider_name:
|
||||
self._stale_results += 1
|
||||
continue
|
||||
if result.status is TileStatus.FAILED:
|
||||
self._failed.add(result.tile_id)
|
||||
self._tiles[result.tile_id] = Tile(
|
||||
result.tile_id,
|
||||
TileStatus.FAILED,
|
||||
generation=result.generation,
|
||||
error=result.error,
|
||||
)
|
||||
continue
|
||||
tile = Tile(
|
||||
result.tile_id,
|
||||
TileStatus.READY,
|
||||
generation=result.generation,
|
||||
width=result.width,
|
||||
height=result.height,
|
||||
pixels=result.pixels,
|
||||
source=result.source,
|
||||
)
|
||||
self._tiles[result.tile_id] = tile
|
||||
self.memory.put(
|
||||
MemoryCacheEntry(
|
||||
tile_id=result.tile_id,
|
||||
size_bytes=len(result.pixels) * 4,
|
||||
protected=result.tile_id in self._visible_tile_ids,
|
||||
texture_tag=None,
|
||||
)
|
||||
)
|
||||
accepted.append(tile)
|
||||
self._evict_memory_if_needed()
|
||||
return accepted
|
||||
|
||||
def set_texture_tag(self, tile_id: TileID, texture_tag: object) -> None:
|
||||
"""Record a GUI-thread texture tag for a ready tile."""
|
||||
|
||||
with self._lock:
|
||||
tile = self._tiles.get(tile_id)
|
||||
if tile is not None:
|
||||
tile.texture_tag = texture_tag
|
||||
entry = self.memory.entries.get(tile_id)
|
||||
if entry is not None:
|
||||
entry.texture_tag = texture_tag
|
||||
|
||||
def take_texture_deletions(self) -> list[object]:
|
||||
"""Return texture tags that must be deleted by the GUI thread."""
|
||||
|
||||
with self._lock:
|
||||
tags = list(self._delete_texture_tags)
|
||||
self._delete_texture_tags.clear()
|
||||
return tags
|
||||
|
||||
def clear_memory_cache(self) -> list[object]:
|
||||
"""Clear decoded memory tiles and return texture tags for GUI deletion."""
|
||||
|
||||
with self._lock:
|
||||
tags = [
|
||||
entry.texture_tag
|
||||
for entry in self.memory.entries.values()
|
||||
if entry.texture_tag is not None
|
||||
]
|
||||
self._delete_texture_tags.extend(tags)
|
||||
self.memory.entries.clear()
|
||||
self._tiles.clear()
|
||||
self._queued.clear()
|
||||
self._loading.clear()
|
||||
self._failed.clear()
|
||||
return tags
|
||||
|
||||
def clear_disk_cache(self, cache_dir: str | Path | None) -> None:
|
||||
"""Clear the persistent cache root."""
|
||||
|
||||
clear_disk_cache_path(cache_dir)
|
||||
|
||||
def snapshot(self) -> TileManagerSnapshot:
|
||||
"""Return diagnostic counters."""
|
||||
|
||||
with self._lock:
|
||||
return TileManagerSnapshot(
|
||||
queued_tiles=len(self._queued),
|
||||
loading_tiles=len(self._loading),
|
||||
failed_tiles=len(self._failed),
|
||||
visible_tile_count=len(self._visible_tile_ids),
|
||||
memory_tiles=len(self.memory.entries),
|
||||
memory_hits=self.memory.hits,
|
||||
memory_misses=self.memory.misses,
|
||||
disk_hits=self._disk_hits,
|
||||
disk_misses=self._disk_misses,
|
||||
stale_results=self._stale_results,
|
||||
)
|
||||
|
||||
def _evict_memory_if_needed(self) -> None:
|
||||
with self._lock:
|
||||
evict_ids = self.memory.plan_evictions()
|
||||
for raw_tile_id in evict_ids:
|
||||
tile_id = cast(TileID, raw_tile_id)
|
||||
entry = self.memory.entries.pop(tile_id, None)
|
||||
tile = self._tiles.pop(tile_id, None)
|
||||
texture_tag = None
|
||||
if entry is not None:
|
||||
texture_tag = entry.texture_tag
|
||||
if texture_tag is None and tile is not None:
|
||||
texture_tag = tile.texture_tag
|
||||
if texture_tag is not None:
|
||||
self._delete_texture_tags.append(texture_tag)
|
||||
|
||||
def _worker_loop(self) -> None:
|
||||
while True:
|
||||
request = self._request_queue.get()
|
||||
if request is None:
|
||||
return
|
||||
with self._lock:
|
||||
self._queued.discard(request.tile_id)
|
||||
self._loading.add(request.tile_id)
|
||||
tile = self._tiles.get(request.tile_id)
|
||||
if tile is not None:
|
||||
tile.status = TileStatus.LOADING
|
||||
result = self._load_tile(request)
|
||||
self._result_queue.put(result)
|
||||
|
||||
def _load_tile(self, request: TileRequest) -> TileResult:
|
||||
try:
|
||||
raw, source = self._read_or_fetch(request)
|
||||
width, height, pixels = decode_tile_image(raw)
|
||||
return TileResult(
|
||||
request.tile_id,
|
||||
request.generation,
|
||||
TileStatus.READY,
|
||||
width=width,
|
||||
height=height,
|
||||
pixels=pixels,
|
||||
source=source,
|
||||
)
|
||||
except Exception as exc:
|
||||
return TileResult(
|
||||
request.tile_id,
|
||||
request.generation,
|
||||
TileStatus.FAILED,
|
||||
error=str(exc),
|
||||
)
|
||||
|
||||
def _read_or_fetch(self, request: TileRequest) -> tuple[bytes, Literal["disk", "network"]]:
|
||||
if request.path.exists():
|
||||
try:
|
||||
raw = request.path.read_bytes()
|
||||
touch_disk_metadata(tile_metadata_path(request.path))
|
||||
except (OSError, CacheError):
|
||||
raw = b""
|
||||
if raw:
|
||||
with self._lock:
|
||||
self._disk_hits += 1
|
||||
return raw, "disk"
|
||||
|
||||
with self._lock:
|
||||
self._disk_misses += 1
|
||||
response = requests.get(request.url, headers=request.headers, timeout=20)
|
||||
response.raise_for_status()
|
||||
raw = response.content
|
||||
downloaded_at = time()
|
||||
try:
|
||||
request.path.parent.mkdir(parents=True, exist_ok=True)
|
||||
request.path.write_bytes(raw)
|
||||
write_disk_metadata(
|
||||
tile_metadata_path(request.path),
|
||||
DiskCacheMetadata(
|
||||
url=request.url,
|
||||
etag=response.headers.get("ETag"),
|
||||
last_modified=response.headers.get("Last-Modified"),
|
||||
expires=response.headers.get("Expires"),
|
||||
downloaded_at=downloaded_at,
|
||||
last_accessed_at=downloaded_at,
|
||||
size_bytes=len(raw),
|
||||
),
|
||||
)
|
||||
prune_disk_cache(
|
||||
request.path.parents[3],
|
||||
request.disk_cache_max_bytes,
|
||||
protected_paths=set(request.protected_paths),
|
||||
)
|
||||
except (OSError, CacheError):
|
||||
pass
|
||||
return raw, "network"
|
||||
|
||||
|
||||
def decode_tile_image(raw: bytes) -> tuple[int, int, tuple[float, ...]]:
|
||||
"""Decode an image into Dear PyGui-compatible RGBA float data."""
|
||||
|
||||
try:
|
||||
image = Image.open(BytesIO(raw)).convert("RGBA")
|
||||
except UnidentifiedImageError as exc:
|
||||
raise ValueError("tile image data could not be decoded") from exc
|
||||
width, height = image.size
|
||||
pixels = tuple(channel / 255.0 for channel in image.tobytes())
|
||||
return width, height, pixels
|
||||
31
src/dpg_map/types.py
Normal file
31
src/dpg_map/types.py
Normal file
@@ -0,0 +1,31 @@
|
||||
"""Shared type aliases and small value objects."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import TypeAlias
|
||||
|
||||
Tag: TypeAlias = str | int
|
||||
LatLon: TypeAlias = tuple[float, float]
|
||||
Point: TypeAlias = tuple[float, float]
|
||||
Bounds: TypeAlias = tuple[LatLon, LatLon]
|
||||
TileCoord: TypeAlias = tuple[int, int, int]
|
||||
Color: TypeAlias = tuple[int, int, int] | tuple[int, int, int, int]
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class ScreenPoint:
|
||||
"""A pixel position in map widget coordinates."""
|
||||
|
||||
x: float
|
||||
y: float
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class GeoBounds:
|
||||
"""Latitude/longitude bounds with south-west and north-east corners."""
|
||||
|
||||
south: float
|
||||
west: float
|
||||
north: float
|
||||
east: float
|
||||
73
src/dpg_map/widget.py
Normal file
73
src/dpg_map/widget.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""Dear PyGui map widget construction."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from .providers import TileProvider
|
||||
from .renderer import MapRenderer
|
||||
from .state import create_map_state, current_map_context
|
||||
from .types import LatLon, Tag
|
||||
|
||||
|
||||
@contextmanager
|
||||
def map_widget(
|
||||
*,
|
||||
tag: Tag | None = None,
|
||||
center: LatLon = (0.0, 0.0),
|
||||
zoom: int = 2,
|
||||
provider: str | TileProvider | None = None,
|
||||
width: int = 0,
|
||||
height: int = 0,
|
||||
autosize_x: bool = False,
|
||||
autosize_y: bool = False,
|
||||
**kwargs: object,
|
||||
) -> Iterator[Tag | None]:
|
||||
"""Create a Dear PyGui child-window map shell and logical map context."""
|
||||
|
||||
cache_dir_value = kwargs.get("cache_dir")
|
||||
cache_dir = cache_dir_value if isinstance(cache_dir_value, str | Path) else None
|
||||
user_agent_value = kwargs.get("user_agent")
|
||||
user_agent = user_agent_value if isinstance(user_agent_value, str) else None
|
||||
state = create_map_state(
|
||||
tag=tag,
|
||||
center=center,
|
||||
zoom=zoom,
|
||||
provider=provider,
|
||||
width=width,
|
||||
height=height,
|
||||
autosize_x=autosize_x,
|
||||
autosize_y=autosize_y,
|
||||
cache_dir=cache_dir,
|
||||
user_agent=user_agent,
|
||||
)
|
||||
import dearpygui.dearpygui as dpg
|
||||
|
||||
child_kwargs: dict[str, Any] = dict(kwargs)
|
||||
child_kwargs.pop("cache_dir", None)
|
||||
child_kwargs.pop("user_agent", None)
|
||||
child_kwargs.setdefault("border", False)
|
||||
child_kwargs.setdefault("no_scrollbar", True)
|
||||
child_kwargs.setdefault("no_scroll_with_mouse", True)
|
||||
|
||||
dpg.add_child_window(
|
||||
tag=state.child_window_tag,
|
||||
width=width,
|
||||
height=height,
|
||||
autosize_x=autosize_x,
|
||||
autosize_y=autosize_y,
|
||||
**child_kwargs,
|
||||
)
|
||||
dpg.add_texture_registry(tag=state.texture_registry_tag)
|
||||
dpg.add_drawlist(1, 1, tag=state.drawlist_tag, parent=state.child_window_tag)
|
||||
|
||||
renderer = MapRenderer(state, dpg)
|
||||
with state.lock:
|
||||
state.renderer = renderer
|
||||
renderer.schedule_next_frame()
|
||||
|
||||
with current_map_context(state.tag):
|
||||
yield state.tag
|
||||
1
tests/.gitkeep
Normal file
1
tests/.gitkeep
Normal file
@@ -0,0 +1 @@
|
||||
|
||||
68
tests/test_cache.py
Normal file
68
tests/test_cache.py
Normal file
@@ -0,0 +1,68 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from dpg_map.cache import (
|
||||
CacheStats,
|
||||
DiskCacheConfig,
|
||||
DiskCacheMetadata,
|
||||
MemoryCacheConfig,
|
||||
plan_disk_prune,
|
||||
tile_cache_path,
|
||||
write_disk_metadata,
|
||||
)
|
||||
|
||||
|
||||
def test_cache_stats_dataclass_construction() -> None:
|
||||
stats = CacheStats(
|
||||
memory_tiles=3,
|
||||
memory_max_tiles=512,
|
||||
memory_hits=10,
|
||||
memory_misses=2,
|
||||
disk_bytes=1024,
|
||||
disk_max_bytes=None,
|
||||
disk_hits=7,
|
||||
disk_misses=1,
|
||||
disk_path=Path("/tmp/dpg-map-cache"),
|
||||
)
|
||||
|
||||
assert stats.memory_tiles == 3
|
||||
assert stats.disk_max_bytes is None
|
||||
assert stats.disk_path == Path("/tmp/dpg-map-cache")
|
||||
|
||||
|
||||
def test_initial_cache_config_dataclasses() -> None:
|
||||
memory_config = MemoryCacheConfig()
|
||||
disk_config = DiskCacheConfig()
|
||||
|
||||
assert memory_config.max_tiles == 512
|
||||
assert disk_config.max_bytes == 2_000_000_000
|
||||
|
||||
|
||||
def test_disk_cache_path_generation(tmp_path: Path) -> None:
|
||||
path = tile_cache_path(tmp_path, "osm", 4, 8, 9, "jpg")
|
||||
|
||||
assert path == tmp_path / "osm" / "4" / "8" / "9.jpg"
|
||||
|
||||
|
||||
def test_disk_cache_prune_ordering(tmp_path: Path) -> None:
|
||||
first = tile_cache_path(tmp_path, "osm", 1, 1, 1)
|
||||
second = tile_cache_path(tmp_path, "osm", 1, 1, 2)
|
||||
protected = tile_cache_path(tmp_path, "osm", 1, 1, 3)
|
||||
|
||||
for path, accessed_at in [(first, 1.0), (second, 2.0), (protected, 0.0)]:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_bytes(b"abcde")
|
||||
write_disk_metadata(
|
||||
path.with_suffix(".json"),
|
||||
DiskCacheMetadata(
|
||||
url=str(path),
|
||||
downloaded_at=accessed_at,
|
||||
last_accessed_at=accessed_at,
|
||||
size_bytes=5,
|
||||
),
|
||||
)
|
||||
|
||||
planned = plan_disk_prune(tmp_path, 5, protected_paths={protected})
|
||||
|
||||
assert planned == [first, second]
|
||||
35
tests/test_commands.py
Normal file
35
tests/test_commands.py
Normal file
@@ -0,0 +1,35 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dpg_map.commands import CommandKind, MapCommand, MapCommandQueue
|
||||
|
||||
|
||||
def test_overlay_updates_coalesce_by_overlay_tag() -> None:
|
||||
queue = MapCommandQueue()
|
||||
|
||||
queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, "map", {"tag": "vehicle", "lat": 1.0}))
|
||||
queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, "map", {"tag": "vehicle", "lat": 2.0}))
|
||||
queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, "map", {"tag": "other", "lat": 3.0}))
|
||||
|
||||
drained = queue.drain()
|
||||
|
||||
assert len(drained) == 2
|
||||
assert drained[0].payload == {"tag": "vehicle", "lat": 2.0}
|
||||
assert drained[1].payload == {"tag": "other", "lat": 3.0}
|
||||
|
||||
|
||||
def test_latest_view_update_wins_but_structural_commands_keep_order() -> None:
|
||||
queue = MapCommandQueue()
|
||||
|
||||
queue.put(MapCommand(CommandKind.ADD_OVERLAY, "map", {"tag": "a"}))
|
||||
queue.put(MapCommand(CommandKind.SET_VIEW, "map", {"zoom": 3}))
|
||||
queue.put(MapCommand(CommandKind.SET_VIEW, "map", {"zoom": 4}))
|
||||
queue.put(MapCommand(CommandKind.DELETE_OVERLAY, "map", {"tag": "a"}))
|
||||
|
||||
drained = queue.drain()
|
||||
|
||||
assert [command.kind for command in drained] == [
|
||||
CommandKind.ADD_OVERLAY,
|
||||
CommandKind.SET_VIEW,
|
||||
CommandKind.DELETE_OVERLAY,
|
||||
]
|
||||
assert drained[1].payload == {"zoom": 4}
|
||||
19
tests/test_interaction.py
Normal file
19
tests/test_interaction.py
Normal file
@@ -0,0 +1,19 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dpg_map.interaction import calculate_hit_rect
|
||||
from dpg_map.sizing import SizeMeasurement, apply_size_measurement
|
||||
from dpg_map.state import create_map_state
|
||||
|
||||
|
||||
def test_hit_rect_uses_effective_map_size() -> None:
|
||||
state = create_map_state(tag="hit-rect")
|
||||
apply_size_measurement(state, SizeMeasurement(width=400, height=250, visible=True))
|
||||
|
||||
rect = calculate_hit_rect(state, (10.0, 20.0))
|
||||
|
||||
assert rect.x == 10.0
|
||||
assert rect.y == 20.0
|
||||
assert rect.width == 400
|
||||
assert rect.height == 250
|
||||
assert rect.contains(410.0, 270.0)
|
||||
assert not rect.contains(411.0, 270.0)
|
||||
63
tests/test_overlays_state.py
Normal file
63
tests/test_overlays_state.py
Normal file
@@ -0,0 +1,63 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
import dpg_map as dpgm
|
||||
from dpg_map.exceptions import CoordinateError
|
||||
from dpg_map.overlays import TrajectoryOverlay
|
||||
from dpg_map.state import DirtyFlags, create_map_state, get_map_state
|
||||
|
||||
|
||||
def test_overlay_update_does_not_alter_center_or_zoom() -> None:
|
||||
create_map_state(tag="overlay-isolation", center=(47.0, 2.0), zoom=8)
|
||||
dpgm.add_marker("vehicle", lat=47.1, lon=2.1, map_tag="overlay-isolation")
|
||||
|
||||
before_center = dpgm.get_center(map_tag="overlay-isolation")
|
||||
before_zoom = dpgm.get_zoom(map_tag="overlay-isolation")
|
||||
|
||||
dpgm.update_marker("vehicle", lat=47.2, lon=2.2, map_tag="overlay-isolation")
|
||||
|
||||
assert dpgm.get_center(map_tag="overlay-isolation") == before_center
|
||||
assert dpgm.get_zoom(map_tag="overlay-isolation") == before_zoom
|
||||
state = get_map_state("overlay-isolation")
|
||||
assert state.dirty & DirtyFlags.OVERLAYS
|
||||
|
||||
|
||||
def test_trajectory_inputs_are_copied() -> None:
|
||||
create_map_state(tag="trajectory-copy")
|
||||
lats = [1.0, 2.0]
|
||||
lons = [3.0, 4.0]
|
||||
|
||||
dpgm.add_trajectory("track", lats=lats, lons=lons, map_tag="trajectory-copy")
|
||||
lats[0] = 99.0
|
||||
lons[0] = 99.0
|
||||
|
||||
state = get_map_state("trajectory-copy")
|
||||
overlay = state.overlays["track"]
|
||||
|
||||
assert isinstance(overlay, TrajectoryOverlay)
|
||||
assert overlay.points == ((1.0, 3.0), (2.0, 4.0))
|
||||
|
||||
|
||||
def test_mismatched_lat_lon_lengths_raise() -> None:
|
||||
create_map_state(tag="bad-coordinates")
|
||||
|
||||
with pytest.raises(CoordinateError):
|
||||
dpgm.add_trajectory("track", lats=[1.0], lons=[2.0, 3.0], map_tag="bad-coordinates")
|
||||
|
||||
|
||||
def test_layer_state_tracks_visibility_and_overlay_membership() -> None:
|
||||
create_map_state(tag="layers")
|
||||
|
||||
dpgm.add_layer("fleet", map_tag="layers")
|
||||
dpgm.add_marker("vehicle", lat=1.0, lon=2.0, layer="fleet", map_tag="layers")
|
||||
dpgm.hide_layer("fleet", map_tag="layers")
|
||||
|
||||
state = get_map_state("layers")
|
||||
assert state.layers["fleet"].show is False
|
||||
assert state.layers["fleet"].overlay_tags == {"vehicle"}
|
||||
|
||||
dpgm.clear_layer("fleet", map_tag="layers")
|
||||
|
||||
assert state.layers["fleet"].overlay_tags == set()
|
||||
assert "vehicle" not in state.overlays
|
||||
44
tests/test_projection.py
Normal file
44
tests/test_projection.py
Normal file
@@ -0,0 +1,44 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from dpg_map.projection import (
|
||||
WEB_MERCATOR_MAX_LAT,
|
||||
clamp_latitude,
|
||||
latlon_to_tile,
|
||||
latlon_to_world,
|
||||
screen_to_world,
|
||||
world_to_latlon,
|
||||
world_to_screen,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("lat", "lon", "zoom"),
|
||||
[
|
||||
(0.0, 0.0, 0),
|
||||
(47.9029, 1.9093, 15),
|
||||
(-33.8688, 151.2093, 10),
|
||||
(WEB_MERCATOR_MAX_LAT, 179.999, 4),
|
||||
],
|
||||
)
|
||||
def test_projection_roundtrip(lat: float, lon: float, zoom: int) -> None:
|
||||
x, y = latlon_to_world(lat, lon, zoom)
|
||||
roundtrip_lat, roundtrip_lon = world_to_latlon(x, y, zoom)
|
||||
|
||||
assert roundtrip_lat == pytest.approx(clamp_latitude(lat), abs=1e-7)
|
||||
assert roundtrip_lon == pytest.approx(lon, abs=1e-7)
|
||||
|
||||
|
||||
def test_latlon_to_tile_returns_xyz_coordinate() -> None:
|
||||
assert latlon_to_tile(0.0, 0.0, 1) == (1, 1, 1)
|
||||
|
||||
|
||||
def test_world_screen_roundtrip() -> None:
|
||||
center = (47.9029, 1.9093)
|
||||
world = latlon_to_world(47.91, 1.92, 14)
|
||||
screen = world_to_screen(*world, center=center, zoom=14, width=800, height=600)
|
||||
|
||||
assert screen_to_world(*screen, center=center, zoom=14, width=800, height=600) == pytest.approx(
|
||||
world
|
||||
)
|
||||
71
tests/test_providers.py
Normal file
71
tests/test_providers.py
Normal file
@@ -0,0 +1,71 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
import dpg_map as dpgm
|
||||
from dpg_map.exceptions import InvalidProviderError, ProviderExistsError, ProviderNotFoundError
|
||||
|
||||
|
||||
def test_default_provider_is_registered() -> None:
|
||||
provider = dpgm.get_provider("osm")
|
||||
|
||||
assert provider.name == "osm"
|
||||
assert "osm" in dpgm.list_providers()
|
||||
assert provider.build_url(x=1, y=2, z=3) == "https://tile.openstreetmap.org/3/1/2.png"
|
||||
|
||||
|
||||
def test_provider_registration_roundtrip() -> None:
|
||||
provider = dpgm.TileProvider(
|
||||
name="unit-test-provider",
|
||||
url_template="https://tiles.example.test/{z}/{x}/{y}.png",
|
||||
attribution="Example",
|
||||
)
|
||||
|
||||
dpgm.register_provider(provider)
|
||||
try:
|
||||
assert dpgm.get_provider("unit-test-provider") == provider
|
||||
assert "unit-test-provider" in dpgm.list_providers()
|
||||
finally:
|
||||
dpgm.unregister_provider("unit-test-provider")
|
||||
|
||||
with pytest.raises(ProviderNotFoundError):
|
||||
dpgm.get_provider("unit-test-provider")
|
||||
|
||||
|
||||
def test_duplicate_provider_registration_fails() -> None:
|
||||
provider = dpgm.TileProvider(
|
||||
name="unit-test-duplicate",
|
||||
url_template="https://tiles.example.test/{z}/{x}/{y}.png",
|
||||
)
|
||||
|
||||
dpgm.register_provider(provider)
|
||||
try:
|
||||
with pytest.raises(ProviderExistsError):
|
||||
dpgm.register_provider(provider)
|
||||
finally:
|
||||
dpgm.unregister_provider("unit-test-duplicate")
|
||||
|
||||
|
||||
def test_provider_url_building_with_subdomains_retina_and_extension() -> None:
|
||||
provider = dpgm.TileProvider(
|
||||
name="unit-test-template",
|
||||
url_template="https://{s}.tiles.example.test/{z}/{x}/{y}{r}.{ext}",
|
||||
subdomains=("a", "b", "c"),
|
||||
retina=True,
|
||||
file_extension="webp",
|
||||
)
|
||||
|
||||
assert provider.build_url(x=1, y=2, z=3) == "https://a.tiles.example.test/3/1/2@2x.webp"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"template",
|
||||
[
|
||||
"https://tiles.example.test/{z}/{x}.png",
|
||||
"https://tiles.example.test/{z}/{x}/{y}/{quadkey}.png",
|
||||
"",
|
||||
],
|
||||
)
|
||||
def test_invalid_provider_templates_fail(template: str) -> None:
|
||||
with pytest.raises(InvalidProviderError):
|
||||
dpgm.TileProvider(name="bad-template", url_template=template)
|
||||
48
tests/test_public_api.py
Normal file
48
tests/test_public_api.py
Normal file
@@ -0,0 +1,48 @@
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
def test_package_exports_required_public_api() -> None:
|
||||
import dpg_map as dpgm
|
||||
|
||||
expected = {
|
||||
"configure",
|
||||
"TileProvider",
|
||||
"register_provider",
|
||||
"unregister_provider",
|
||||
"get_provider",
|
||||
"list_providers",
|
||||
"map_widget",
|
||||
"set_center",
|
||||
"get_center",
|
||||
"set_zoom",
|
||||
"get_zoom",
|
||||
"set_view",
|
||||
"fit_bounds",
|
||||
"screen_to_latlon",
|
||||
"latlon_to_screen",
|
||||
"add_marker",
|
||||
"add_polyline",
|
||||
"add_trajectory",
|
||||
"update_marker",
|
||||
"update_polyline",
|
||||
"update_trajectory",
|
||||
"set_marker_position",
|
||||
"set_marker_label",
|
||||
"set_polyline_points",
|
||||
"set_overlay_show",
|
||||
"delete_overlay",
|
||||
"add_layer",
|
||||
"show_layer",
|
||||
"hide_layer",
|
||||
"clear_layer",
|
||||
"clear_map",
|
||||
"set_provider",
|
||||
"clear_memory_cache",
|
||||
"clear_disk_cache",
|
||||
"get_cache_stats",
|
||||
"get_map_debug_state",
|
||||
}
|
||||
|
||||
assert set(dpgm.__all__) == expected
|
||||
for name in expected:
|
||||
assert hasattr(dpgm, name)
|
||||
30
tests/test_renderer.py
Normal file
30
tests/test_renderer.py
Normal file
@@ -0,0 +1,30 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dpg_map.commands import CommandKind, MapCommand
|
||||
from dpg_map.renderer import drain_renderer_commands
|
||||
from dpg_map.state import DirtyFlags, create_map_state
|
||||
|
||||
|
||||
def test_renderer_command_drain_preserves_structural_order_and_coalesces() -> None:
|
||||
state = create_map_state(tag="renderer-drain")
|
||||
state.dirty = DirtyFlags.NONE
|
||||
|
||||
state.command_queue.put(MapCommand(CommandKind.ADD_OVERLAY, state.tag, {"tag": "a"}))
|
||||
state.command_queue.put(MapCommand(CommandKind.SET_VIEW, state.tag, {"zoom": 3}))
|
||||
state.command_queue.put(MapCommand(CommandKind.SET_VIEW, state.tag, {"zoom": 4}))
|
||||
state.command_queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, state.tag, {"tag": "a", "v": 1}))
|
||||
state.command_queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, state.tag, {"tag": "a", "v": 2}))
|
||||
state.command_queue.put(MapCommand(CommandKind.DELETE_OVERLAY, state.tag, {"tag": "a"}))
|
||||
|
||||
commands = drain_renderer_commands(state)
|
||||
|
||||
assert [command.kind for command in commands] == [
|
||||
CommandKind.ADD_OVERLAY,
|
||||
CommandKind.SET_VIEW,
|
||||
CommandKind.UPDATE_OVERLAY,
|
||||
CommandKind.DELETE_OVERLAY,
|
||||
]
|
||||
assert commands[1].payload == {"zoom": 4}
|
||||
assert commands[2].payload == {"tag": "a", "v": 2}
|
||||
assert state.dirty & DirtyFlags.VIEW
|
||||
assert state.dirty & DirtyFlags.OVERLAYS
|
||||
44
tests/test_sizing.py
Normal file
44
tests/test_sizing.py
Normal file
@@ -0,0 +1,44 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dpg_map.sizing import SizeMeasurement, apply_size_measurement, effective_draw_size
|
||||
from dpg_map.state import DirtyFlags, create_map_state
|
||||
|
||||
|
||||
def test_size_measurement_tracks_last_nonzero_size() -> None:
|
||||
state = create_map_state(tag="size-last-nonzero")
|
||||
|
||||
first = apply_size_measurement(state, SizeMeasurement(width=640, height=360, visible=True))
|
||||
hidden = apply_size_measurement(state, SizeMeasurement(width=0, height=0, visible=False))
|
||||
|
||||
assert first.changed is True
|
||||
assert state.measured_width == 0
|
||||
assert state.measured_height == 0
|
||||
assert state.last_nonzero_width == 640
|
||||
assert state.last_nonzero_height == 360
|
||||
assert hidden.became_hidden is True
|
||||
assert effective_draw_size(state) == (640, 360)
|
||||
|
||||
|
||||
def test_zero_size_does_not_permanently_collapse_map() -> None:
|
||||
state = create_map_state(tag="size-reappears")
|
||||
|
||||
apply_size_measurement(state, SizeMeasurement(width=500, height=300, visible=True))
|
||||
apply_size_measurement(state, SizeMeasurement(width=0, height=0, visible=False))
|
||||
update = apply_size_measurement(state, SizeMeasurement(width=700, height=450, visible=True))
|
||||
|
||||
assert update.became_visible is True
|
||||
assert update.effective_width == 700
|
||||
assert update.effective_height == 450
|
||||
assert state.last_nonzero_width == 700
|
||||
assert state.last_nonzero_height == 450
|
||||
|
||||
|
||||
def test_resize_marks_size_dirty() -> None:
|
||||
state = create_map_state(tag="size-dirty")
|
||||
state.dirty = DirtyFlags.NONE
|
||||
|
||||
apply_size_measurement(state, SizeMeasurement(width=320, height=240, visible=True))
|
||||
|
||||
assert state.dirty & DirtyFlags.SIZE
|
||||
assert state.dirty & DirtyFlags.TILES
|
||||
assert state.dirty & DirtyFlags.OVERLAYS
|
||||
87
tests/test_tiles.py
Normal file
87
tests/test_tiles.py
Normal file
@@ -0,0 +1,87 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
from PIL import Image
|
||||
|
||||
from dpg_map.providers import OSM
|
||||
from dpg_map.tiles import (
|
||||
TileID,
|
||||
TileManager,
|
||||
TileResult,
|
||||
TileStatus,
|
||||
calculate_visible_tiles,
|
||||
decode_tile_image,
|
||||
)
|
||||
|
||||
|
||||
def test_visible_tile_calculation_uses_provider_namespace() -> None:
|
||||
tiles = calculate_visible_tiles(
|
||||
center=(0.0, 0.0),
|
||||
zoom=2,
|
||||
width=256,
|
||||
height=256,
|
||||
provider=OSM,
|
||||
margin=0,
|
||||
)
|
||||
|
||||
assert tiles
|
||||
assert {tile.tile_id.provider_name for tile in tiles} == {"osm"}
|
||||
assert all(0 <= tile.tile_id.x <= 3 for tile in tiles)
|
||||
assert all(0 <= tile.tile_id.y <= 3 for tile in tiles)
|
||||
|
||||
|
||||
def test_tile_manager_ignores_stale_generation_results() -> None:
|
||||
manager = TileManager()
|
||||
stale = TileResult(
|
||||
TileID("osm", 1, 0, 0),
|
||||
generation=1,
|
||||
status=TileStatus.READY,
|
||||
width=1,
|
||||
height=1,
|
||||
pixels=(1.0, 1.0, 1.0, 1.0),
|
||||
source="disk",
|
||||
)
|
||||
manager._result_queue.put(stale)
|
||||
|
||||
accepted = manager.drain_results(generation=2, provider_name="osm")
|
||||
|
||||
assert accepted == []
|
||||
assert manager.snapshot().stale_results == 1
|
||||
|
||||
|
||||
def test_memory_eviction_protects_visible_tiles() -> None:
|
||||
manager = TileManager(memory_cache_max_tiles=1)
|
||||
protected = TileID("osm", 1, 0, 0)
|
||||
evictable = TileID("osm", 1, 0, 1)
|
||||
with manager._lock:
|
||||
manager._visible_tile_ids = {protected}
|
||||
|
||||
for tile_id in (protected, evictable):
|
||||
manager._result_queue.put(
|
||||
TileResult(
|
||||
tile_id,
|
||||
generation=1,
|
||||
status=TileStatus.READY,
|
||||
width=1,
|
||||
height=1,
|
||||
pixels=(1.0, 1.0, 1.0, 1.0),
|
||||
source="disk",
|
||||
)
|
||||
)
|
||||
|
||||
manager.drain_results(generation=1, provider_name="osm")
|
||||
|
||||
assert manager.get_ready_tile(protected) is not None
|
||||
assert manager.get_ready_tile(evictable) is None
|
||||
|
||||
|
||||
def test_decode_png_tile_image() -> None:
|
||||
image = Image.new("RGBA", (1, 1), (255, 0, 128, 255))
|
||||
buffer = BytesIO()
|
||||
image.save(buffer, format="PNG")
|
||||
|
||||
width, height, pixels = decode_tile_image(buffer.getvalue())
|
||||
|
||||
assert (width, height) == (1, 1)
|
||||
assert pixels == (1.0, 0.0, 128 / 255, 1.0)
|
||||
Reference in New Issue
Block a user