Compare commits

..

5 Commits

38 changed files with 3556 additions and 1 deletions

74
AGENTS.md Normal file
View File

@@ -0,0 +1,74 @@
# AGENTS.md
## Current status
Step 4 complete.
## Completed steps
Step 1 - Public API contract and pure core.
Step 2 - Thread-safe state, commands, overlays, and cache model.
Step 3 - Widget shell, sizing system, and GUI-thread frame pump.
Step 4 - Tile manager, persistent cache, and asynchronous loading.
## Current step
Step 5 - Interaction: pan, zoom, and view commands.
## Design decisions
- Package is managed with uv.
- Public import is `import dpg_map as dpgm`.
- Dear PyGui calls are GUI-thread-only.
- Runtime public calls enqueue commands or update logical state.
- Overlay updates must not reset center/zoom.
- The widget uses child_window + measured-size drawlist.
- Tiles use a memory cache and persistent disk cache.
- Tile providers are interchangeable.
## Known issues
None yet.
## Commands used
- Read `STEPS.md`, `FEATURES.md`, and `ARCHITECTURE.md`.
- Created initial package, examples, tests, and agent-log structure.
- Implemented public exports, exceptions, common types, tile provider registry, projection helpers, cache dataclasses, and GUI-dependent API stubs.
- Added Step 1 tests for imports, providers, projection, and cache dataclasses.
- Implemented global configuration, logical MapState, map registry, and current map context stack.
- Implemented DirtyFlags, MapCommand, CommandKind, and coalescing MapCommandQueue.
- Implemented logical marker, polyline, trajectory, and layer state models.
- Implemented public runtime overlay/view/layer/provider/cache/debug wrappers against logical state without Dear PyGui calls.
- Implemented memory cache metadata, disk cache path generation, metadata read/write, disk size scanning, and prune planning.
- Added Step 2 tests for command coalescing, overlay/view isolation, copied trajectory inputs, coordinate length validation, layer state, disk path generation, and prune ordering.
- Implemented `map_widget(...)` as a Dear PyGui child-window plus drawlist shell.
- Implemented GUI-thread renderer frame pump that schedules frame callbacks, drains command queues, measures size, resizes the drawlist, and draws a placeholder background/attribution.
- Implemented sizing helpers for measured size, last non-zero size preservation, visibility transitions, effective draw size, and resize dirty flags.
- Implemented map interaction hit-rectangle calculation.
- Implemented TileID, TileStatus, Tile, visible tile calculation, and TileManager.
- Implemented asynchronous tile worker queue for disk reads, HTTP fetches, and image decoding.
- Implemented provider-namespaced persistent cache writes, access metadata updates, clearing, and LRU pruning.
- Implemented memory tile cache with visible-tile protection and deferred GUI-thread texture deletion.
- Integrated tile result processing, stale generation/provider rejection, texture creation, and tile drawing into the GUI-thread renderer.
- Added OpenStreetMap User-Agent warning/fallback and configured examples with example user agents.
- Added Step 3 examples for basic map, window sizing, child-window sizing, table sizing, and hidden-tab sizing.
- Added Step 4 cache stress example.
- Added Step 3 tests for sizing transitions, zero-size preservation, resize dirty flags, command drain ordering, and hit rectangles.
- Added Step 4 tests for visible tile calculation, stale result rejection, protected memory eviction, and tile image decoding.
- Ran a Dear PyGui context smoke check for `map_widget` child-window/drawlist creation.
- Ran `uv run pytest`.
- Ran `uv run ruff check .`.
- Ran `uv run ruff format .`.
- Ran `uv run ruff format --check .`.
- Ran `uv run pyright`.
- Ran `uv run pytest`.
- Ran `uv run ruff check .`.
- Ran `uv run ruff format .`.
- Ran `uv run ruff format --check .`.
- Ran `uv run pyright`.
- Ran a Dear PyGui context smoke check for `map_widget` child-window/drawlist/texture-registry creation.
## Next action
Implement Step 5.

View File

@@ -0,0 +1,51 @@
# dpg-map
`dpg-map` is a Dear PyGui map widget package under rebuild.
The Step 4 tile manager is in place:
```python
import dpg_map as dpgm
provider = dpgm.TileProvider(
name="custom",
url_template="https://example.com/{z}/{x}/{y}.png",
attribution="Tiles (c) Example",
)
dpgm.register_provider(provider)
with dpgm.map_widget(tag="map", center=(47.9029, 1.9093), zoom=15):
dpgm.add_marker("vehicle", lat=47.9029, lon=1.9093)
dpgm.update_marker("vehicle", lat=47.9030, lon=1.9094, map_tag="map")
```
Implemented so far:
- public package exports
- tile provider definitions and registry
- Web Mercator projection helpers
- thread-safe logical map state and map registry
- command queue with coalescing for overlay and view updates
- logical marker, polyline, trajectory, and layer models
- persistent disk cache paths, metadata, scanning, pruning, and clearing
- Dear PyGui `child_window` + measured-size `drawlist` widget shell
- GUI-thread frame pump that drains commands, manages textures, and draws raster tiles
- sizing helpers that preserve the last non-zero size across hidden layouts
- interaction hit-rectangle calculation for the measured map area
- asynchronous tile workers that read disk cache, fetch HTTP, and decode images
- memory cache with visible-tile protection and GUI-thread texture deletion
Overlay drawing is not implemented yet. Step 5 will add pan/zoom interaction and
view command projection.
Examples:
```bash
uv run python examples/basic_map.py
uv run python examples/sizing_window.py
uv run python examples/sizing_child.py
uv run python examples/sizing_table.py
uv run python examples/hidden_tab.py
uv run python examples/cache_stress.py
```

1
examples/.gitkeep Normal file
View File

@@ -0,0 +1 @@

29
examples/basic_map.py Normal file
View File

@@ -0,0 +1,29 @@
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpgm.configure(user_agent="dpg-map basic example")
dpg.create_context()
dpg.create_viewport(title="dpg-map basic", width=900, height=600)
with (
dpg.window(label="Map", width=-1, height=-1),
dpgm.map_widget(tag="map", center=(47.9029, 1.9093), zoom=15, width=-1, height=-1),
):
dpgm.add_marker("vehicle", lat=47.9029, lon=1.9093)
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
if __name__ == "__main__":
main()

43
examples/cache_stress.py Normal file
View File

@@ -0,0 +1,43 @@
from pathlib import Path
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
cache_dir = Path(__file__).resolve().parent / ".tile-cache"
dpgm.configure(
cache_dir=cache_dir,
memory_cache_max_tiles=32,
disk_cache_max_bytes=30_000_000,
prefetch_margin_tiles=1,
user_agent="dpg-map cache_stress example",
)
dpg.create_context()
dpg.create_viewport(title="dpg-map cache stress", width=1000, height=700)
with (
dpg.window(label="Cache Stress", width=-1, height=-1),
dpgm.map_widget(
tag="cache-map",
center=(47.9029, 1.9093),
zoom=14,
width=-1,
height=-1,
),
):
dpgm.add_marker("start", lat=47.9029, lon=1.9093, label="Orleans")
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
if __name__ == "__main__":
main()

27
examples/hidden_tab.py Normal file
View File

@@ -0,0 +1,27 @@
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpg.create_context()
dpg.create_viewport(title="dpg-map hidden tab", width=900, height=600)
with dpg.window(label="Hidden Tab Sizing", width=-1, height=-1), dpg.tab_bar():
with dpg.tab(label="First"):
dpg.add_text("Switch to the map tab.")
with dpg.tab(label="Map"), dpgm.map_widget(tag="map-hidden-tab", width=-1, height=500):
dpgm.add_marker("tab-marker", lat=35.0, lon=139.0)
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
if __name__ == "__main__":
main()

28
examples/sizing_child.py Normal file
View File

@@ -0,0 +1,28 @@
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpg.create_context()
dpg.create_viewport(title="dpg-map child sizing", width=900, height=600)
with (
dpg.window(label="Nested Child", width=-1, height=-1),
dpg.child_window(width=-1, height=420),
dpgm.map_widget(tag="map-child", width=-1, height=-1),
):
dpgm.add_marker("inside-child", lat=47.0, lon=2.0)
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
if __name__ == "__main__":
main()

33
examples/sizing_table.py Normal file
View File

@@ -0,0 +1,33 @@
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpg.create_context()
dpg.create_viewport(title="dpg-map table sizing", width=1000, height=600)
with (
dpg.window(label="Table Layout", width=-1, height=-1),
dpg.table(header_row=True, resizable=True, policy=dpg.mvTable_SizingStretchProp),
):
dpg.add_table_column(label="Map")
dpg.add_table_column(label="Controls")
with dpg.table_row():
with dpg.table_cell(), dpgm.map_widget(tag="map-table", width=-1, height=500):
dpgm.add_marker("table-marker", lat=51.5, lon=-0.1)
with dpg.table_cell():
dpg.add_text("Resize the window and table columns.")
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
if __name__ == "__main__":
main()

27
examples/sizing_window.py Normal file
View File

@@ -0,0 +1,27 @@
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpg.create_context()
dpg.create_viewport(title="dpg-map window sizing", width=900, height=600)
with (
dpg.window(label="Fill Window", width=-1, height=-1),
dpgm.map_widget(tag="map-window", width=-1, height=-1),
):
dpgm.add_marker("center", lat=0.0, lon=0.0)
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
if __name__ == "__main__":
main()

View File

@@ -27,3 +27,13 @@ dev = [
"pytest>=9.0.3", "pytest>=9.0.3",
"ruff>=0.15.14", "ruff>=0.15.14",
] ]
[tool.ruff]
line-length = 100
target-version = "py311"
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "SIM"]
[tool.pyright]
typeCheckingMode = "basic"

View File

@@ -1,2 +1,87 @@
"""Public API for dpg-map."""
from .api import (
add_layer,
add_marker,
add_polyline,
add_trajectory,
clear_disk_cache,
clear_layer,
clear_map,
clear_memory_cache,
configure,
delete_overlay,
fit_bounds,
get_cache_stats,
get_center,
get_map_debug_state,
get_zoom,
hide_layer,
latlon_to_screen,
screen_to_latlon,
set_center,
set_marker_label,
set_marker_position,
set_overlay_show,
set_polyline_points,
set_provider,
set_view,
set_zoom,
show_layer,
update_marker,
update_polyline,
update_trajectory,
)
from .providers import (
TileProvider,
get_provider,
list_providers,
register_provider,
unregister_provider,
)
from .widget import map_widget
__all__ = [
"TileProvider",
"add_layer",
"add_marker",
"add_polyline",
"add_trajectory",
"clear_disk_cache",
"clear_layer",
"clear_map",
"clear_memory_cache",
"configure",
"delete_overlay",
"fit_bounds",
"get_cache_stats",
"get_center",
"get_map_debug_state",
"get_provider",
"get_zoom",
"hide_layer",
"latlon_to_screen",
"list_providers",
"map_widget",
"register_provider",
"screen_to_latlon",
"set_center",
"set_marker_label",
"set_marker_position",
"set_overlay_show",
"set_polyline_points",
"set_provider",
"set_view",
"set_zoom",
"show_layer",
"unregister_provider",
"update_marker",
"update_polyline",
"update_trajectory",
]
def main() -> None: def main() -> None:
print("Hello from dpg-map!") """Console entry point placeholder."""
print("dpg-map")

537
src/dpg_map/api.py Normal file
View File

@@ -0,0 +1,537 @@
"""Public API wrappers for dpg-map."""
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import asdict
from math import isfinite
from pathlib import Path
from typing import Any
from .cache import CacheStats, clear_disk_cache_path, disk_cache_root, disk_cache_size_bytes
from .commands import CommandKind, MapCommand
from .exceptions import CoordinateError, OverlayNotFoundError
from .overlays import LayerState, MarkerOverlay, Overlay, PolylineOverlay, TrajectoryOverlay
from .providers import TileProvider, get_provider
from .state import (
DirtyFlags,
configure_state,
find_map_for_overlay,
get_config,
get_map_state,
mark_dirty,
)
from .types import Bounds, LatLon, Point, Tag
def _validate_latlon(lat: float, lon: float) -> LatLon:
lat_value = float(lat)
lon_value = float(lon)
if not isfinite(lat_value) or not isfinite(lon_value):
raise CoordinateError("coordinates must be finite numbers")
if lat_value < -90.0 or lat_value > 90.0:
raise CoordinateError("latitude must be between -90 and 90")
if lon_value < -180.0 or lon_value > 180.0:
raise CoordinateError("longitude must be between -180 and 180")
return (lat_value, lon_value)
def _points_from_inputs(
points: Sequence[LatLon] | None = None,
*,
lats: Sequence[float] | None = None,
lons: Sequence[float] | None = None,
) -> tuple[LatLon, ...]:
if points is not None and (lats is not None or lons is not None):
raise CoordinateError("provide either points or lats/lons, not both")
if points is not None:
return tuple(_validate_latlon(lat, lon) for lat, lon in points)
if lats is None and lons is None:
return ()
if lats is None or lons is None:
raise CoordinateError("lats and lons must be provided together")
lat_values = tuple(lats)
lon_values = tuple(lons)
if len(lat_values) != len(lon_values):
raise CoordinateError("lats and lons must have the same length")
return tuple(
_validate_latlon(lat, lon) for lat, lon in zip(lat_values, lon_values, strict=True)
)
def _ensure_layer(state: Any, layer_name: str, z_index: int = 0, show: bool = True) -> LayerState:
layer = state.layers.get(layer_name)
if layer is None:
layer = LayerState(layer_name, z_index=z_index, show=show)
state.layers[layer_name] = layer
return layer
def _overlay_payload(overlay: Overlay) -> dict[str, Any]:
return {"tag": overlay.tag, "overlay": asdict(overlay)}
def _queue(state: Any, kind: CommandKind, payload: dict[str, Any]) -> None:
state.command_queue.put(MapCommand(kind=kind, map_tag=state.tag, payload=payload))
def configure(
*,
user_agent: str | None = None,
cache_dir: str | Path | None = None,
default_provider: str | TileProvider = "osm",
memory_cache_max_tiles: int = 512,
disk_cache_max_bytes: int | None = 2_000_000_000,
prefetch_margin_tiles: int = 1,
tile_worker_count: int = 4,
overlay_update_policy: str = "coalesce",
debug: bool = False,
) -> None:
configure_state(
user_agent=user_agent,
cache_dir=cache_dir,
default_provider=default_provider,
memory_cache_max_tiles=memory_cache_max_tiles,
disk_cache_max_bytes=disk_cache_max_bytes,
prefetch_margin_tiles=prefetch_margin_tiles,
tile_worker_count=tile_worker_count,
overlay_update_policy=overlay_update_policy,
debug=debug,
)
def set_center(lat: float, lon: float, *, map_tag: Tag | None = None) -> None:
set_view(center=_validate_latlon(lat, lon), map_tag=map_tag)
def get_center(*, map_tag: Tag | None = None) -> LatLon:
state = get_map_state(map_tag)
with state.lock:
return state.center
def set_zoom(zoom: int, *, map_tag: Tag | None = None) -> None:
set_view(zoom=zoom, map_tag=map_tag)
def get_zoom(*, map_tag: Tag | None = None) -> int:
state = get_map_state(map_tag)
with state.lock:
return state.zoom
def set_view(
*,
center: LatLon | None = None,
zoom: int | None = None,
map_tag: Tag | None = None,
) -> None:
state = get_map_state(map_tag)
with state.lock:
payload: dict[str, Any] = {}
if center is not None:
state.center = _validate_latlon(center[0], center[1])
payload["center"] = state.center
if zoom is not None:
state.zoom = max(state.min_zoom, min(state.max_zoom, int(zoom)))
payload["zoom"] = state.zoom
if not payload:
return
mark_dirty(state, DirtyFlags.VIEW | DirtyFlags.TILES | DirtyFlags.OVERLAYS)
_queue(state, CommandKind.SET_VIEW, payload)
def fit_bounds(bounds: Bounds, *, map_tag: Tag | None = None) -> None:
(south_west, north_east) = bounds
south, west = _validate_latlon(south_west[0], south_west[1])
north, east = _validate_latlon(north_east[0], north_east[1])
set_center((south + north) / 2.0, (west + east) / 2.0, map_tag=map_tag)
def screen_to_latlon(x: float, y: float, *, map_tag: Tag | None = None) -> LatLon:
_ = (x, y)
return get_center(map_tag=map_tag)
def latlon_to_screen(lat: float, lon: float, *, map_tag: Tag | None = None) -> Point:
_validate_latlon(lat, lon)
state = get_map_state(map_tag)
with state.lock:
return (state.measured_width / 2.0, state.measured_height / 2.0)
def add_marker(
tag: Tag,
*,
lat: float,
lon: float,
label: str | None = None,
layer: str = "default",
show: bool = True,
map_tag: Tag | None = None,
**kwargs: Any,
) -> Tag:
state = get_map_state(map_tag)
color = kwargs.get("color", (255, 80, 80, 255))
radius = float(kwargs.get("radius", 5.0))
show_label = bool(kwargs.get("show_label", False))
user_data = kwargs.get("user_data")
callback = kwargs.get("callback")
with state.lock:
marker = MarkerOverlay(
tag=tag,
map_tag=state.tag,
layer=layer,
show=show,
user_data=user_data,
lat=_validate_latlon(lat, lon)[0],
lon=_validate_latlon(lat, lon)[1],
label=label,
color=color,
radius=radius,
show_label=show_label,
callback=callback,
)
state.overlays[tag] = marker
_ensure_layer(state, layer).overlay_tags.add(tag)
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.ADD_OVERLAY, _overlay_payload(marker))
return tag
def add_polyline(
tag: Tag,
*,
points: Sequence[LatLon] | None = None,
lats: Sequence[float] | None = None,
lons: Sequence[float] | None = None,
layer: str = "default",
show: bool = True,
map_tag: Tag | None = None,
**kwargs: Any,
) -> Tag:
state = get_map_state(map_tag)
copied_points = _points_from_inputs(points, lats=lats, lons=lons)
with state.lock:
polyline = PolylineOverlay(
tag=tag,
map_tag=state.tag,
layer=layer,
show=show,
user_data=kwargs.get("user_data"),
points=copied_points,
color=kwargs.get("color", (80, 180, 255, 255)),
thickness=float(kwargs.get("thickness", 2.0)),
closed=bool(kwargs.get("closed", False)),
simplify=bool(kwargs.get("simplify", True)),
)
state.overlays[tag] = polyline
_ensure_layer(state, layer).overlay_tags.add(tag)
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.ADD_OVERLAY, _overlay_payload(polyline))
return tag
def add_trajectory(
tag: Tag,
*,
points: Sequence[LatLon] | None = None,
lats: Sequence[float] | None = None,
lons: Sequence[float] | None = None,
layer: str = "default",
show: bool = True,
map_tag: Tag | None = None,
**kwargs: Any,
) -> Tag:
state = get_map_state(map_tag)
copied_points = _points_from_inputs(points, lats=lats, lons=lons)
timestamps = kwargs.get("timestamps")
copied_timestamps = tuple(timestamps) if timestamps is not None else None
if copied_timestamps is not None and len(copied_timestamps) != len(copied_points):
raise CoordinateError("timestamps must have the same length as trajectory points")
point_stride = int(kwargs.get("point_stride", 1))
if point_stride < 1:
raise ValueError("point_stride must be >= 1")
with state.lock:
trajectory = TrajectoryOverlay(
tag=tag,
map_tag=state.tag,
layer=layer,
show=show,
user_data=kwargs.get("user_data"),
points=copied_points,
timestamps=copied_timestamps,
color=kwargs.get("color", (255, 180, 60, 255)),
thickness=float(kwargs.get("thickness", 2.0)),
show_points=bool(kwargs.get("show_points", False)),
point_stride=point_stride,
)
state.overlays[tag] = trajectory
_ensure_layer(state, layer).overlay_tags.add(tag)
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.ADD_OVERLAY, _overlay_payload(trajectory))
return tag
def update_marker(
tag: Tag,
*,
lat: float | None = None,
lon: float | None = None,
label: str | None = None,
map_tag: Tag | None = None,
**kwargs: Any,
) -> None:
state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.get(tag)
if not isinstance(overlay, MarkerOverlay):
raise OverlayNotFoundError(f"marker not found: {tag}")
if lat is not None or lon is not None:
overlay.lat, overlay.lon = _validate_latlon(
overlay.lat if lat is None else lat,
overlay.lon if lon is None else lon,
)
if label is not None:
overlay.label = label
if "show" in kwargs:
overlay.show = bool(kwargs["show"])
if "color" in kwargs:
overlay.color = kwargs["color"]
if "radius" in kwargs:
overlay.radius = float(kwargs["radius"])
overlay.touch()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
def update_polyline(
tag: Tag,
*,
points: Sequence[LatLon] | None = None,
lats: Sequence[float] | None = None,
lons: Sequence[float] | None = None,
map_tag: Tag | None = None,
**kwargs: Any,
) -> None:
state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.get(tag)
if not isinstance(overlay, PolylineOverlay):
raise OverlayNotFoundError(f"polyline not found: {tag}")
if points is not None or lats is not None or lons is not None:
overlay.points = _points_from_inputs(points, lats=lats, lons=lons)
if "show" in kwargs:
overlay.show = bool(kwargs["show"])
if "color" in kwargs:
overlay.color = kwargs["color"]
if "thickness" in kwargs:
overlay.thickness = float(kwargs["thickness"])
overlay.touch()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
def update_trajectory(
tag: Tag,
*,
points: Sequence[LatLon] | None = None,
lats: Sequence[float] | None = None,
lons: Sequence[float] | None = None,
map_tag: Tag | None = None,
**kwargs: Any,
) -> None:
state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.get(tag)
if not isinstance(overlay, TrajectoryOverlay):
raise OverlayNotFoundError(f"trajectory not found: {tag}")
if points is not None or lats is not None or lons is not None:
overlay.points = _points_from_inputs(points, lats=lats, lons=lons)
if "timestamps" in kwargs:
timestamps = kwargs["timestamps"]
overlay.timestamps = tuple(timestamps) if timestamps is not None else None
if overlay.timestamps is not None and len(overlay.timestamps) != len(overlay.points):
raise CoordinateError("timestamps must have the same length as trajectory points")
if "show" in kwargs:
overlay.show = bool(kwargs["show"])
if "color" in kwargs:
overlay.color = kwargs["color"]
if "thickness" in kwargs:
overlay.thickness = float(kwargs["thickness"])
overlay.touch()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
def set_marker_position(tag: Tag, lat: float, lon: float, *, map_tag: Tag | None = None) -> None:
update_marker(tag, lat=lat, lon=lon, map_tag=map_tag)
def set_marker_label(tag: Tag, label: str, *, map_tag: Tag | None = None) -> None:
update_marker(tag, label=label, map_tag=map_tag)
def set_polyline_points(
tag: Tag,
points: Sequence[LatLon],
*,
map_tag: Tag | None = None,
) -> None:
update_polyline(tag, points=points, map_tag=map_tag)
def set_overlay_show(tag: Tag, show: bool, *, map_tag: Tag | None = None) -> None:
state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.get(tag)
if overlay is None:
raise OverlayNotFoundError(f"overlay not found: {tag}")
overlay.show = bool(show)
overlay.touch()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
def delete_overlay(tag: Tag, *, map_tag: Tag | None = None) -> None:
state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.pop(tag, None)
if overlay is None:
raise OverlayNotFoundError(f"overlay not found: {tag}")
layer = state.layers.get(overlay.layer)
if layer is not None:
layer.overlay_tags.discard(tag)
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.DELETE_OVERLAY, {"tag": tag})
def add_layer(name: str, *, show: bool = True, map_tag: Tag | None = None) -> None:
state = get_map_state(map_tag)
with state.lock:
layer = _ensure_layer(state, name, z_index=len(state.layers), show=show)
layer.show = show
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.ADD_LAYER, {"name": name, "show": show})
def show_layer(name: str, *, map_tag: Tag | None = None) -> None:
state = get_map_state(map_tag)
with state.lock:
_ensure_layer(state, name).show = True
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.SET_LAYER_VISIBILITY, {"name": name, "show": True})
def hide_layer(name: str, *, map_tag: Tag | None = None) -> None:
state = get_map_state(map_tag)
with state.lock:
_ensure_layer(state, name).show = False
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.SET_LAYER_VISIBILITY, {"name": name, "show": False})
def clear_layer(name: str, *, map_tag: Tag | None = None) -> None:
state = get_map_state(map_tag)
with state.lock:
layer = _ensure_layer(state, name)
for overlay_tag in tuple(layer.overlay_tags):
state.overlays.pop(overlay_tag, None)
layer.overlay_tags.clear()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.CLEAR_LAYER, {"name": name})
def clear_map(*, map_tag: Tag | None = None) -> None:
state = get_map_state(map_tag)
with state.lock:
state.overlays.clear()
for layer in state.layers.values():
layer.overlay_tags.clear()
state.generation += 1
mark_dirty(state, DirtyFlags.OVERLAYS | DirtyFlags.TILES)
_queue(state, CommandKind.CLEAR_MAP, {})
def set_provider(provider: str | TileProvider, *, map_tag: Tag | None = None) -> None:
provider_obj = get_provider(provider) if isinstance(provider, str) else provider
state = get_map_state(map_tag)
with state.lock:
state.provider = provider_obj
state.min_zoom = max(state.min_zoom, provider_obj.min_zoom)
state.max_zoom = min(state.max_zoom, provider_obj.max_zoom)
state.zoom = max(state.min_zoom, min(state.max_zoom, state.zoom))
state.generation += 1
mark_dirty(state, DirtyFlags.PROVIDER | DirtyFlags.TILES)
_queue(state, CommandKind.SET_PROVIDER, {"provider": provider_obj.name})
def clear_memory_cache(*, map_tag: Tag | None = None) -> None:
state = get_map_state(map_tag)
with state.lock:
mark_dirty(state, DirtyFlags.TILES)
_queue(state, CommandKind.CLEAR_MEMORY_CACHE, {})
def clear_disk_cache(*, map_tag: Tag | None = None) -> None:
if map_tag is None:
clear_disk_cache_path(get_config().cache_dir)
return
state = get_map_state(map_tag)
with state.lock:
mark_dirty(state, DirtyFlags.TILES)
_queue(state, CommandKind.CLEAR_DISK_CACHE, {})
def get_cache_stats(*, map_tag: Tag | None = None) -> CacheStats:
config = get_config()
if map_tag is None:
cache_dir = config.cache_dir
return CacheStats(
memory_max_tiles=config.memory_cache_max_tiles,
disk_bytes=disk_cache_size_bytes(cache_dir),
disk_max_bytes=config.disk_cache_max_bytes,
disk_path=disk_cache_root(cache_dir),
)
state = get_map_state(map_tag)
with state.lock:
tile_snapshot = state.tile_manager.snapshot()
return CacheStats(
memory_tiles=tile_snapshot.memory_tiles,
memory_max_tiles=config.memory_cache_max_tiles,
memory_hits=tile_snapshot.memory_hits,
memory_misses=tile_snapshot.memory_misses,
disk_bytes=disk_cache_size_bytes(state.cache_dir),
disk_max_bytes=config.disk_cache_max_bytes,
disk_hits=tile_snapshot.disk_hits,
disk_misses=tile_snapshot.disk_misses,
disk_path=disk_cache_root(state.cache_dir),
)
def get_map_debug_state(*, map_tag: Tag | None = None) -> dict[str, Any]:
state = get_map_state(map_tag)
with state.lock:
return {
"tag": state.tag,
"center": state.center,
"zoom": state.zoom,
"requested_size": (state.requested_width, state.requested_height),
"measured_size": (state.measured_width, state.measured_height),
"visible": state.is_visible,
"provider": state.provider.name,
"overlay_count": len(state.overlays),
"layers": {
name: {
"show": layer.show,
"z_index": layer.z_index,
"overlay_count": len(layer.overlay_tags),
}
for name, layer in state.layers.items()
},
"dirty_flags": int(state.dirty),
"pending_command_count": len(state.command_queue),
"generation": state.generation,
"active_drag": state.interaction.active_drag,
"tiles": asdict(state.tile_manager.snapshot()),
}

289
src/dpg_map/cache.py Normal file
View File

@@ -0,0 +1,289 @@
"""Memory and disk cache helpers."""
from __future__ import annotations
import json
import shutil
from dataclasses import asdict, dataclass, field
from pathlib import Path
from time import time
from typing import Any
from platformdirs import user_cache_dir
from .exceptions import CacheError
@dataclass(frozen=True, slots=True)
class CacheStats:
"""Public cache statistics snapshot."""
memory_tiles: int = 0
memory_max_tiles: int = 0
memory_hits: int = 0
memory_misses: int = 0
disk_bytes: int = 0
disk_max_bytes: int | None = None
disk_hits: int = 0
disk_misses: int = 0
disk_path: Path | None = None
@dataclass(slots=True)
class MemoryCacheConfig:
"""Initial memory cache configuration."""
max_tiles: int = 512
@dataclass(slots=True)
class DiskCacheConfig:
"""Initial persistent disk cache configuration."""
path: Path | None = None
max_bytes: int | None = 2_000_000_000
@dataclass(slots=True)
class MemoryCacheEntry:
"""Metadata for one in-memory tile."""
tile_id: object
size_bytes: int = 0
last_accessed_at: float = field(default_factory=time)
protected: bool = False
texture_tag: object | None = None
@dataclass(slots=True)
class MemoryCacheModel:
"""Small LRU metadata model for decoded/runtime tiles."""
max_tiles: int = 512
entries: dict[object, MemoryCacheEntry] = field(default_factory=dict)
hits: int = 0
misses: int = 0
def record_access(self, tile_id: object) -> MemoryCacheEntry | None:
"""Mark an entry as recently used and return it if present."""
entry = self.entries.get(tile_id)
if entry is None:
self.misses += 1
return None
self.hits += 1
entry.last_accessed_at = time()
return entry
def put(self, entry: MemoryCacheEntry) -> None:
"""Insert or replace entry metadata."""
entry.last_accessed_at = time()
self.entries[entry.tile_id] = entry
def plan_evictions(self) -> list[object]:
"""Return tile IDs that can be evicted without touching GUI resources."""
overflow = len(self.entries) - self.max_tiles
if overflow <= 0:
return []
candidates = [entry for entry in self.entries.values() if not entry.protected]
candidates.sort(key=lambda entry: entry.last_accessed_at)
return [entry.tile_id for entry in candidates[:overflow]]
@dataclass(frozen=True, slots=True)
class DiskCacheMetadata:
"""Persistent metadata stored next to a tile file."""
url: str = ""
etag: str | None = None
last_modified: str | None = None
expires: str | None = None
downloaded_at: float = 0.0
last_accessed_at: float = 0.0
size_bytes: int = 0
@dataclass(frozen=True, slots=True)
class DiskCacheEntry:
"""Scanned disk cache file plus metadata."""
tile_path: Path
metadata_path: Path
metadata: DiskCacheMetadata
def default_cache_dir() -> Path:
"""Return the default persistent cache directory."""
return Path(user_cache_dir("dpg-map", appauthor=False))
def disk_cache_root(cache_dir: str | Path | None = None) -> Path:
"""Resolve the disk cache root path."""
return Path(cache_dir).expanduser() if cache_dir is not None else default_cache_dir()
def tile_cache_path(
cache_dir: str | Path | None,
provider_name: str,
z: int,
x: int,
y: int,
extension: str | None = None,
) -> Path:
"""Return the provider-namespaced persistent tile path."""
ext = (extension or "png").lstrip(".")
safe_provider = provider_name.replace("/", "_")
return disk_cache_root(cache_dir) / safe_provider / str(z) / str(x) / f"{y}.{ext}"
def tile_metadata_path(tile_path: Path) -> Path:
"""Return the metadata path for a tile path."""
return tile_path.with_suffix(".json")
def read_disk_metadata(path: Path) -> DiskCacheMetadata:
"""Read a metadata JSON file, returning defaults for missing metadata."""
if not path.exists():
return DiskCacheMetadata()
try:
raw: dict[str, Any] = json.loads(path.read_text(encoding="utf-8"))
except OSError as exc:
raise CacheError(f"could not read cache metadata: {path}") from exc
except json.JSONDecodeError as exc:
raise CacheError(f"invalid cache metadata JSON: {path}") from exc
return DiskCacheMetadata(
url=str(raw.get("url", "")),
etag=raw.get("etag"),
last_modified=raw.get("last_modified"),
expires=raw.get("expires"),
downloaded_at=float(raw.get("downloaded_at", 0.0)),
last_accessed_at=float(raw.get("last_accessed_at", 0.0)),
size_bytes=int(raw.get("size_bytes", 0)),
)
def write_disk_metadata(path: Path, metadata: DiskCacheMetadata) -> None:
"""Write metadata JSON next to a tile file."""
try:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(asdict(metadata), sort_keys=True), encoding="utf-8")
except OSError as exc:
raise CacheError(f"could not write cache metadata: {path}") from exc
def touch_disk_metadata(path: Path, *, accessed_at: float | None = None) -> None:
"""Update only the last access timestamp for a metadata file."""
metadata = read_disk_metadata(path)
write_disk_metadata(
path,
DiskCacheMetadata(
url=metadata.url,
etag=metadata.etag,
last_modified=metadata.last_modified,
expires=metadata.expires,
downloaded_at=metadata.downloaded_at,
last_accessed_at=time() if accessed_at is None else accessed_at,
size_bytes=metadata.size_bytes,
),
)
def scan_disk_cache(cache_dir: str | Path | None) -> list[DiskCacheEntry]:
"""Scan tile files under a disk cache root."""
root = disk_cache_root(cache_dir)
if not root.exists():
return []
entries: list[DiskCacheEntry] = []
for path in root.rglob("*"):
if not path.is_file() or path.suffix == ".json":
continue
metadata_path = tile_metadata_path(path)
metadata = read_disk_metadata(metadata_path)
size_bytes = metadata.size_bytes or path.stat().st_size
if size_bytes != metadata.size_bytes:
metadata = DiskCacheMetadata(
url=metadata.url,
etag=metadata.etag,
last_modified=metadata.last_modified,
expires=metadata.expires,
downloaded_at=metadata.downloaded_at,
last_accessed_at=metadata.last_accessed_at,
size_bytes=size_bytes,
)
entries.append(DiskCacheEntry(path, metadata_path, metadata))
return entries
def disk_cache_size_bytes(cache_dir: str | Path | None) -> int:
"""Return total bytes for cached tile files."""
return sum(entry.metadata.size_bytes for entry in scan_disk_cache(cache_dir))
def plan_disk_prune(
cache_dir: str | Path | None,
max_bytes: int | None,
*,
protected_paths: set[Path] | None = None,
) -> list[Path]:
"""Return tile paths that should be pruned by LRU order without deleting them."""
if max_bytes is None:
return []
protected = {path.resolve() for path in protected_paths or set()}
entries = scan_disk_cache(cache_dir)
total = sum(entry.metadata.size_bytes for entry in entries)
if total <= max_bytes:
return []
candidates = [entry for entry in entries if entry.tile_path.resolve() not in protected]
candidates.sort(key=lambda entry: entry.metadata.last_accessed_at)
prune: list[Path] = []
for entry in candidates:
if total <= max_bytes:
break
prune.append(entry.tile_path)
total -= entry.metadata.size_bytes
return prune
def prune_disk_cache(
cache_dir: str | Path | None,
max_bytes: int | None,
*,
protected_paths: set[Path] | None = None,
) -> list[Path]:
"""Delete LRU tile files until the cache fits the configured limit."""
planned = plan_disk_prune(cache_dir, max_bytes, protected_paths=protected_paths)
for path in planned:
metadata_path = tile_metadata_path(path)
try:
path.unlink(missing_ok=True)
metadata_path.unlink(missing_ok=True)
except OSError as exc:
raise CacheError(f"could not prune cached tile: {path}") from exc
return planned
def clear_disk_cache_path(cache_dir: str | Path | None) -> None:
"""Remove all persistent tile cache files under a cache root."""
root = disk_cache_root(cache_dir)
if not root.exists():
return
try:
shutil.rmtree(root)
except OSError as exc:
raise CacheError(f"could not clear disk cache: {root}") from exc

113
src/dpg_map/commands.py Normal file
View File

@@ -0,0 +1,113 @@
"""Command models for GUI-thread rendering work."""
from __future__ import annotations
from collections import OrderedDict, deque
from dataclasses import dataclass, field
from enum import Enum
from threading import RLock
from time import monotonic
from typing import Any
from .types import Tag
class CommandKind(Enum):
"""Commands that the GUI thread can apply in order."""
SET_VIEW = "set_view"
SET_PROVIDER = "set_provider"
ADD_OVERLAY = "add_overlay"
UPDATE_OVERLAY = "update_overlay"
DELETE_OVERLAY = "delete_overlay"
SET_LAYER_VISIBILITY = "set_layer_visibility"
ADD_LAYER = "add_layer"
CLEAR_LAYER = "clear_layer"
CLEAR_MAP = "clear_map"
CLEAR_MEMORY_CACHE = "clear_memory_cache"
CLEAR_DISK_CACHE = "clear_disk_cache"
@dataclass(frozen=True, slots=True)
class MapCommand:
"""A command submitted from public API calls to the GUI-thread renderer."""
kind: CommandKind
map_tag: Tag
payload: dict[str, Any] = field(default_factory=dict)
created_at: float = field(default_factory=monotonic)
class MapCommandQueue:
"""Thread-safe command queue with bounded coalescing for high-rate updates."""
_VIEW_KEY = "__view__"
def __init__(self) -> None:
self._lock = RLock()
self._ordered: deque[MapCommand] = deque()
self._overlay_updates: OrderedDict[tuple[Tag, Tag], MapCommand] = OrderedDict()
self._view_updates: OrderedDict[Tag, MapCommand] = OrderedDict()
def put(self, command: MapCommand) -> None:
"""Queue a command, coalescing update commands where ordering permits."""
with self._lock:
if command.kind is CommandKind.UPDATE_OVERLAY:
overlay_tag = command.payload.get("tag")
if overlay_tag is None:
self._ordered.append(command)
return
key = (command.map_tag, overlay_tag)
self._ordered.append(command)
self._overlay_updates[key] = command
return
if command.kind is CommandKind.SET_VIEW:
self._ordered.append(command)
self._view_updates[command.map_tag] = command
return
self._ordered.append(command)
def drain(self) -> list[MapCommand]:
"""Return pending commands in render order and clear the queue."""
with self._lock:
drained: list[MapCommand] = []
while self._ordered:
command = self._ordered.popleft()
if command.kind is CommandKind.UPDATE_OVERLAY:
overlay_tag = command.payload.get("tag")
if not isinstance(overlay_tag, str | int):
drained.append(command)
continue
key = (command.map_tag, overlay_tag)
latest = self._overlay_updates.get(key)
if latest is command:
drained.append(command)
del self._overlay_updates[key]
continue
if command.kind is CommandKind.SET_VIEW:
latest = self._view_updates.get(command.map_tag)
if latest is command:
drained.append(command)
del self._view_updates[command.map_tag]
continue
drained.append(command)
return drained
def __len__(self) -> int:
with self._lock:
return len(self._ordered)
def clear(self) -> None:
"""Drop all pending commands."""
with self._lock:
self._ordered.clear()
self._overlay_updates.clear()
self._view_updates.clear()

View File

@@ -0,0 +1 @@
"""Diagnostics and debug state helpers."""

View File

@@ -0,0 +1 @@
"""Draw layer bookkeeping helpers."""

49
src/dpg_map/exceptions.py Normal file
View File

@@ -0,0 +1,49 @@
"""Public exception types for dpg-map."""
class DpgMapError(Exception):
"""Base exception for all public dpg-map errors."""
class DpgMapNotImplementedError(DpgMapError, NotImplementedError):
"""Raised by public APIs that are intentionally stubbed during the rebuild."""
class ProviderError(DpgMapError):
"""Base exception for tile provider errors."""
class ProviderExistsError(ProviderError):
"""Raised when registering a provider name that already exists."""
class ProviderNotFoundError(ProviderError):
"""Raised when a requested tile provider is not registered."""
class InvalidProviderError(ProviderError, ValueError):
"""Raised when a tile provider definition is invalid."""
class ProjectionError(DpgMapError, ValueError):
"""Raised when geographic projection input is invalid."""
class MapNotFoundError(DpgMapError, KeyError):
"""Raised when a requested map tag is not registered."""
class OverlayNotFoundError(DpgMapError, KeyError):
"""Raised when a requested overlay tag is not registered."""
class CoordinateError(DpgMapError, ValueError):
"""Raised when geographic coordinate input is invalid."""
class ThreadingError(DpgMapError):
"""Raised when an operation violates dpg-map threading rules."""
class CacheError(DpgMapError):
"""Raised when cache metadata or paths cannot be handled."""

View File

@@ -0,0 +1,36 @@
"""Map interaction state and handlers."""
from __future__ import annotations
from dataclasses import dataclass
from .sizing import effective_draw_size
from .state import MapState
@dataclass(frozen=True, slots=True)
class HitRect:
"""Screen-space rectangle used for map interaction tests."""
x: float
y: float
width: float
height: float
@property
def right(self) -> float:
return self.x + self.width
@property
def bottom(self) -> float:
return self.y + self.height
def contains(self, x: float, y: float) -> bool:
return self.x <= x <= self.right and self.y <= y <= self.bottom
def calculate_hit_rect(state: MapState, drawlist_pos: tuple[float, float]) -> HitRect:
"""Return the map interaction rectangle for a drawlist position."""
width, height = effective_draw_size(state)
return HitRect(float(drawlist_pos[0]), float(drawlist_pos[1]), float(width), float(height))

72
src/dpg_map/overlays.py Normal file
View File

@@ -0,0 +1,72 @@
"""Logical overlay models."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
from .types import Color, LatLon, Tag
@dataclass(slots=True)
class Overlay:
"""Base logical overlay state."""
tag: Tag
map_tag: Tag
layer: str
show: bool = True
user_data: Any = None
revision: int = 0
def touch(self) -> None:
"""Mark this overlay as changed."""
self.revision += 1
@dataclass(slots=True)
class MarkerOverlay(Overlay):
"""Logical marker overlay."""
lat: float = 0.0
lon: float = 0.0
label: str | None = None
color: Color = (255, 80, 80, 255)
radius: float = 5.0
show_label: bool = False
callback: Callable[..., Any] | None = None
@dataclass(slots=True)
class PolylineOverlay(Overlay):
"""Logical polyline overlay."""
points: tuple[LatLon, ...] = ()
color: Color = (80, 180, 255, 255)
thickness: float = 2.0
closed: bool = False
simplify: bool = True
@dataclass(slots=True)
class TrajectoryOverlay(Overlay):
"""Logical trajectory overlay."""
points: tuple[LatLon, ...] = ()
timestamps: tuple[float, ...] | None = None
color: Color = (255, 180, 60, 255)
thickness: float = 2.0
show_points: bool = False
point_stride: int = 1
@dataclass(slots=True)
class LayerState:
"""Logical layer visibility and ordering state."""
name: str
z_index: int = 0
show: bool = True
overlay_tags: set[Tag] = field(default_factory=set)

92
src/dpg_map/projection.py Normal file
View File

@@ -0,0 +1,92 @@
"""Web Mercator projection helpers."""
from __future__ import annotations
import math
from .exceptions import ProjectionError
from .types import LatLon, Point
WEB_MERCATOR_MAX_LAT = 85.05112878
DEFAULT_TILE_SIZE = 256
def clamp_latitude(lat: float) -> float:
"""Clamp latitude to the Web Mercator representable range."""
return min(max(lat, -WEB_MERCATOR_MAX_LAT), WEB_MERCATOR_MAX_LAT)
def map_size(zoom: int, tile_size: int = DEFAULT_TILE_SIZE) -> int:
"""Return square pixel size of the full world at a zoom level."""
if zoom < 0:
raise ProjectionError("zoom must be >= 0")
if tile_size <= 0:
raise ProjectionError("tile_size must be > 0")
return tile_size * (2**zoom)
def latlon_to_world(lat: float, lon: float, zoom: int, tile_size: int = DEFAULT_TILE_SIZE) -> Point:
"""Project latitude/longitude to world pixel coordinates."""
size = map_size(zoom, tile_size)
lat = clamp_latitude(lat)
lon = ((lon + 180.0) % 360.0) - 180.0
sin_lat = math.sin(math.radians(lat))
x = (lon + 180.0) / 360.0 * size
y = (0.5 - math.log((1.0 + sin_lat) / (1.0 - sin_lat)) / (4.0 * math.pi)) * size
return (x, y)
def world_to_latlon(x: float, y: float, zoom: int, tile_size: int = DEFAULT_TILE_SIZE) -> LatLon:
"""Unproject world pixel coordinates to latitude/longitude."""
size = map_size(zoom, tile_size)
lon = x / size * 360.0 - 180.0
n = math.pi - 2.0 * math.pi * y / size
lat = math.degrees(math.atan(math.sinh(n)))
return (lat, lon)
def latlon_to_tile(lat: float, lon: float, zoom: int) -> tuple[int, int, int]:
"""Return the XYZ tile coordinate containing a latitude/longitude point."""
x, y = latlon_to_world(lat, lon, zoom)
scale = 2**zoom
tile_x = min(max(int(x // DEFAULT_TILE_SIZE), 0), scale - 1)
tile_y = min(max(int(y // DEFAULT_TILE_SIZE), 0), scale - 1)
return (tile_x, tile_y, zoom)
def world_to_screen(
world_x: float,
world_y: float,
*,
center: LatLon,
zoom: int,
width: int,
height: int,
tile_size: int = DEFAULT_TILE_SIZE,
) -> Point:
"""Convert world pixels to screen pixels for a centered viewport."""
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
return (world_x - center_x + width / 2.0, world_y - center_y + height / 2.0)
def screen_to_world(
screen_x: float,
screen_y: float,
*,
center: LatLon,
zoom: int,
width: int,
height: int,
tile_size: int = DEFAULT_TILE_SIZE,
) -> Point:
"""Convert screen pixels to world pixels for a centered viewport."""
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
return (screen_x + center_x - width / 2.0, screen_y + center_y - height / 2.0)

145
src/dpg_map/providers.py Normal file
View File

@@ -0,0 +1,145 @@
"""Tile provider definitions and registry."""
from __future__ import annotations
from dataclasses import dataclass, field
from string import Formatter
from threading import RLock
from .exceptions import InvalidProviderError, ProviderExistsError, ProviderNotFoundError
_REQUIRED_TEMPLATE_FIELDS = frozenset({"x", "y", "z"})
_OPTIONAL_TEMPLATE_FIELDS = frozenset({"s", "r", "ext"})
@dataclass(frozen=True, slots=True)
class TileProvider:
"""XYZ raster tile provider definition."""
name: str
url_template: str
min_zoom: int = 0
max_zoom: int = 19
tile_size: int = 256
attribution: str = ""
headers: dict[str, str] = field(default_factory=dict)
subdomains: tuple[str, ...] = ()
retina: bool = False
file_extension: str | None = None
def __post_init__(self) -> None:
name = self.name.strip()
if not name:
raise InvalidProviderError("provider name must not be empty")
if name != self.name:
object.__setattr__(self, "name", name)
if not self.url_template.strip():
raise InvalidProviderError("provider url_template must not be empty")
fields = {
field_name
for _, field_name, _, _ in Formatter().parse(self.url_template)
if field_name is not None and field_name != ""
}
missing = _REQUIRED_TEMPLATE_FIELDS - fields
if missing:
missing_text = ", ".join(sorted(missing))
raise InvalidProviderError(f"provider url_template missing field(s): {missing_text}")
unknown = fields - _REQUIRED_TEMPLATE_FIELDS - _OPTIONAL_TEMPLATE_FIELDS
if unknown:
unknown_text = ", ".join(sorted(unknown))
raise InvalidProviderError(
f"provider url_template contains unknown field(s): {unknown_text}"
)
if self.min_zoom < 0:
raise InvalidProviderError("provider min_zoom must be >= 0")
if self.max_zoom < self.min_zoom:
raise InvalidProviderError("provider max_zoom must be >= min_zoom")
if self.tile_size <= 0:
raise InvalidProviderError("provider tile_size must be > 0")
object.__setattr__(self, "headers", dict(self.headers))
object.__setattr__(self, "subdomains", tuple(self.subdomains))
def build_url(self, *, x: int, y: int, z: int) -> str:
"""Build a concrete tile URL for an XYZ tile coordinate."""
if z < self.min_zoom or z > self.max_zoom:
raise InvalidProviderError(
f"zoom {z} is outside provider range {self.min_zoom}-{self.max_zoom}"
)
subdomain = ""
if self.subdomains:
subdomain = self.subdomains[(x + y + z) % len(self.subdomains)]
retina_suffix = "@2x" if self.retina else ""
extension = self.file_extension or ""
return self.url_template.format(
x=x,
y=y,
z=z,
s=subdomain,
r=retina_suffix,
ext=extension,
)
OSM = TileProvider(
name="osm",
url_template="https://tile.openstreetmap.org/{z}/{x}/{y}.png",
min_zoom=0,
max_zoom=19,
tile_size=256,
attribution="\u00a9 OpenStreetMap contributors",
)
_providers: dict[str, TileProvider] = {OSM.name: OSM}
_providers_lock = RLock()
def register_provider(provider: TileProvider, *, replace: bool = False) -> None:
"""Register a tile provider by name."""
if not isinstance(provider, TileProvider):
raise InvalidProviderError("provider must be a TileProvider")
with _providers_lock:
if provider.name in _providers and not replace:
raise ProviderExistsError(f"provider already registered: {provider.name}")
_providers[provider.name] = provider
def unregister_provider(name: str) -> None:
"""Remove a registered tile provider."""
with _providers_lock:
if name not in _providers:
raise ProviderNotFoundError(f"provider not registered: {name}")
del _providers[name]
def get_provider(name: str) -> TileProvider:
"""Return a registered tile provider."""
with _providers_lock:
try:
return _providers[name]
except KeyError as exc:
raise ProviderNotFoundError(f"provider not registered: {name}") from exc
def get_default_provider() -> TileProvider:
"""Return the default OpenStreetMap provider."""
return get_provider(OSM.name)
def list_providers() -> list[str]:
"""List registered provider names in stable sorted order."""
with _providers_lock:
return sorted(_providers)

231
src/dpg_map/renderer.py Normal file
View File

@@ -0,0 +1,231 @@
"""GUI-thread renderer implementation."""
from __future__ import annotations
from collections.abc import Callable
from typing import Any
from .commands import CommandKind, MapCommand
from .interaction import HitRect, calculate_hit_rect
from .sizing import SizeMeasurement, apply_size_measurement
from .state import DirtyFlags, MapState
from .tiles import Tile, VisibleTile
class MapRenderer:
"""GUI-thread renderer for the map widget shell and tile layer."""
def __init__(self, state: MapState, dpg: Any) -> None:
self.state = state
self._dpg = dpg
self._background_tag = f"{state.tag}##background"
self._attribution_tag = f"{state.tag}##attribution"
self.last_drained_commands: tuple[MapCommand, ...] = ()
self.last_hit_rect: HitRect | None = None
def schedule_next_frame(self) -> None:
"""Schedule this renderer to run on the next Dear PyGui frame."""
with self.state.lock:
if self.state.frame_scheduled:
return
self.state.frame_scheduled = True
frame = self._dpg.get_frame_count() + 1
self._dpg.set_frame_callback(frame, self._frame_callback)
def _frame_callback(self, sender: Any | None = None, app_data: Any | None = None) -> None:
_ = (sender, app_data)
with self.state.lock:
self.state.frame_scheduled = False
if not self._dpg.does_item_exist(self.state.drawlist_tag):
return
self.render_frame()
self.schedule_next_frame()
def render_frame(self) -> None:
"""Drain pending commands, refresh size, process tiles, and redraw."""
commands = drain_renderer_commands(self.state)
self.last_drained_commands = tuple(commands)
self._update_size_from_dpg()
with self.state.lock:
dirty = self.state.dirty
should_draw = bool(dirty & (DirtyFlags.FULL | DirtyFlags.SIZE | DirtyFlags.TILES))
visible = self.state.is_visible
width = self.state.measured_width or self.state.last_nonzero_width
height = self.state.measured_height or self.state.last_nonzero_height
provider_attribution = self.state.provider.attribution
provider = self.state.provider
center = self.state.center
zoom = self.state.zoom
generation = self.state.generation
cache_dir = self.state.cache_dir
self.state.dirty = DirtyFlags.NONE
accepted_tiles = self.state.tile_manager.drain_results(
generation=generation,
provider_name=provider.name,
)
self._delete_evicted_textures()
for tile in accepted_tiles:
self._ensure_texture(tile)
visible_tiles: list[VisibleTile] = []
if visible and width > 0 and height > 0:
visible_tiles = self.state.tile_manager.request_visible_tiles(
center=center,
zoom=zoom,
width=width,
height=height,
provider=provider,
generation=generation,
cache_dir=cache_dir,
margin=self._prefetch_margin(),
)
if visible and (should_draw or accepted_tiles):
self._draw_tile_layer(
visible_tiles=visible_tiles,
width=width,
height=height,
attribution=provider_attribution,
tile_size=provider.tile_size,
)
def _update_size_from_dpg(self) -> None:
width, height = self._measure_child_content()
visible = bool(self._dpg.is_item_shown(self.state.child_window_tag))
with self.state.lock:
update = apply_size_measurement(
self.state,
SizeMeasurement(width=width, height=height, visible=visible),
)
draw_width = update.effective_width
draw_height = update.effective_height
self._dpg.configure_item(self.state.drawlist_tag, width=draw_width, height=draw_height)
draw_pos = tuple(float(value) for value in self._dpg.get_item_pos(self.state.drawlist_tag))
with self.state.lock:
self.last_hit_rect = calculate_hit_rect(self.state, (draw_pos[0], draw_pos[1]))
def _measure_child_content(self) -> tuple[int, int]:
try:
width, height = self._dpg.get_item_rect_size(self.state.child_window_tag)
except Exception:
return (0, 0)
return (max(0, int(width)), max(0, int(height)))
def _draw_tile_layer(
self,
*,
visible_tiles: list[VisibleTile],
width: int,
height: int,
attribution: str,
tile_size: int,
) -> None:
width = max(1, int(width))
height = max(1, int(height))
self._dpg.delete_item(self.state.drawlist_tag, children_only=True)
self._dpg.draw_rectangle(
(0, 0),
(width, height),
parent=self.state.drawlist_tag,
tag=self._background_tag,
color=(54, 68, 78, 255),
fill=(29, 38, 45, 255),
)
for visible_tile in visible_tiles:
tile = self.state.tile_manager.get_ready_tile(visible_tile.tile_id)
if tile is None or tile.texture_tag is None:
continue
screen_x = visible_tile.screen_x
screen_y = visible_tile.screen_y
self._dpg.draw_image(
tile.texture_tag,
(screen_x, screen_y),
(screen_x + tile_size, screen_y + tile_size),
parent=self.state.drawlist_tag,
)
label = attribution or "Map tiles"
text_y = max(28, height - 24)
self._dpg.draw_text(
(12, text_y),
label,
parent=self.state.drawlist_tag,
tag=self._attribution_tag,
color=(172, 184, 192, 255),
size=12,
)
def _ensure_texture(self, tile: Tile) -> None:
if tile.texture_tag is not None:
return
texture_tag = (
f"{self.state.tag}##tile-{tile.tile_id.provider_name}-"
f"{tile.tile_id.z}-{tile.tile_id.x}-{tile.tile_id.y}"
)
if not self._dpg.does_item_exist(texture_tag):
self._dpg.add_static_texture(
tile.width,
tile.height,
tile.pixels,
tag=texture_tag,
parent=self.state.texture_registry_tag,
)
self.state.tile_manager.set_texture_tag(tile.tile_id, texture_tag)
def _delete_evicted_textures(self) -> None:
for texture_tag in self.state.tile_manager.take_texture_deletions():
if self._dpg.does_item_exist(texture_tag):
self._dpg.delete_item(texture_tag)
def _prefetch_margin(self) -> int:
from .state import get_config
return get_config().prefetch_margin_tiles
def drain_renderer_commands(state: MapState) -> list[MapCommand]:
"""Drain and apply GUI-thread command side effects."""
commands = state.command_queue.drain()
if not commands:
return []
with state.lock:
for command in commands:
if command.kind is CommandKind.SET_VIEW:
state.dirty |= DirtyFlags.VIEW | DirtyFlags.TILES | DirtyFlags.OVERLAYS
elif command.kind is CommandKind.SET_PROVIDER:
state.tile_manager.clear_memory_cache()
state.dirty |= DirtyFlags.PROVIDER | DirtyFlags.TILES
elif command.kind in {
CommandKind.ADD_OVERLAY,
CommandKind.UPDATE_OVERLAY,
CommandKind.DELETE_OVERLAY,
CommandKind.SET_LAYER_VISIBILITY,
CommandKind.ADD_LAYER,
CommandKind.CLEAR_LAYER,
}:
state.dirty |= DirtyFlags.OVERLAYS
elif command.kind is CommandKind.CLEAR_MAP:
state.tile_manager.clear_memory_cache()
state.dirty |= DirtyFlags.TILES | DirtyFlags.OVERLAYS
elif command.kind is CommandKind.CLEAR_MEMORY_CACHE:
state.tile_manager.clear_memory_cache()
state.dirty |= DirtyFlags.TILES
elif command.kind is CommandKind.CLEAR_DISK_CACHE:
state.tile_manager.clear_disk_cache(state.cache_dir)
state.dirty |= DirtyFlags.TILES
return commands
def make_frame_pump(state: MapState, dpg: Any) -> Callable[[], None]:
"""Create and attach a renderer frame pump for a map state."""
renderer = MapRenderer(state, dpg)
with state.lock:
state.renderer = renderer
renderer.schedule_next_frame()
return renderer.render_frame

87
src/dpg_map/sizing.py Normal file
View File

@@ -0,0 +1,87 @@
"""Map sizing measurement helpers."""
from __future__ import annotations
from dataclasses import dataclass
from .state import DirtyFlags, MapState, mark_dirty
@dataclass(frozen=True, slots=True)
class SizeMeasurement:
"""Concrete size and visibility reported by the GUI thread."""
width: int
height: int
visible: bool
@dataclass(frozen=True, slots=True)
class SizeUpdate:
"""Result of applying one GUI size measurement."""
changed: bool
became_visible: bool
became_hidden: bool
effective_width: int
effective_height: int
def normalize_dimension(value: int | float | None) -> int:
"""Convert Dear PyGui size values to stable integer pixels."""
if value is None:
return 0
return max(0, int(value))
def effective_draw_size(state: MapState) -> tuple[int, int]:
"""Return the size the drawlist should use for the current frame."""
width = state.measured_width or state.last_nonzero_width or 1
height = state.measured_height or state.last_nonzero_height or 1
return (max(1, width), max(1, height))
def apply_size_measurement(
state: MapState,
measurement: SizeMeasurement,
*,
mark: bool = True,
) -> SizeUpdate:
"""Apply a GUI-thread size measurement to logical state.
A zero measurement is preserved as the current measured size, but it does not
erase the last non-zero size. This keeps hidden maps from permanently
collapsing when Dear PyGui reports a temporary zero content region.
"""
width = normalize_dimension(measurement.width)
height = normalize_dimension(measurement.height)
visible = bool(measurement.visible and width > 0 and height > 0)
previous_width = state.measured_width
previous_height = state.measured_height
previous_visible = state.is_visible
changed = previous_width != width or previous_height != height or previous_visible != visible
state.measured_width = width
state.measured_height = height
state.is_visible = visible
if width > 0:
state.last_nonzero_width = width
if height > 0:
state.last_nonzero_height = height
if changed and mark:
mark_dirty(state, DirtyFlags.SIZE | DirtyFlags.TILES | DirtyFlags.OVERLAYS)
effective_width, effective_height = effective_draw_size(state)
return SizeUpdate(
changed=changed,
became_visible=visible and not previous_visible,
became_hidden=previous_visible and not visible,
effective_width=effective_width,
effective_height=effective_height,
)

325
src/dpg_map/state.py Normal file
View File

@@ -0,0 +1,325 @@
"""Thread-safe state models and registries."""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass, field
from enum import IntFlag, auto
from pathlib import Path
from threading import RLock
from uuid import uuid4
from .commands import MapCommandQueue
from .exceptions import InvalidProviderError, MapNotFoundError
from .overlays import LayerState, Overlay
from .providers import TileProvider, get_default_provider, get_provider
from .tiles import TileManager
from .types import LatLon, Tag
class DirtyFlags(IntFlag):
"""Reasons the GUI renderer needs to refresh part of a map."""
NONE = 0
VIEW = auto()
TILES = auto()
OVERLAYS = auto()
SIZE = auto()
PROVIDER = auto()
FULL = VIEW | TILES | OVERLAYS | SIZE | PROVIDER
@dataclass(slots=True)
class DpgMapConfig:
"""Global package configuration."""
user_agent: str | None = None
cache_dir: Path | None = None
default_provider: str | TileProvider = "osm"
memory_cache_max_tiles: int = 512
disk_cache_max_bytes: int | None = 2_000_000_000
prefetch_margin_tiles: int = 1
tile_worker_count: int = 4
overlay_update_policy: str = "coalesce"
debug: bool = False
@dataclass(slots=True)
class InteractionState:
"""Logical interaction state until GUI interaction is implemented."""
active_drag: bool = False
last_mouse_position: tuple[float, float] | None = None
def default_layers() -> dict[str, LayerState]:
"""Return the default logical map layers."""
layers = [
LayerState("background", z_index=0),
LayerState("tiles", z_index=10),
LayerState("default", z_index=50),
LayerState("markers", z_index=60),
LayerState("lines", z_index=70),
LayerState("trajectories", z_index=80),
LayerState("attribution", z_index=100),
]
return {layer.name: layer for layer in layers}
@dataclass(slots=True)
class MapState:
"""Thread-safe logical state for one map widget."""
tag: Tag
child_window_tag: Tag
drawlist_tag: Tag
texture_registry_tag: Tag
handler_registry_tag: Tag
requested_width: int = 0
requested_height: int = 0
requested_autosize_x: bool = False
requested_autosize_y: bool = False
measured_width: int = 0
measured_height: int = 0
last_nonzero_width: int = 0
last_nonzero_height: int = 0
is_visible: bool = False
center: LatLon = (0.0, 0.0)
zoom: int = 2
min_zoom: int = 0
max_zoom: int = 19
provider: TileProvider = field(default_factory=get_default_provider)
overlays: dict[Tag, Overlay] = field(default_factory=dict)
layers: dict[str, LayerState] = field(default_factory=default_layers)
command_queue: MapCommandQueue = field(default_factory=MapCommandQueue)
tile_manager: TileManager = field(default_factory=TileManager)
renderer: object | None = None
interaction: InteractionState = field(default_factory=InteractionState)
lock: RLock = field(default_factory=RLock)
dirty: DirtyFlags = DirtyFlags.FULL
frame_scheduled: bool = False
generation: int = 0
cache_dir: Path | None = None
user_agent: str | None = None
_config = DpgMapConfig()
_config_lock = RLock()
_maps: dict[Tag, MapState] = {}
_maps_lock = RLock()
_current_map_stack: list[Tag] = []
_current_map_lock = RLock()
def _resolve_provider(provider: str | TileProvider | None) -> TileProvider:
if provider is None:
with _config_lock:
provider = _config.default_provider
if isinstance(provider, TileProvider):
return provider
if isinstance(provider, str):
return get_provider(provider)
raise InvalidProviderError("provider must be a provider name or TileProvider")
def configure_state(
*,
user_agent: str | None = None,
cache_dir: str | Path | None = None,
default_provider: str | TileProvider = "osm",
memory_cache_max_tiles: int = 512,
disk_cache_max_bytes: int | None = 2_000_000_000,
prefetch_margin_tiles: int = 1,
tile_worker_count: int = 4,
overlay_update_policy: str = "coalesce",
debug: bool = False,
) -> None:
"""Replace global dpg-map configuration."""
if memory_cache_max_tiles < 0:
raise ValueError("memory_cache_max_tiles must be >= 0")
if disk_cache_max_bytes is not None and disk_cache_max_bytes < 0:
raise ValueError("disk_cache_max_bytes must be >= 0 or None")
if prefetch_margin_tiles < 0:
raise ValueError("prefetch_margin_tiles must be >= 0")
if tile_worker_count < 1:
raise ValueError("tile_worker_count must be >= 1")
if overlay_update_policy != "coalesce":
raise ValueError('overlay_update_policy must be "coalesce"')
resolved_cache_dir = Path(cache_dir).expanduser() if cache_dir is not None else None
_resolve_provider(default_provider)
with _config_lock:
_config.user_agent = user_agent
_config.cache_dir = resolved_cache_dir
_config.default_provider = default_provider
_config.memory_cache_max_tiles = memory_cache_max_tiles
_config.disk_cache_max_bytes = disk_cache_max_bytes
_config.prefetch_margin_tiles = prefetch_margin_tiles
_config.tile_worker_count = tile_worker_count
_config.overlay_update_policy = overlay_update_policy
_config.debug = debug
def get_config() -> DpgMapConfig:
"""Return a copy of current global configuration."""
with _config_lock:
return DpgMapConfig(
user_agent=_config.user_agent,
cache_dir=_config.cache_dir,
default_provider=_config.default_provider,
memory_cache_max_tiles=_config.memory_cache_max_tiles,
disk_cache_max_bytes=_config.disk_cache_max_bytes,
prefetch_margin_tiles=_config.prefetch_margin_tiles,
tile_worker_count=_config.tile_worker_count,
overlay_update_policy=_config.overlay_update_policy,
debug=_config.debug,
)
def create_map_state(
*,
tag: Tag | None = None,
center: LatLon = (0.0, 0.0),
zoom: int = 2,
min_zoom: int | None = None,
max_zoom: int | None = None,
width: int = 0,
height: int = 0,
autosize_x: bool = False,
autosize_y: bool = False,
provider: str | TileProvider | None = None,
cache_dir: str | Path | None = None,
user_agent: str | None = None,
) -> MapState:
"""Create and register a logical map state."""
map_tag = tag if tag is not None else f"dpg_map_{uuid4().hex}"
provider_obj = _resolve_provider(provider)
min_zoom_value = provider_obj.min_zoom if min_zoom is None else min_zoom
max_zoom_value = provider_obj.max_zoom if max_zoom is None else max_zoom
if min_zoom_value < provider_obj.min_zoom:
min_zoom_value = provider_obj.min_zoom
if max_zoom_value > provider_obj.max_zoom:
max_zoom_value = provider_obj.max_zoom
if max_zoom_value < min_zoom_value:
raise ValueError("max_zoom must be >= min_zoom")
zoom_value = max(min_zoom_value, min(max_zoom_value, int(zoom)))
config = get_config()
resolved_cache_dir = Path(cache_dir).expanduser() if cache_dir is not None else config.cache_dir
state = MapState(
tag=map_tag,
child_window_tag=f"{map_tag}##child",
drawlist_tag=f"{map_tag}##drawlist",
texture_registry_tag=f"{map_tag}##textures",
handler_registry_tag=f"{map_tag}##handlers",
requested_width=width,
requested_height=height,
requested_autosize_x=autosize_x,
requested_autosize_y=autosize_y,
center=(float(center[0]), float(center[1])),
zoom=zoom_value,
min_zoom=min_zoom_value,
max_zoom=max_zoom_value,
provider=provider_obj,
cache_dir=resolved_cache_dir,
user_agent=user_agent if user_agent is not None else config.user_agent,
)
state.tile_manager = TileManager(
memory_cache_max_tiles=config.memory_cache_max_tiles,
disk_cache_max_bytes=config.disk_cache_max_bytes,
worker_count=config.tile_worker_count,
user_agent=state.user_agent,
)
with _maps_lock:
_maps[map_tag] = state
return state
def register_map_state(state: MapState) -> None:
"""Register a prebuilt map state."""
with _maps_lock:
_maps[state.tag] = state
def unregister_map_state(tag: Tag) -> None:
"""Remove a map state from the registry."""
with _maps_lock:
_maps.pop(tag, None)
def get_map_state(map_tag: Tag | None = None) -> MapState:
"""Resolve a map by explicit tag or current map context."""
resolved_tag = resolve_map_tag(map_tag)
with _maps_lock:
try:
return _maps[resolved_tag]
except KeyError as exc:
raise MapNotFoundError(f"map not registered: {resolved_tag}") from exc
def resolve_map_tag(map_tag: Tag | None = None) -> Tag:
"""Resolve an explicit tag or the current context map tag."""
if map_tag is not None:
return map_tag
with _current_map_lock:
if _current_map_stack:
return _current_map_stack[-1]
raise MapNotFoundError("map_tag is required outside a map_widget context")
def find_map_for_overlay(tag: Tag, map_tag: Tag | None = None) -> MapState:
"""Find the map containing an overlay, optionally scoped by map tag."""
if map_tag is not None:
return get_map_state(map_tag)
with _current_map_lock:
if _current_map_stack:
current = get_map_state(_current_map_stack[-1])
with current.lock:
if tag in current.overlays:
return current
matches: list[MapState] = []
with _maps_lock:
states = list(_maps.values())
for state in states:
with state.lock:
if tag in state.overlays:
matches.append(state)
if len(matches) == 1:
return matches[0]
if not matches:
raise MapNotFoundError(f"no map contains overlay: {tag}")
raise MapNotFoundError(f"overlay tag is ambiguous across maps: {tag}")
@contextmanager
def current_map_context(map_tag: Tag) -> Iterator[None]:
"""Push a current map tag for context-style overlay creation."""
with _current_map_lock:
_current_map_stack.append(map_tag)
try:
yield
finally:
with _current_map_lock:
if _current_map_stack and _current_map_stack[-1] == map_tag:
_current_map_stack.pop()
elif map_tag in _current_map_stack:
_current_map_stack.remove(map_tag)
def mark_dirty(state: MapState, flags: DirtyFlags) -> None:
"""Mark a map dirty while holding or acquiring its state lock."""
state.dirty |= flags

555
src/dpg_map/tiles.py Normal file
View File

@@ -0,0 +1,555 @@
"""Tile identity, lifecycle, cache, and worker coordination."""
from __future__ import annotations
import warnings
from dataclasses import dataclass, field
from enum import Enum
from io import BytesIO
from pathlib import Path
from queue import Empty, Queue
from threading import Lock, Thread
from time import time
from typing import Literal, cast
import requests
from PIL import Image, UnidentifiedImageError
from .cache import (
DiskCacheMetadata,
MemoryCacheEntry,
MemoryCacheModel,
clear_disk_cache_path,
prune_disk_cache,
tile_cache_path,
tile_metadata_path,
touch_disk_metadata,
write_disk_metadata,
)
from .exceptions import CacheError
from .projection import latlon_to_world, map_size
from .providers import TileProvider
from .types import LatLon
class TileStatus(Enum):
"""Lifecycle status for a tile."""
QUEUED = "queued"
LOADING = "loading"
READY = "ready"
FAILED = "failed"
@dataclass(frozen=True, slots=True)
class TileID:
"""Provider-namespaced XYZ tile identity."""
provider_name: str
z: int
x: int
y: int
@dataclass(slots=True)
class Tile:
"""Decoded tile data plus GUI-thread texture metadata."""
tile_id: TileID
status: TileStatus
generation: int
width: int = 0
height: int = 0
pixels: tuple[float, ...] = ()
texture_tag: object | None = None
source: Literal["memory", "disk", "network"] | None = None
error: str | None = None
last_accessed_at: float = field(default_factory=time)
@dataclass(frozen=True, slots=True)
class VisibleTile:
"""A visible tile and its screen-space top-left point."""
tile_id: TileID
screen_x: float
screen_y: float
@dataclass(frozen=True, slots=True)
class TileRequest:
"""Worker-thread tile request."""
tile_id: TileID
generation: int
url: str
path: Path
headers: dict[str, str]
disk_cache_max_bytes: int | None
protected_paths: frozenset[Path] = frozenset()
@dataclass(frozen=True, slots=True)
class TileResult:
"""Worker-thread tile result consumed by the GUI thread."""
tile_id: TileID
generation: int
status: TileStatus
width: int = 0
height: int = 0
pixels: tuple[float, ...] = ()
source: Literal["disk", "network"] | None = None
error: str | None = None
@dataclass(frozen=True, slots=True)
class TileManagerSnapshot:
"""Thread-safe counters for diagnostics."""
queued_tiles: int
loading_tiles: int
failed_tiles: int
visible_tile_count: int
memory_tiles: int
memory_hits: int
memory_misses: int
disk_hits: int
disk_misses: int
stale_results: int
def calculate_visible_tiles(
*,
center: LatLon,
zoom: int,
width: int,
height: int,
provider: TileProvider,
margin: int = 0,
) -> list[VisibleTile]:
"""Return visible XYZ tiles plus a margin, with wrapped X and clamped Y."""
if width <= 0 or height <= 0:
return []
tile_size = provider.tile_size
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
left = center_x - width / 2.0
top = center_y - height / 2.0
right = center_x + width / 2.0
bottom = center_y + height / 2.0
max_tile = (2**zoom) - 1
start_x = int(left // tile_size) - margin
end_x = int(right // tile_size) + margin
start_y = max(0, int(top // tile_size) - margin)
end_y = min(max_tile, int(bottom // tile_size) + margin)
world_size = map_size(zoom, tile_size)
tiles: list[VisibleTile] = []
seen: set[TileID] = set()
for y in range(start_y, end_y + 1):
for raw_x in range(start_x, end_x + 1):
x = raw_x % (max_tile + 1)
tile_id = TileID(provider.name, zoom, x, y)
if tile_id in seen:
continue
seen.add(tile_id)
tile_left = raw_x * tile_size
if tile_left < -tile_size:
tile_left += world_size
screen_x = tile_left - left
screen_y = y * tile_size - top
tiles.append(VisibleTile(tile_id, screen_x, screen_y))
return tiles
class TileManager:
"""Asynchronous tile loader with memory and persistent disk caches."""
def __init__(
self,
*,
memory_cache_max_tiles: int = 512,
disk_cache_max_bytes: int | None = 2_000_000_000,
worker_count: int = 4,
user_agent: str | None = None,
) -> None:
self.memory = MemoryCacheModel(max_tiles=memory_cache_max_tiles)
self.disk_cache_max_bytes = disk_cache_max_bytes
self.worker_count = worker_count
self.user_agent = user_agent
self._tiles: dict[TileID, Tile] = {}
self._visible_tile_ids: set[TileID] = set()
self._queued: set[TileID] = set()
self._loading: set[TileID] = set()
self._failed: set[TileID] = set()
self._request_queue: Queue[TileRequest | None] = Queue()
self._result_queue: Queue[TileResult] = Queue()
self._threads: list[Thread] = []
self._lock = Lock()
self._delete_texture_tags: list[object] = []
self._disk_hits = 0
self._disk_misses = 0
self._stale_results = 0
self._warned_osm_user_agent = False
def start(self) -> None:
"""Start worker threads once."""
with self._lock:
if self._threads:
return
count = max(1, self.worker_count)
for index in range(count):
thread = Thread(
target=self._worker_loop,
name=f"dpg-map-tile-worker-{index + 1}",
daemon=True,
)
self._threads.append(thread)
thread.start()
def stop(self) -> None:
"""Ask workers to exit."""
with self._lock:
threads = list(self._threads)
self._threads.clear()
for _ in threads:
self._request_queue.put(None)
def request_visible_tiles(
self,
*,
center: LatLon,
zoom: int,
width: int,
height: int,
provider: TileProvider,
generation: int,
cache_dir: str | Path | None,
margin: int,
) -> list[VisibleTile]:
"""Queue missing visible tiles and return their screen positions."""
visible = calculate_visible_tiles(
center=center,
zoom=zoom,
width=width,
height=height,
provider=provider,
margin=margin,
)
visible_ids = {tile.tile_id for tile in visible}
with self._lock:
self._visible_tile_ids = visible_ids
for entry in self.memory.entries.values():
entry.protected = entry.tile_id in visible_ids
self.start()
for visible_tile in visible:
self._queue_tile(
visible_tile.tile_id,
provider=provider,
generation=generation,
cache_dir=cache_dir,
)
return visible
def _queue_tile(
self,
tile_id: TileID,
*,
provider: TileProvider,
generation: int,
cache_dir: str | Path | None,
) -> None:
with self._lock:
tile = self._tiles.get(tile_id)
if tile is not None and tile.status is TileStatus.READY:
self.memory.record_access(tile_id)
tile.last_accessed_at = time()
return
entry = self.memory.record_access(tile_id)
if entry is not None:
return
if tile_id in self._queued or tile_id in self._loading:
return
self._queued.add(tile_id)
self._tiles[tile_id] = Tile(tile_id, TileStatus.QUEUED, generation=generation)
headers = dict(provider.headers)
if provider.name == "osm":
headers = self._headers_with_osm_user_agent(headers)
path = tile_cache_path(
cache_dir,
provider.name,
tile_id.z,
tile_id.x,
tile_id.y,
provider.file_extension or "png",
)
request = TileRequest(
tile_id=tile_id,
generation=generation,
url=provider.build_url(x=tile_id.x, y=tile_id.y, z=tile_id.z),
path=path,
headers=headers,
disk_cache_max_bytes=self.disk_cache_max_bytes,
protected_paths=frozenset(path for path in self._visible_disk_paths(cache_dir)),
)
self._request_queue.put(request)
def _headers_with_osm_user_agent(self, headers: dict[str, str]) -> dict[str, str]:
if any(key.lower() == "user-agent" for key in headers):
return headers
if self.user_agent:
headers["User-Agent"] = self.user_agent
return headers
with self._lock:
should_warn = not self._warned_osm_user_agent
self._warned_osm_user_agent = True
if should_warn:
warnings.warn(
"OpenStreetMap tile usage should configure an application-specific user_agent",
RuntimeWarning,
stacklevel=3,
)
headers["User-Agent"] = "dpg-map/0.1"
return headers
def _visible_disk_paths(self, cache_dir: str | Path | None) -> list[Path]:
paths: list[Path] = []
with self._lock:
visible = tuple(self._visible_tile_ids)
for tile_id in visible:
paths.append(
tile_cache_path(
cache_dir,
tile_id.provider_name,
tile_id.z,
tile_id.x,
tile_id.y,
)
)
return paths
def get_ready_tile(self, tile_id: TileID) -> Tile | None:
"""Return a ready tile, updating memory LRU metadata."""
with self._lock:
tile = self._tiles.get(tile_id)
if tile is None or tile.status is not TileStatus.READY:
return None
tile.last_accessed_at = time()
self.memory.record_access(tile_id)
return tile
def drain_results(self, *, generation: int, provider_name: str) -> list[Tile]:
"""Accept current-generation results and return ready tiles."""
accepted: list[Tile] = []
while True:
try:
result = self._result_queue.get_nowait()
except Empty:
break
with self._lock:
self._queued.discard(result.tile_id)
self._loading.discard(result.tile_id)
if result.generation != generation or result.tile_id.provider_name != provider_name:
self._stale_results += 1
continue
if result.status is TileStatus.FAILED:
self._failed.add(result.tile_id)
self._tiles[result.tile_id] = Tile(
result.tile_id,
TileStatus.FAILED,
generation=result.generation,
error=result.error,
)
continue
tile = Tile(
result.tile_id,
TileStatus.READY,
generation=result.generation,
width=result.width,
height=result.height,
pixels=result.pixels,
source=result.source,
)
self._tiles[result.tile_id] = tile
self.memory.put(
MemoryCacheEntry(
tile_id=result.tile_id,
size_bytes=len(result.pixels) * 4,
protected=result.tile_id in self._visible_tile_ids,
texture_tag=None,
)
)
accepted.append(tile)
self._evict_memory_if_needed()
return accepted
def set_texture_tag(self, tile_id: TileID, texture_tag: object) -> None:
"""Record a GUI-thread texture tag for a ready tile."""
with self._lock:
tile = self._tiles.get(tile_id)
if tile is not None:
tile.texture_tag = texture_tag
entry = self.memory.entries.get(tile_id)
if entry is not None:
entry.texture_tag = texture_tag
def take_texture_deletions(self) -> list[object]:
"""Return texture tags that must be deleted by the GUI thread."""
with self._lock:
tags = list(self._delete_texture_tags)
self._delete_texture_tags.clear()
return tags
def clear_memory_cache(self) -> list[object]:
"""Clear decoded memory tiles and return texture tags for GUI deletion."""
with self._lock:
tags = [
entry.texture_tag
for entry in self.memory.entries.values()
if entry.texture_tag is not None
]
self._delete_texture_tags.extend(tags)
self.memory.entries.clear()
self._tiles.clear()
self._queued.clear()
self._loading.clear()
self._failed.clear()
return tags
def clear_disk_cache(self, cache_dir: str | Path | None) -> None:
"""Clear the persistent cache root."""
clear_disk_cache_path(cache_dir)
def snapshot(self) -> TileManagerSnapshot:
"""Return diagnostic counters."""
with self._lock:
return TileManagerSnapshot(
queued_tiles=len(self._queued),
loading_tiles=len(self._loading),
failed_tiles=len(self._failed),
visible_tile_count=len(self._visible_tile_ids),
memory_tiles=len(self.memory.entries),
memory_hits=self.memory.hits,
memory_misses=self.memory.misses,
disk_hits=self._disk_hits,
disk_misses=self._disk_misses,
stale_results=self._stale_results,
)
def _evict_memory_if_needed(self) -> None:
with self._lock:
evict_ids = self.memory.plan_evictions()
for raw_tile_id in evict_ids:
tile_id = cast(TileID, raw_tile_id)
entry = self.memory.entries.pop(tile_id, None)
tile = self._tiles.pop(tile_id, None)
texture_tag = None
if entry is not None:
texture_tag = entry.texture_tag
if texture_tag is None and tile is not None:
texture_tag = tile.texture_tag
if texture_tag is not None:
self._delete_texture_tags.append(texture_tag)
def _worker_loop(self) -> None:
while True:
request = self._request_queue.get()
if request is None:
return
with self._lock:
self._queued.discard(request.tile_id)
self._loading.add(request.tile_id)
tile = self._tiles.get(request.tile_id)
if tile is not None:
tile.status = TileStatus.LOADING
result = self._load_tile(request)
self._result_queue.put(result)
def _load_tile(self, request: TileRequest) -> TileResult:
try:
raw, source = self._read_or_fetch(request)
width, height, pixels = decode_tile_image(raw)
return TileResult(
request.tile_id,
request.generation,
TileStatus.READY,
width=width,
height=height,
pixels=pixels,
source=source,
)
except Exception as exc:
return TileResult(
request.tile_id,
request.generation,
TileStatus.FAILED,
error=str(exc),
)
def _read_or_fetch(self, request: TileRequest) -> tuple[bytes, Literal["disk", "network"]]:
if request.path.exists():
try:
raw = request.path.read_bytes()
touch_disk_metadata(tile_metadata_path(request.path))
except (OSError, CacheError):
raw = b""
if raw:
with self._lock:
self._disk_hits += 1
return raw, "disk"
with self._lock:
self._disk_misses += 1
response = requests.get(request.url, headers=request.headers, timeout=20)
response.raise_for_status()
raw = response.content
downloaded_at = time()
try:
request.path.parent.mkdir(parents=True, exist_ok=True)
request.path.write_bytes(raw)
write_disk_metadata(
tile_metadata_path(request.path),
DiskCacheMetadata(
url=request.url,
etag=response.headers.get("ETag"),
last_modified=response.headers.get("Last-Modified"),
expires=response.headers.get("Expires"),
downloaded_at=downloaded_at,
last_accessed_at=downloaded_at,
size_bytes=len(raw),
),
)
prune_disk_cache(
request.path.parents[3],
request.disk_cache_max_bytes,
protected_paths=set(request.protected_paths),
)
except (OSError, CacheError):
pass
return raw, "network"
def decode_tile_image(raw: bytes) -> tuple[int, int, tuple[float, ...]]:
"""Decode an image into Dear PyGui-compatible RGBA float data."""
try:
image = Image.open(BytesIO(raw)).convert("RGBA")
except UnidentifiedImageError as exc:
raise ValueError("tile image data could not be decoded") from exc
width, height = image.size
pixels = tuple(channel / 255.0 for channel in image.tobytes())
return width, height, pixels

31
src/dpg_map/types.py Normal file
View File

@@ -0,0 +1,31 @@
"""Shared type aliases and small value objects."""
from __future__ import annotations
from dataclasses import dataclass
from typing import TypeAlias
Tag: TypeAlias = str | int
LatLon: TypeAlias = tuple[float, float]
Point: TypeAlias = tuple[float, float]
Bounds: TypeAlias = tuple[LatLon, LatLon]
TileCoord: TypeAlias = tuple[int, int, int]
Color: TypeAlias = tuple[int, int, int] | tuple[int, int, int, int]
@dataclass(frozen=True, slots=True)
class ScreenPoint:
"""A pixel position in map widget coordinates."""
x: float
y: float
@dataclass(frozen=True, slots=True)
class GeoBounds:
"""Latitude/longitude bounds with south-west and north-east corners."""
south: float
west: float
north: float
east: float

73
src/dpg_map/widget.py Normal file
View File

@@ -0,0 +1,73 @@
"""Dear PyGui map widget construction."""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import Any
from .providers import TileProvider
from .renderer import MapRenderer
from .state import create_map_state, current_map_context
from .types import LatLon, Tag
@contextmanager
def map_widget(
*,
tag: Tag | None = None,
center: LatLon = (0.0, 0.0),
zoom: int = 2,
provider: str | TileProvider | None = None,
width: int = 0,
height: int = 0,
autosize_x: bool = False,
autosize_y: bool = False,
**kwargs: object,
) -> Iterator[Tag | None]:
"""Create a Dear PyGui child-window map shell and logical map context."""
cache_dir_value = kwargs.get("cache_dir")
cache_dir = cache_dir_value if isinstance(cache_dir_value, str | Path) else None
user_agent_value = kwargs.get("user_agent")
user_agent = user_agent_value if isinstance(user_agent_value, str) else None
state = create_map_state(
tag=tag,
center=center,
zoom=zoom,
provider=provider,
width=width,
height=height,
autosize_x=autosize_x,
autosize_y=autosize_y,
cache_dir=cache_dir,
user_agent=user_agent,
)
import dearpygui.dearpygui as dpg
child_kwargs: dict[str, Any] = dict(kwargs)
child_kwargs.pop("cache_dir", None)
child_kwargs.pop("user_agent", None)
child_kwargs.setdefault("border", False)
child_kwargs.setdefault("no_scrollbar", True)
child_kwargs.setdefault("no_scroll_with_mouse", True)
dpg.add_child_window(
tag=state.child_window_tag,
width=width,
height=height,
autosize_x=autosize_x,
autosize_y=autosize_y,
**child_kwargs,
)
dpg.add_texture_registry(tag=state.texture_registry_tag)
dpg.add_drawlist(1, 1, tag=state.drawlist_tag, parent=state.child_window_tag)
renderer = MapRenderer(state, dpg)
with state.lock:
state.renderer = renderer
renderer.schedule_next_frame()
with current_map_context(state.tag):
yield state.tag

1
tests/.gitkeep Normal file
View File

@@ -0,0 +1 @@

68
tests/test_cache.py Normal file
View File

@@ -0,0 +1,68 @@
from __future__ import annotations
from pathlib import Path
from dpg_map.cache import (
CacheStats,
DiskCacheConfig,
DiskCacheMetadata,
MemoryCacheConfig,
plan_disk_prune,
tile_cache_path,
write_disk_metadata,
)
def test_cache_stats_dataclass_construction() -> None:
stats = CacheStats(
memory_tiles=3,
memory_max_tiles=512,
memory_hits=10,
memory_misses=2,
disk_bytes=1024,
disk_max_bytes=None,
disk_hits=7,
disk_misses=1,
disk_path=Path("/tmp/dpg-map-cache"),
)
assert stats.memory_tiles == 3
assert stats.disk_max_bytes is None
assert stats.disk_path == Path("/tmp/dpg-map-cache")
def test_initial_cache_config_dataclasses() -> None:
memory_config = MemoryCacheConfig()
disk_config = DiskCacheConfig()
assert memory_config.max_tiles == 512
assert disk_config.max_bytes == 2_000_000_000
def test_disk_cache_path_generation(tmp_path: Path) -> None:
path = tile_cache_path(tmp_path, "osm", 4, 8, 9, "jpg")
assert path == tmp_path / "osm" / "4" / "8" / "9.jpg"
def test_disk_cache_prune_ordering(tmp_path: Path) -> None:
first = tile_cache_path(tmp_path, "osm", 1, 1, 1)
second = tile_cache_path(tmp_path, "osm", 1, 1, 2)
protected = tile_cache_path(tmp_path, "osm", 1, 1, 3)
for path, accessed_at in [(first, 1.0), (second, 2.0), (protected, 0.0)]:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(b"abcde")
write_disk_metadata(
path.with_suffix(".json"),
DiskCacheMetadata(
url=str(path),
downloaded_at=accessed_at,
last_accessed_at=accessed_at,
size_bytes=5,
),
)
planned = plan_disk_prune(tmp_path, 5, protected_paths={protected})
assert planned == [first, second]

35
tests/test_commands.py Normal file
View File

@@ -0,0 +1,35 @@
from __future__ import annotations
from dpg_map.commands import CommandKind, MapCommand, MapCommandQueue
def test_overlay_updates_coalesce_by_overlay_tag() -> None:
queue = MapCommandQueue()
queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, "map", {"tag": "vehicle", "lat": 1.0}))
queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, "map", {"tag": "vehicle", "lat": 2.0}))
queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, "map", {"tag": "other", "lat": 3.0}))
drained = queue.drain()
assert len(drained) == 2
assert drained[0].payload == {"tag": "vehicle", "lat": 2.0}
assert drained[1].payload == {"tag": "other", "lat": 3.0}
def test_latest_view_update_wins_but_structural_commands_keep_order() -> None:
queue = MapCommandQueue()
queue.put(MapCommand(CommandKind.ADD_OVERLAY, "map", {"tag": "a"}))
queue.put(MapCommand(CommandKind.SET_VIEW, "map", {"zoom": 3}))
queue.put(MapCommand(CommandKind.SET_VIEW, "map", {"zoom": 4}))
queue.put(MapCommand(CommandKind.DELETE_OVERLAY, "map", {"tag": "a"}))
drained = queue.drain()
assert [command.kind for command in drained] == [
CommandKind.ADD_OVERLAY,
CommandKind.SET_VIEW,
CommandKind.DELETE_OVERLAY,
]
assert drained[1].payload == {"zoom": 4}

19
tests/test_interaction.py Normal file
View File

@@ -0,0 +1,19 @@
from __future__ import annotations
from dpg_map.interaction import calculate_hit_rect
from dpg_map.sizing import SizeMeasurement, apply_size_measurement
from dpg_map.state import create_map_state
def test_hit_rect_uses_effective_map_size() -> None:
state = create_map_state(tag="hit-rect")
apply_size_measurement(state, SizeMeasurement(width=400, height=250, visible=True))
rect = calculate_hit_rect(state, (10.0, 20.0))
assert rect.x == 10.0
assert rect.y == 20.0
assert rect.width == 400
assert rect.height == 250
assert rect.contains(410.0, 270.0)
assert not rect.contains(411.0, 270.0)

View File

@@ -0,0 +1,63 @@
from __future__ import annotations
import pytest
import dpg_map as dpgm
from dpg_map.exceptions import CoordinateError
from dpg_map.overlays import TrajectoryOverlay
from dpg_map.state import DirtyFlags, create_map_state, get_map_state
def test_overlay_update_does_not_alter_center_or_zoom() -> None:
create_map_state(tag="overlay-isolation", center=(47.0, 2.0), zoom=8)
dpgm.add_marker("vehicle", lat=47.1, lon=2.1, map_tag="overlay-isolation")
before_center = dpgm.get_center(map_tag="overlay-isolation")
before_zoom = dpgm.get_zoom(map_tag="overlay-isolation")
dpgm.update_marker("vehicle", lat=47.2, lon=2.2, map_tag="overlay-isolation")
assert dpgm.get_center(map_tag="overlay-isolation") == before_center
assert dpgm.get_zoom(map_tag="overlay-isolation") == before_zoom
state = get_map_state("overlay-isolation")
assert state.dirty & DirtyFlags.OVERLAYS
def test_trajectory_inputs_are_copied() -> None:
create_map_state(tag="trajectory-copy")
lats = [1.0, 2.0]
lons = [3.0, 4.0]
dpgm.add_trajectory("track", lats=lats, lons=lons, map_tag="trajectory-copy")
lats[0] = 99.0
lons[0] = 99.0
state = get_map_state("trajectory-copy")
overlay = state.overlays["track"]
assert isinstance(overlay, TrajectoryOverlay)
assert overlay.points == ((1.0, 3.0), (2.0, 4.0))
def test_mismatched_lat_lon_lengths_raise() -> None:
create_map_state(tag="bad-coordinates")
with pytest.raises(CoordinateError):
dpgm.add_trajectory("track", lats=[1.0], lons=[2.0, 3.0], map_tag="bad-coordinates")
def test_layer_state_tracks_visibility_and_overlay_membership() -> None:
create_map_state(tag="layers")
dpgm.add_layer("fleet", map_tag="layers")
dpgm.add_marker("vehicle", lat=1.0, lon=2.0, layer="fleet", map_tag="layers")
dpgm.hide_layer("fleet", map_tag="layers")
state = get_map_state("layers")
assert state.layers["fleet"].show is False
assert state.layers["fleet"].overlay_tags == {"vehicle"}
dpgm.clear_layer("fleet", map_tag="layers")
assert state.layers["fleet"].overlay_tags == set()
assert "vehicle" not in state.overlays

44
tests/test_projection.py Normal file
View File

@@ -0,0 +1,44 @@
from __future__ import annotations
import pytest
from dpg_map.projection import (
WEB_MERCATOR_MAX_LAT,
clamp_latitude,
latlon_to_tile,
latlon_to_world,
screen_to_world,
world_to_latlon,
world_to_screen,
)
@pytest.mark.parametrize(
("lat", "lon", "zoom"),
[
(0.0, 0.0, 0),
(47.9029, 1.9093, 15),
(-33.8688, 151.2093, 10),
(WEB_MERCATOR_MAX_LAT, 179.999, 4),
],
)
def test_projection_roundtrip(lat: float, lon: float, zoom: int) -> None:
x, y = latlon_to_world(lat, lon, zoom)
roundtrip_lat, roundtrip_lon = world_to_latlon(x, y, zoom)
assert roundtrip_lat == pytest.approx(clamp_latitude(lat), abs=1e-7)
assert roundtrip_lon == pytest.approx(lon, abs=1e-7)
def test_latlon_to_tile_returns_xyz_coordinate() -> None:
assert latlon_to_tile(0.0, 0.0, 1) == (1, 1, 1)
def test_world_screen_roundtrip() -> None:
center = (47.9029, 1.9093)
world = latlon_to_world(47.91, 1.92, 14)
screen = world_to_screen(*world, center=center, zoom=14, width=800, height=600)
assert screen_to_world(*screen, center=center, zoom=14, width=800, height=600) == pytest.approx(
world
)

71
tests/test_providers.py Normal file
View File

@@ -0,0 +1,71 @@
from __future__ import annotations
import pytest
import dpg_map as dpgm
from dpg_map.exceptions import InvalidProviderError, ProviderExistsError, ProviderNotFoundError
def test_default_provider_is_registered() -> None:
provider = dpgm.get_provider("osm")
assert provider.name == "osm"
assert "osm" in dpgm.list_providers()
assert provider.build_url(x=1, y=2, z=3) == "https://tile.openstreetmap.org/3/1/2.png"
def test_provider_registration_roundtrip() -> None:
provider = dpgm.TileProvider(
name="unit-test-provider",
url_template="https://tiles.example.test/{z}/{x}/{y}.png",
attribution="Example",
)
dpgm.register_provider(provider)
try:
assert dpgm.get_provider("unit-test-provider") == provider
assert "unit-test-provider" in dpgm.list_providers()
finally:
dpgm.unregister_provider("unit-test-provider")
with pytest.raises(ProviderNotFoundError):
dpgm.get_provider("unit-test-provider")
def test_duplicate_provider_registration_fails() -> None:
provider = dpgm.TileProvider(
name="unit-test-duplicate",
url_template="https://tiles.example.test/{z}/{x}/{y}.png",
)
dpgm.register_provider(provider)
try:
with pytest.raises(ProviderExistsError):
dpgm.register_provider(provider)
finally:
dpgm.unregister_provider("unit-test-duplicate")
def test_provider_url_building_with_subdomains_retina_and_extension() -> None:
provider = dpgm.TileProvider(
name="unit-test-template",
url_template="https://{s}.tiles.example.test/{z}/{x}/{y}{r}.{ext}",
subdomains=("a", "b", "c"),
retina=True,
file_extension="webp",
)
assert provider.build_url(x=1, y=2, z=3) == "https://a.tiles.example.test/3/1/2@2x.webp"
@pytest.mark.parametrize(
"template",
[
"https://tiles.example.test/{z}/{x}.png",
"https://tiles.example.test/{z}/{x}/{y}/{quadkey}.png",
"",
],
)
def test_invalid_provider_templates_fail(template: str) -> None:
with pytest.raises(InvalidProviderError):
dpgm.TileProvider(name="bad-template", url_template=template)

48
tests/test_public_api.py Normal file
View File

@@ -0,0 +1,48 @@
from __future__ import annotations
def test_package_exports_required_public_api() -> None:
import dpg_map as dpgm
expected = {
"configure",
"TileProvider",
"register_provider",
"unregister_provider",
"get_provider",
"list_providers",
"map_widget",
"set_center",
"get_center",
"set_zoom",
"get_zoom",
"set_view",
"fit_bounds",
"screen_to_latlon",
"latlon_to_screen",
"add_marker",
"add_polyline",
"add_trajectory",
"update_marker",
"update_polyline",
"update_trajectory",
"set_marker_position",
"set_marker_label",
"set_polyline_points",
"set_overlay_show",
"delete_overlay",
"add_layer",
"show_layer",
"hide_layer",
"clear_layer",
"clear_map",
"set_provider",
"clear_memory_cache",
"clear_disk_cache",
"get_cache_stats",
"get_map_debug_state",
}
assert set(dpgm.__all__) == expected
for name in expected:
assert hasattr(dpgm, name)

30
tests/test_renderer.py Normal file
View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from dpg_map.commands import CommandKind, MapCommand
from dpg_map.renderer import drain_renderer_commands
from dpg_map.state import DirtyFlags, create_map_state
def test_renderer_command_drain_preserves_structural_order_and_coalesces() -> None:
state = create_map_state(tag="renderer-drain")
state.dirty = DirtyFlags.NONE
state.command_queue.put(MapCommand(CommandKind.ADD_OVERLAY, state.tag, {"tag": "a"}))
state.command_queue.put(MapCommand(CommandKind.SET_VIEW, state.tag, {"zoom": 3}))
state.command_queue.put(MapCommand(CommandKind.SET_VIEW, state.tag, {"zoom": 4}))
state.command_queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, state.tag, {"tag": "a", "v": 1}))
state.command_queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, state.tag, {"tag": "a", "v": 2}))
state.command_queue.put(MapCommand(CommandKind.DELETE_OVERLAY, state.tag, {"tag": "a"}))
commands = drain_renderer_commands(state)
assert [command.kind for command in commands] == [
CommandKind.ADD_OVERLAY,
CommandKind.SET_VIEW,
CommandKind.UPDATE_OVERLAY,
CommandKind.DELETE_OVERLAY,
]
assert commands[1].payload == {"zoom": 4}
assert commands[2].payload == {"tag": "a", "v": 2}
assert state.dirty & DirtyFlags.VIEW
assert state.dirty & DirtyFlags.OVERLAYS

44
tests/test_sizing.py Normal file
View File

@@ -0,0 +1,44 @@
from __future__ import annotations
from dpg_map.sizing import SizeMeasurement, apply_size_measurement, effective_draw_size
from dpg_map.state import DirtyFlags, create_map_state
def test_size_measurement_tracks_last_nonzero_size() -> None:
state = create_map_state(tag="size-last-nonzero")
first = apply_size_measurement(state, SizeMeasurement(width=640, height=360, visible=True))
hidden = apply_size_measurement(state, SizeMeasurement(width=0, height=0, visible=False))
assert first.changed is True
assert state.measured_width == 0
assert state.measured_height == 0
assert state.last_nonzero_width == 640
assert state.last_nonzero_height == 360
assert hidden.became_hidden is True
assert effective_draw_size(state) == (640, 360)
def test_zero_size_does_not_permanently_collapse_map() -> None:
state = create_map_state(tag="size-reappears")
apply_size_measurement(state, SizeMeasurement(width=500, height=300, visible=True))
apply_size_measurement(state, SizeMeasurement(width=0, height=0, visible=False))
update = apply_size_measurement(state, SizeMeasurement(width=700, height=450, visible=True))
assert update.became_visible is True
assert update.effective_width == 700
assert update.effective_height == 450
assert state.last_nonzero_width == 700
assert state.last_nonzero_height == 450
def test_resize_marks_size_dirty() -> None:
state = create_map_state(tag="size-dirty")
state.dirty = DirtyFlags.NONE
apply_size_measurement(state, SizeMeasurement(width=320, height=240, visible=True))
assert state.dirty & DirtyFlags.SIZE
assert state.dirty & DirtyFlags.TILES
assert state.dirty & DirtyFlags.OVERLAYS

87
tests/test_tiles.py Normal file
View File

@@ -0,0 +1,87 @@
from __future__ import annotations
from io import BytesIO
from PIL import Image
from dpg_map.providers import OSM
from dpg_map.tiles import (
TileID,
TileManager,
TileResult,
TileStatus,
calculate_visible_tiles,
decode_tile_image,
)
def test_visible_tile_calculation_uses_provider_namespace() -> None:
tiles = calculate_visible_tiles(
center=(0.0, 0.0),
zoom=2,
width=256,
height=256,
provider=OSM,
margin=0,
)
assert tiles
assert {tile.tile_id.provider_name for tile in tiles} == {"osm"}
assert all(0 <= tile.tile_id.x <= 3 for tile in tiles)
assert all(0 <= tile.tile_id.y <= 3 for tile in tiles)
def test_tile_manager_ignores_stale_generation_results() -> None:
manager = TileManager()
stale = TileResult(
TileID("osm", 1, 0, 0),
generation=1,
status=TileStatus.READY,
width=1,
height=1,
pixels=(1.0, 1.0, 1.0, 1.0),
source="disk",
)
manager._result_queue.put(stale)
accepted = manager.drain_results(generation=2, provider_name="osm")
assert accepted == []
assert manager.snapshot().stale_results == 1
def test_memory_eviction_protects_visible_tiles() -> None:
manager = TileManager(memory_cache_max_tiles=1)
protected = TileID("osm", 1, 0, 0)
evictable = TileID("osm", 1, 0, 1)
with manager._lock:
manager._visible_tile_ids = {protected}
for tile_id in (protected, evictable):
manager._result_queue.put(
TileResult(
tile_id,
generation=1,
status=TileStatus.READY,
width=1,
height=1,
pixels=(1.0, 1.0, 1.0, 1.0),
source="disk",
)
)
manager.drain_results(generation=1, provider_name="osm")
assert manager.get_ready_tile(protected) is not None
assert manager.get_ready_tile(evictable) is None
def test_decode_png_tile_image() -> None:
image = Image.new("RGBA", (1, 1), (255, 0, 128, 255))
buffer = BytesIO()
image.save(buffer, format="PNG")
width, height, pixels = decode_tile_image(buffer.getvalue())
assert (width, height) == (1, 1)
assert pixels == (1.0, 0.0, 128 / 255, 1.0)