Compare commits

...

9 Commits

43 changed files with 5111 additions and 4 deletions

129
AGENTS.md Normal file
View File

@@ -0,0 +1,129 @@
# AGENTS.md
## Current status
Step 8 complete.
## Completed steps
Step 1 - Public API contract and pure core.
Step 2 - Thread-safe state, commands, overlays, and cache model.
Step 3 - Widget shell, sizing system, and GUI-thread frame pump.
Step 4 - Tile manager, persistent cache, and asynchronous loading.
Step 5 - Interaction: pan, zoom, and view commands.
Step 6 - Overlay rendering and runtime update stress tests.
Step 7 - Layers, provider switching, and clearing APIs.
Step 8 - Documentation, hardening, and internal release.
## Current step
Internal rebuilt beta prepared.
## Design decisions
- Package is managed with uv.
- Public import is `import dpg_map as dpgm`.
- Dear PyGui calls are GUI-thread-only.
- Runtime public calls enqueue commands or update logical state.
- Overlay updates must not reset center/zoom.
- The widget uses child_window + measured-size drawlist.
- Tiles use a memory cache and persistent disk cache.
- Tile providers are interchangeable.
## Known issues
None yet.
## Commands used
- Read `STEPS.md`, `FEATURES.md`, and `ARCHITECTURE.md`.
- Created initial package, examples, tests, and agent-log structure.
- Implemented public exports, exceptions, common types, tile provider registry, projection helpers, cache dataclasses, and GUI-dependent API stubs.
- Added Step 1 tests for imports, providers, projection, and cache dataclasses.
- Implemented global configuration, logical MapState, map registry, and current map context stack.
- Implemented DirtyFlags, MapCommand, CommandKind, and coalescing MapCommandQueue.
- Implemented logical marker, polyline, trajectory, and layer state models.
- Implemented public runtime overlay/view/layer/provider/cache/debug wrappers against logical state without Dear PyGui calls.
- Implemented memory cache metadata, disk cache path generation, metadata read/write, disk size scanning, and prune planning.
- Added Step 2 tests for command coalescing, overlay/view isolation, copied trajectory inputs, coordinate length validation, layer state, disk path generation, and prune ordering.
- Implemented `map_widget(...)` as a Dear PyGui child-window plus drawlist shell.
- Implemented GUI-thread renderer frame pump that schedules frame callbacks, drains command queues, measures size, resizes the drawlist, and draws a placeholder background/attribution.
- Implemented sizing helpers for measured size, last non-zero size preservation, visibility transitions, effective draw size, and resize dirty flags.
- Implemented map interaction hit-rectangle calculation.
- Implemented TileID, TileStatus, Tile, visible tile calculation, and TileManager.
- Implemented asynchronous tile worker queue for disk reads, HTTP fetches, and image decoding.
- Implemented provider-namespaced persistent cache writes, access metadata updates, clearing, and LRU pruning.
- Implemented memory tile cache with visible-tile protection and deferred GUI-thread texture deletion.
- Integrated tile result processing, stale generation/provider rejection, texture creation, and tile drawing into the GUI-thread renderer.
- Added OpenStreetMap User-Agent warning/fallback and configured examples with example user agents.
- Added Step 3 examples for basic map, window sizing, child-window sizing, table sizing, and hidden-tab sizing.
- Added Step 4 cache stress example.
- Added Step 3 tests for sizing transitions, zero-size preservation, resize dirty flags, command drain ordering, and hit rectangles.
- Added Step 4 tests for visible tile calculation, stale result rejection, protected memory eviction, and tile image decoding.
- Ran a Dear PyGui context smoke check for `map_widget` child-window/drawlist creation.
- Ran `uv run pytest`.
- Ran `uv run ruff check .`.
- Ran `uv run ruff format .`.
- Ran `uv run ruff format --check .`.
- Ran `uv run pyright`.
- Ran `uv run pytest`.
- Ran `uv run ruff check .`.
- Ran `uv run ruff format .`.
- Ran `uv run ruff format --check .`.
- Ran `uv run pyright`.
- Ran a Dear PyGui context smoke check for `map_widget` child-window/drawlist/texture-registry creation.
- Implemented left mouse drag panning using the measured drawlist rectangle.
- Implemented mouse wheel zoom around the cursor.
- Implemented projection-backed `screen_to_latlon`, `latlon_to_screen`, and zoom-fitting `fit_bounds`.
- Attached Dear PyGui mouse handlers through the map handler registry.
- Added interaction debug state for active drag and last mouse position.
- Added Step 5 tests for pan, cursor zoom, view conversion, bounds fitting, and overlay/view isolation.
- Added Pyright virtualenv settings so `uv run pyright` resolves installed dependencies.
- Ran `uv run pytest`.
- Ran `uv run ruff check .`.
- Ran `uv run ruff format --check .`.
- Ran `uv run pyright`.
- Ran a Dear PyGui context smoke check for `map_widget` child-window/drawlist/texture-registry/handler-registry creation.
- Implemented Dear PyGui draw-layer bookkeeping for background, tiles, overlays, and attribution.
- Rendered markers, polylines, and trajectories from GUI-thread overlay snapshots.
- Isolated overlay redraws so they clear only the overlay draw layer and do not clear tile draw commands or textures.
- Added live background-thread marker and trajectory stress examples.
- Added Step 6 tests for overlay draw-layer isolation, overlay-only dirty flags, threaded update coalescing, and view/drag-state isolation.
- Updated `README.md` with Step 6 overlay rendering behavior and examples.
- Ran `uv run ruff format .`.
- Ran `uv run pytest`.
- Ran `uv run pyright`.
- Ran `uv run ruff check .`.
- Ran `uv run ruff format --check .`.
- Ran a Dear PyGui context smoke check for `map_widget` with marker, polyline, and trajectory overlays.
- Added `z_index` support for `add_layer`, layer visibility/clearing tests, and cache-safe map-wide memory clearing.
- Implemented provider switching that validates providers, clamps zoom to the new provider range, preserves overlays/center, increments generation, and queues GUI-thread tile invalidation.
- Implemented provider-scoped disk cache clearing and cache-size scanning.
- Exported `CacheStats` publicly.
- Added `examples/custom_provider.py` and cache control/stat buttons to `examples/cache_stress.py`.
- Updated `README.md` with Step 7 behavior and examples.
- Added Step 7 tests for provider switch tile invalidation, provider-scoped disk clearing, queued disk clear commands, layer z-order updates, and public exports.
- Ran `uv run ruff format .`.
- Ran `uv run pytest`.
- Ran `uv run ruff check .`.
- Ran `uv run ruff format --check .`.
- Ran `uv run pyright`.
- Ran `uv run python -c "import dpg_map as dpgm; print(dpgm.list_providers())"`.
- Added docstrings for public API functions.
- Rewrote `README.md` with uv install, local editable dependency, basic usage, sizing, live update, custom provider, cache, OpenStreetMap, and thread-safety documentation.
- Added Step 8 hardening tests for unknown maps, overlays, providers, invalid coordinates, mismatched coordinate lengths, empty trajectory support, deleted overlays, provider switching while tiles are loading, overlay updates during dragging, and public docstrings.
- Bumped package version to `0.3.0b1` and updated the fallback OpenStreetMap User-Agent version.
- Ran `uv run pytest`.
- Ran `uv run ruff format .`.
- Ran `uv run ruff check .`.
- Ran `uv run ruff format --check .`.
- Ran `uv run pyright`.
- Tested editable install from `/tmp/dpg-map-editable-test` with `uv add --editable /home/hector/projects/dpg-map`.
- Ran editable install import check: `uv run python -c "import dpg_map as dpgm; print(dpgm.list_providers())"`.
- Ran `uv run python -m py_compile` across all example files.
- Started `examples/basic_map.py` under a 5-second timeout; it launched without terminal errors and was stopped by timeout because the GUI loop blocks.
- Ran `uv sync`.
## Next action
Commit and tag `v0.3.0b1`.

176
README.md
View File

@@ -0,0 +1,176 @@
# dpg-map
`dpg-map` is a Dear PyGui map widget for XYZ raster tiles and geographic overlays.
The rebuilt beta exposes a stable public import:
```python
import dpg_map as dpgm
```
## Install
Use `uv` for development and dependency management:
```bash
uv sync
uv run pytest
```
From another local project, add this package as an editable dependency:
```bash
uv add --editable ../dpg-map
uv run python -c "import dpg_map as dpgm; print(dpgm.list_providers())"
```
## Basic Map
```python
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
dpgm.configure(user_agent="my-app/0.1 contact@example.com")
dpg.create_context()
dpg.create_viewport(title="Map", width=1000, height=700)
with dpg.window(label="Map", width=-1, height=-1):
with dpgm.map_widget(tag="map", center=(47.9029, 1.9093), zoom=15, width=-1, height=-1):
dpgm.add_marker("vehicle", lat=47.9029, lon=1.9093, label="Vehicle")
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
```
## Sizing
The widget is a Dear PyGui `child_window` containing a measured drawlist. The child keeps the requested Dear PyGui sizing intent while the drawlist uses concrete measured pixels.
Supported sizing modes:
- `width=0` and `height=0` keep Dear PyGui default sizing.
- `width=-1` and `height=-1` fill available space where Dear PyGui supports it.
- Positive dimensions request fixed sizes.
- `autosize_x` and `autosize_y` are passed to the child window.
- Hidden layouts preserve the last non-zero measured size until visible again.
Examples:
```bash
uv run python examples/sizing_window.py
uv run python examples/sizing_child.py
uv run python examples/sizing_table.py
uv run python examples/hidden_tab.py
```
## Live Updates
Runtime overlay updates are safe to call from background threads. They update logical state and enqueue renderer commands; Dear PyGui drawing, texture, handler, and viewport calls stay on the GUI thread.
Live marker update:
```python
dpgm.add_marker("vehicle", lat=47.9029, lon=1.9093, map_tag="map")
dpgm.update_marker("vehicle", lat=current_lat, lon=current_lon, map_tag="map")
```
Live trajectory update:
```python
dpgm.add_trajectory("track", points=[], map_tag="map")
dpgm.update_trajectory("track", points=tuple(points), map_tag="map")
```
Stress examples:
```bash
uv run python examples/markers_live_thread.py
uv run python examples/trajectory_live_thread.py
```
## Providers
OpenStreetMap is registered as `osm` by default. Custom XYZ providers can be registered and selected at runtime:
```python
provider = dpgm.TileProvider(
name="custom",
url_template="https://example.com/{z}/{x}/{y}.png",
attribution="Tiles (c) Example",
)
dpgm.register_provider(provider)
dpgm.set_provider("custom", map_tag="map")
```
Provider switching preserves overlays and center, clamps zoom to the new provider range, increments the tile generation, and ignores stale tile results from the previous provider.
Example:
```bash
uv run python examples/custom_provider.py
```
## Cache
Tiles use an in-memory cache and a persistent provider-namespaced disk cache.
```python
dpgm.configure(
user_agent="my-app/0.1 contact@example.com",
cache_dir=".tile-cache",
memory_cache_max_tiles=512,
disk_cache_max_bytes=2_000_000_000,
)
stats = dpgm.get_cache_stats(map_tag="map")
dpgm.clear_memory_cache(map_tag="map")
dpgm.clear_disk_cache(provider="osm")
```
`disk_cache_max_bytes=None` disables the disk size limit. Memory cache clears are routed through the renderer command queue so texture deletion happens on the GUI thread. Disk cache clears can target all providers or a single provider namespace.
Cache example:
```bash
uv run python examples/cache_stress.py
```
## OpenStreetMap Usage
The default OpenStreetMap provider requires attribution and should use an application-specific `User-Agent`.
```python
dpgm.configure(user_agent="my-product/1.0 contact@example.com")
```
If no user agent is configured, `dpg-map` emits a runtime warning and falls back to a package user agent. Applications are responsible for displaying provider attribution in accordance with provider terms; the renderer draws the provider attribution text on the map.
## Thread-Safety Contract
Public runtime functions are intended to be callable from non-GUI threads unless explicitly documented otherwise. They acquire map locks briefly, update logical state, and/or enqueue commands.
Thread-safe runtime areas include:
- view updates: `set_center`, `set_zoom`, `set_view`, `fit_bounds`
- overlay updates: `add_marker`, `update_marker`, `update_trajectory`, `delete_overlay`
- layer updates: `add_layer`, `show_layer`, `hide_layer`, `clear_layer`
- provider/cache updates: `set_provider`, `clear_memory_cache`, `clear_disk_cache`
`map_widget(...)` creates Dear PyGui items and must be used on the GUI thread inside an active Dear PyGui context.
## Examples
```bash
uv run python examples/basic_map.py
uv run python examples/cache_stress.py
uv run python examples/custom_provider.py
uv run python examples/markers_live_thread.py
uv run python examples/trajectory_live_thread.py
```

1
examples/.gitkeep Normal file
View File

@@ -0,0 +1 @@

29
examples/basic_map.py Normal file
View File

@@ -0,0 +1,29 @@
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpgm.configure(user_agent="dpg-map basic example")
dpg.create_context()
dpg.create_viewport(title="dpg-map basic", width=900, height=600)
with (
dpg.window(label="Map", width=-1, height=-1),
dpgm.map_widget(tag="map", center=(47.9029, 1.9093), zoom=15, width=-1, height=-1),
):
dpgm.add_marker("vehicle", lat=47.9029, lon=1.9093)
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
if __name__ == "__main__":
main()

65
examples/cache_stress.py Normal file
View File

@@ -0,0 +1,65 @@
from pathlib import Path
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
cache_dir = Path(__file__).resolve().parent / ".tile-cache"
dpgm.configure(
cache_dir=cache_dir,
memory_cache_max_tiles=32,
disk_cache_max_bytes=30_000_000,
prefetch_margin_tiles=1,
user_agent="dpg-map cache_stress example",
)
dpg.create_context()
dpg.create_viewport(title="dpg-map cache stress", width=1000, height=700)
def clear_memory() -> None:
dpgm.clear_memory_cache(map_tag="cache-map")
def clear_disk() -> None:
dpgm.clear_disk_cache(map_tag="cache-map")
def refresh_stats() -> None:
stats = dpgm.get_cache_stats(map_tag="cache-map")
dpg.set_value(
"cache-stats",
(
f"memory {stats.memory_tiles}/{stats.memory_max_tiles} tiles | "
f"disk {stats.disk_bytes // 1024} KiB | "
f"hits m:{stats.memory_hits} d:{stats.disk_hits}"
),
)
with (
dpg.window(label="Cache Stress", width=-1, height=-1),
):
with dpg.group(horizontal=True):
dpg.add_button(label="Clear Memory", callback=clear_memory)
dpg.add_button(label="Clear Disk", callback=clear_disk)
dpg.add_button(label="Stats", callback=refresh_stats)
dpg.add_text("", tag="cache-stats")
with dpgm.map_widget(
tag="cache-map",
center=(47.9029, 1.9093),
zoom=14,
width=-1,
height=-1,
):
dpgm.add_marker("start", lat=47.9029, lon=1.9093, label="Orleans")
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,52 @@
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpgm.configure(user_agent="dpg-map custom_provider example")
provider = dpgm.TileProvider(
name="carto-light",
url_template="https://{s}.basemaps.cartocdn.com/light_all/{z}/{x}/{y}{r}.png",
subdomains=("a", "b", "c", "d"),
attribution="(c) OpenStreetMap contributors (c) CARTO",
file_extension="png",
)
if "carto-light" not in dpgm.list_providers():
dpgm.register_provider(provider)
dpg.create_context()
dpg.create_viewport(title="dpg-map custom provider", width=900, height=600)
def use_osm() -> None:
dpgm.set_provider("osm", map_tag="custom-provider-map")
def use_carto() -> None:
dpgm.set_provider("carto-light", map_tag="custom-provider-map")
with dpg.window(label="Custom Provider", width=-1, height=-1):
with dpg.group(horizontal=True):
dpg.add_button(label="OSM", callback=use_osm)
dpg.add_button(label="Carto", callback=use_carto)
with dpgm.map_widget(
tag="custom-provider-map",
provider="carto-light",
center=(47.9029, 1.9093),
zoom=13,
width=-1,
height=-1,
):
dpgm.add_marker("orleans", lat=47.9029, lon=1.9093, label="Orleans")
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
if __name__ == "__main__":
main()

27
examples/hidden_tab.py Normal file
View File

@@ -0,0 +1,27 @@
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpg.create_context()
dpg.create_viewport(title="dpg-map hidden tab", width=900, height=600)
with dpg.window(label="Hidden Tab Sizing", width=-1, height=-1), dpg.tab_bar():
with dpg.tab(label="First"):
dpg.add_text("Switch to the map tab.")
with dpg.tab(label="Map"), dpgm.map_widget(tag="map-hidden-tab", width=-1, height=500):
dpgm.add_marker("tab-marker", lat=35.0, lon=139.0)
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,66 @@
from __future__ import annotations
from math import cos, sin
from threading import Event, Thread
from time import sleep
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpgm.configure(user_agent="dpg-map markers_live_thread example")
stop = Event()
dpg.create_context()
dpg.create_viewport(title="dpg-map live markers", width=1000, height=700)
with (
dpg.window(label="Live Markers", width=-1, height=-1),
dpgm.map_widget(
tag="live-markers-map", center=(47.9029, 1.9093), zoom=15, width=-1, height=-1
),
):
for index in range(12):
dpgm.add_marker(
f"vehicle-{index}",
lat=47.9029,
lon=1.9093,
label=str(index + 1),
show_label=True,
color=(240, 92, 70, 255),
)
def update_markers() -> None:
tick = 0
while not stop.is_set():
for index in range(12):
angle = tick * 0.08 + index * 0.52
radius = 0.0015 + (index % 4) * 0.0002
dpgm.update_marker(
f"vehicle-{index}",
lat=47.9029 + sin(angle) * radius,
lon=1.9093 + cos(angle) * radius,
map_tag="live-markers-map",
)
tick += 1
sleep(1 / 30)
worker = Thread(target=update_markers, name="dpg-map-live-markers", daemon=True)
worker.start()
try:
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
finally:
stop.set()
worker.join(timeout=1.0)
dpg.destroy_context()
if __name__ == "__main__":
main()

28
examples/sizing_child.py Normal file
View File

@@ -0,0 +1,28 @@
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpg.create_context()
dpg.create_viewport(title="dpg-map child sizing", width=900, height=600)
with (
dpg.window(label="Nested Child", width=-1, height=-1),
dpg.child_window(width=-1, height=420),
dpgm.map_widget(tag="map-child", width=-1, height=-1),
):
dpgm.add_marker("inside-child", lat=47.0, lon=2.0)
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
if __name__ == "__main__":
main()

33
examples/sizing_table.py Normal file
View File

@@ -0,0 +1,33 @@
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpg.create_context()
dpg.create_viewport(title="dpg-map table sizing", width=1000, height=600)
with (
dpg.window(label="Table Layout", width=-1, height=-1),
dpg.table(header_row=True, resizable=True, policy=dpg.mvTable_SizingStretchProp),
):
dpg.add_table_column(label="Map")
dpg.add_table_column(label="Controls")
with dpg.table_row():
with dpg.table_cell(), dpgm.map_widget(tag="map-table", width=-1, height=500):
dpgm.add_marker("table-marker", lat=51.5, lon=-0.1)
with dpg.table_cell():
dpg.add_text("Resize the window and table columns.")
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
if __name__ == "__main__":
main()

27
examples/sizing_window.py Normal file
View File

@@ -0,0 +1,27 @@
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpg.create_context()
dpg.create_viewport(title="dpg-map window sizing", width=900, height=600)
with (
dpg.window(label="Fill Window", width=-1, height=-1),
dpgm.map_widget(tag="map-window", width=-1, height=-1),
):
dpgm.add_marker("center", lat=0.0, lon=0.0)
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,76 @@
from __future__ import annotations
from collections import deque
from math import cos, sin
from threading import Event, Thread
from time import sleep
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpgm.configure(user_agent="dpg-map trajectory_live_thread example")
stop = Event()
points: deque[tuple[float, float]] = deque(maxlen=240)
dpg.create_context()
dpg.create_viewport(title="dpg-map live trajectory", width=1000, height=700)
with (
dpg.window(label="Live Trajectory", width=-1, height=-1),
dpgm.map_widget(
tag="live-trajectory-map",
center=(47.9029, 1.9093),
zoom=15,
width=-1,
height=-1,
),
):
dpgm.add_trajectory(
"track",
points=[],
color=(250, 190, 80, 255),
thickness=3.0,
show_points=True,
point_stride=12,
)
dpgm.add_marker(
"head",
lat=47.9029,
lon=1.9093,
color=(72, 205, 154, 255),
radius=6,
)
def update_trajectory() -> None:
tick = 0
while not stop.is_set():
angle = tick * 0.06
lat = 47.9029 + sin(angle) * 0.0016 + sin(angle * 2.7) * 0.00025
lon = 1.9093 + cos(angle) * 0.0016
points.append((lat, lon))
snapshot = tuple(points)
dpgm.update_trajectory("track", points=snapshot, map_tag="live-trajectory-map")
dpgm.update_marker("head", lat=lat, lon=lon, map_tag="live-trajectory-map")
tick += 1
sleep(1 / 20)
worker = Thread(target=update_trajectory, name="dpg-map-live-trajectory", daemon=True)
worker.start()
try:
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
finally:
stop.set()
worker.join(timeout=1.0)
dpg.destroy_context()
if __name__ == "__main__":
main()

View File

@@ -1,7 +1,7 @@
[project]
name = "dpg-map"
version = "0.1.0"
description = "Add your description here"
version = "0.3.0b1"
description = "Dear PyGui map widget for XYZ raster tiles and geographic overlays"
readme = "README.md"
authors = [
{ name = "Hector van der Aa", email = "hector@h3cx.dev" }
@@ -27,3 +27,15 @@ dev = [
"pytest>=9.0.3",
"ruff>=0.15.14",
]
[tool.ruff]
line-length = 100
target-version = "py311"
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "SIM"]
[tool.pyright]
typeCheckingMode = "basic"
venvPath = "."
venv = ".venv"

View File

@@ -1,2 +1,89 @@
"""Public API for dpg-map."""
from .api import (
add_layer,
add_marker,
add_polyline,
add_trajectory,
clear_disk_cache,
clear_layer,
clear_map,
clear_memory_cache,
configure,
delete_overlay,
fit_bounds,
get_cache_stats,
get_center,
get_map_debug_state,
get_zoom,
hide_layer,
latlon_to_screen,
screen_to_latlon,
set_center,
set_marker_label,
set_marker_position,
set_overlay_show,
set_polyline_points,
set_provider,
set_view,
set_zoom,
show_layer,
update_marker,
update_polyline,
update_trajectory,
)
from .cache import CacheStats
from .providers import (
TileProvider,
get_provider,
list_providers,
register_provider,
unregister_provider,
)
from .widget import map_widget
__all__ = [
"CacheStats",
"TileProvider",
"add_layer",
"add_marker",
"add_polyline",
"add_trajectory",
"clear_disk_cache",
"clear_layer",
"clear_map",
"clear_memory_cache",
"configure",
"delete_overlay",
"fit_bounds",
"get_cache_stats",
"get_center",
"get_map_debug_state",
"get_provider",
"get_zoom",
"hide_layer",
"latlon_to_screen",
"list_providers",
"map_widget",
"register_provider",
"screen_to_latlon",
"set_center",
"set_marker_label",
"set_marker_position",
"set_overlay_show",
"set_polyline_points",
"set_provider",
"set_view",
"set_zoom",
"show_layer",
"unregister_provider",
"update_marker",
"update_polyline",
"update_trajectory",
]
def main() -> None:
print("Hello from dpg-map!")
"""Console entry point placeholder."""
print("dpg-map")

662
src/dpg_map/api.py Normal file
View File

@@ -0,0 +1,662 @@
"""Public API wrappers for dpg-map."""
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import asdict
from math import isfinite
from pathlib import Path
from typing import Any
from .cache import CacheStats, clear_disk_cache_path, disk_cache_root, disk_cache_size_bytes
from .commands import CommandKind, MapCommand
from .exceptions import (
CoordinateError,
InvalidProviderError,
MapNotFoundError,
OverlayNotFoundError,
)
from .interaction import latlon_to_screen_in_state, screen_to_latlon_in_state
from .overlays import LayerState, MarkerOverlay, Overlay, PolylineOverlay, TrajectoryOverlay
from .projection import latlon_to_world
from .providers import TileProvider, get_provider
from .sizing import effective_draw_size
from .state import (
DirtyFlags,
configure_state,
find_map_for_overlay,
get_config,
get_map_state,
list_map_states,
mark_dirty,
)
from .types import Bounds, LatLon, Point, Tag
def _validate_latlon(lat: float, lon: float) -> LatLon:
lat_value = float(lat)
lon_value = float(lon)
if not isfinite(lat_value) or not isfinite(lon_value):
raise CoordinateError("coordinates must be finite numbers")
if lat_value < -90.0 or lat_value > 90.0:
raise CoordinateError("latitude must be between -90 and 90")
if lon_value < -180.0 or lon_value > 180.0:
raise CoordinateError("longitude must be between -180 and 180")
return (lat_value, lon_value)
def _points_from_inputs(
points: Sequence[LatLon] | None = None,
*,
lats: Sequence[float] | None = None,
lons: Sequence[float] | None = None,
) -> tuple[LatLon, ...]:
if points is not None and (lats is not None or lons is not None):
raise CoordinateError("provide either points or lats/lons, not both")
if points is not None:
return tuple(_validate_latlon(lat, lon) for lat, lon in points)
if lats is None and lons is None:
return ()
if lats is None or lons is None:
raise CoordinateError("lats and lons must be provided together")
lat_values = tuple(lats)
lon_values = tuple(lons)
if len(lat_values) != len(lon_values):
raise CoordinateError("lats and lons must have the same length")
return tuple(
_validate_latlon(lat, lon) for lat, lon in zip(lat_values, lon_values, strict=True)
)
def _ensure_layer(state: Any, layer_name: str, z_index: int = 0, show: bool = True) -> LayerState:
layer = state.layers.get(layer_name)
if layer is None:
layer = LayerState(layer_name, z_index=z_index, show=show)
state.layers[layer_name] = layer
return layer
def _overlay_payload(overlay: Overlay) -> dict[str, Any]:
return {"tag": overlay.tag, "overlay": asdict(overlay)}
def _queue(state: Any, kind: CommandKind, payload: dict[str, Any]) -> None:
state.command_queue.put(MapCommand(kind=kind, map_tag=state.tag, payload=payload))
def configure(
*,
user_agent: str | None = None,
cache_dir: str | Path | None = None,
default_provider: str | TileProvider = "osm",
memory_cache_max_tiles: int = 512,
disk_cache_max_bytes: int | None = 2_000_000_000,
prefetch_margin_tiles: int = 1,
tile_worker_count: int = 4,
overlay_update_policy: str = "coalesce",
debug: bool = False,
) -> None:
"""Configure package-wide defaults used by subsequently created maps."""
configure_state(
user_agent=user_agent,
cache_dir=cache_dir,
default_provider=default_provider,
memory_cache_max_tiles=memory_cache_max_tiles,
disk_cache_max_bytes=disk_cache_max_bytes,
prefetch_margin_tiles=prefetch_margin_tiles,
tile_worker_count=tile_worker_count,
overlay_update_policy=overlay_update_policy,
debug=debug,
)
def set_center(lat: float, lon: float, *, map_tag: Tag | None = None) -> None:
"""Set a map center without changing its zoom."""
set_view(center=_validate_latlon(lat, lon), map_tag=map_tag)
def get_center(*, map_tag: Tag | None = None) -> LatLon:
"""Return the current logical center of a map."""
state = get_map_state(map_tag)
with state.lock:
return state.center
def set_zoom(zoom: int, *, map_tag: Tag | None = None) -> None:
"""Set a map zoom, clamped to the map/provider zoom range."""
set_view(zoom=zoom, map_tag=map_tag)
def get_zoom(*, map_tag: Tag | None = None) -> int:
"""Return the current logical zoom of a map."""
state = get_map_state(map_tag)
with state.lock:
return state.zoom
def set_view(
*,
center: LatLon | None = None,
zoom: int | None = None,
map_tag: Tag | None = None,
) -> None:
"""Set map center and/or zoom as one logical view update."""
state = get_map_state(map_tag)
with state.lock:
payload: dict[str, Any] = {}
if center is not None:
state.center = _validate_latlon(center[0], center[1])
payload["center"] = state.center
if zoom is not None:
state.zoom = max(state.min_zoom, min(state.max_zoom, int(zoom)))
payload["zoom"] = state.zoom
if not payload:
return
mark_dirty(state, DirtyFlags.VIEW | DirtyFlags.TILES | DirtyFlags.OVERLAYS)
_queue(state, CommandKind.SET_VIEW, payload)
def fit_bounds(bounds: Bounds, *, map_tag: Tag | None = None) -> None:
"""Set center and zoom so geographic bounds fit the current draw area."""
(south_west, north_east) = bounds
south, west = _validate_latlon(south_west[0], south_west[1])
north, east = _validate_latlon(north_east[0], north_east[1])
south, north = min(south, north), max(south, north)
state = get_map_state(map_tag)
with state.lock:
width, height = effective_draw_size(state)
padding = 32
usable_width = max(1, width - padding * 2)
usable_height = max(1, height - padding * 2)
tile_size = state.provider.tile_size
target_zoom = state.min_zoom
for candidate_zoom in range(state.max_zoom, state.min_zoom - 1, -1):
west_x, north_y = latlon_to_world(north, west, candidate_zoom, tile_size)
east_x, south_y = latlon_to_world(south, east, candidate_zoom, tile_size)
world_size = tile_size * (2**candidate_zoom)
x_span = abs(east_x - west_x)
x_span = min(x_span, world_size - x_span)
y_span = abs(south_y - north_y)
if x_span <= usable_width and y_span <= usable_height:
target_zoom = candidate_zoom
break
set_view(center=((south + north) / 2.0, (west + east) / 2.0), zoom=target_zoom, map_tag=map_tag)
def screen_to_latlon(x: float, y: float, *, map_tag: Tag | None = None) -> LatLon:
"""Convert map-local screen coordinates to latitude/longitude."""
state = get_map_state(map_tag)
return screen_to_latlon_in_state(state, float(x), float(y))
def latlon_to_screen(lat: float, lon: float, *, map_tag: Tag | None = None) -> Point:
"""Convert latitude/longitude to map-local screen coordinates."""
lat_value, lon_value = _validate_latlon(lat, lon)
state = get_map_state(map_tag)
return latlon_to_screen_in_state(state, lat_value, lon_value)
def add_marker(
tag: Tag,
*,
lat: float,
lon: float,
label: str | None = None,
layer: str = "default",
show: bool = True,
map_tag: Tag | None = None,
**kwargs: Any,
) -> Tag:
"""Add or replace a marker overlay and return its tag."""
state = get_map_state(map_tag)
color = kwargs.get("color", (255, 80, 80, 255))
radius = float(kwargs.get("radius", 5.0))
show_label = bool(kwargs.get("show_label", False))
user_data = kwargs.get("user_data")
callback = kwargs.get("callback")
with state.lock:
marker = MarkerOverlay(
tag=tag,
map_tag=state.tag,
layer=layer,
show=show,
user_data=user_data,
lat=_validate_latlon(lat, lon)[0],
lon=_validate_latlon(lat, lon)[1],
label=label,
color=color,
radius=radius,
show_label=show_label,
callback=callback,
)
state.overlays[tag] = marker
_ensure_layer(state, layer).overlay_tags.add(tag)
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.ADD_OVERLAY, _overlay_payload(marker))
return tag
def add_polyline(
tag: Tag,
*,
points: Sequence[LatLon] | None = None,
lats: Sequence[float] | None = None,
lons: Sequence[float] | None = None,
layer: str = "default",
show: bool = True,
map_tag: Tag | None = None,
**kwargs: Any,
) -> Tag:
"""Add or replace a polyline overlay and return its tag."""
state = get_map_state(map_tag)
copied_points = _points_from_inputs(points, lats=lats, lons=lons)
with state.lock:
polyline = PolylineOverlay(
tag=tag,
map_tag=state.tag,
layer=layer,
show=show,
user_data=kwargs.get("user_data"),
points=copied_points,
color=kwargs.get("color", (80, 180, 255, 255)),
thickness=float(kwargs.get("thickness", 2.0)),
closed=bool(kwargs.get("closed", False)),
simplify=bool(kwargs.get("simplify", True)),
)
state.overlays[tag] = polyline
_ensure_layer(state, layer).overlay_tags.add(tag)
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.ADD_OVERLAY, _overlay_payload(polyline))
return tag
def add_trajectory(
tag: Tag,
*,
points: Sequence[LatLon] | None = None,
lats: Sequence[float] | None = None,
lons: Sequence[float] | None = None,
layer: str = "default",
show: bool = True,
map_tag: Tag | None = None,
**kwargs: Any,
) -> Tag:
"""Add or replace a trajectory overlay and return its tag."""
state = get_map_state(map_tag)
copied_points = _points_from_inputs(points, lats=lats, lons=lons)
timestamps = kwargs.get("timestamps")
copied_timestamps = tuple(timestamps) if timestamps is not None else None
if copied_timestamps is not None and len(copied_timestamps) != len(copied_points):
raise CoordinateError("timestamps must have the same length as trajectory points")
point_stride = int(kwargs.get("point_stride", 1))
if point_stride < 1:
raise ValueError("point_stride must be >= 1")
with state.lock:
trajectory = TrajectoryOverlay(
tag=tag,
map_tag=state.tag,
layer=layer,
show=show,
user_data=kwargs.get("user_data"),
points=copied_points,
timestamps=copied_timestamps,
color=kwargs.get("color", (255, 180, 60, 255)),
thickness=float(kwargs.get("thickness", 2.0)),
show_points=bool(kwargs.get("show_points", False)),
point_stride=point_stride,
)
state.overlays[tag] = trajectory
_ensure_layer(state, layer).overlay_tags.add(tag)
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.ADD_OVERLAY, _overlay_payload(trajectory))
return tag
def update_marker(
tag: Tag,
*,
lat: float | None = None,
lon: float | None = None,
label: str | None = None,
map_tag: Tag | None = None,
**kwargs: Any,
) -> None:
"""Update marker properties without changing the map view."""
state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.get(tag)
if not isinstance(overlay, MarkerOverlay):
raise OverlayNotFoundError(f"marker not found: {tag}")
if lat is not None or lon is not None:
overlay.lat, overlay.lon = _validate_latlon(
overlay.lat if lat is None else lat,
overlay.lon if lon is None else lon,
)
if label is not None:
overlay.label = label
if "show" in kwargs:
overlay.show = bool(kwargs["show"])
if "color" in kwargs:
overlay.color = kwargs["color"]
if "radius" in kwargs:
overlay.radius = float(kwargs["radius"])
overlay.touch()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
def update_polyline(
tag: Tag,
*,
points: Sequence[LatLon] | None = None,
lats: Sequence[float] | None = None,
lons: Sequence[float] | None = None,
map_tag: Tag | None = None,
**kwargs: Any,
) -> None:
"""Update polyline properties without changing the map view."""
state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.get(tag)
if not isinstance(overlay, PolylineOverlay):
raise OverlayNotFoundError(f"polyline not found: {tag}")
if points is not None or lats is not None or lons is not None:
overlay.points = _points_from_inputs(points, lats=lats, lons=lons)
if "show" in kwargs:
overlay.show = bool(kwargs["show"])
if "color" in kwargs:
overlay.color = kwargs["color"]
if "thickness" in kwargs:
overlay.thickness = float(kwargs["thickness"])
overlay.touch()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
def update_trajectory(
tag: Tag,
*,
points: Sequence[LatLon] | None = None,
lats: Sequence[float] | None = None,
lons: Sequence[float] | None = None,
map_tag: Tag | None = None,
**kwargs: Any,
) -> None:
"""Update trajectory properties without changing the map view."""
state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.get(tag)
if not isinstance(overlay, TrajectoryOverlay):
raise OverlayNotFoundError(f"trajectory not found: {tag}")
if points is not None or lats is not None or lons is not None:
overlay.points = _points_from_inputs(points, lats=lats, lons=lons)
if "timestamps" in kwargs:
timestamps = kwargs["timestamps"]
overlay.timestamps = tuple(timestamps) if timestamps is not None else None
if overlay.timestamps is not None and len(overlay.timestamps) != len(overlay.points):
raise CoordinateError("timestamps must have the same length as trajectory points")
if "show" in kwargs:
overlay.show = bool(kwargs["show"])
if "color" in kwargs:
overlay.color = kwargs["color"]
if "thickness" in kwargs:
overlay.thickness = float(kwargs["thickness"])
overlay.touch()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
def set_marker_position(tag: Tag, lat: float, lon: float, *, map_tag: Tag | None = None) -> None:
"""Set a marker latitude/longitude."""
update_marker(tag, lat=lat, lon=lon, map_tag=map_tag)
def set_marker_label(tag: Tag, label: str, *, map_tag: Tag | None = None) -> None:
"""Set a marker label."""
update_marker(tag, label=label, map_tag=map_tag)
def set_polyline_points(
tag: Tag,
points: Sequence[LatLon],
*,
map_tag: Tag | None = None,
) -> None:
"""Replace a polyline point sequence."""
update_polyline(tag, points=points, map_tag=map_tag)
def set_overlay_show(tag: Tag, show: bool, *, map_tag: Tag | None = None) -> None:
"""Show or hide an overlay without deleting it."""
state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.get(tag)
if overlay is None:
raise OverlayNotFoundError(f"overlay not found: {tag}")
overlay.show = bool(show)
overlay.touch()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.UPDATE_OVERLAY, _overlay_payload(overlay))
def delete_overlay(tag: Tag, *, map_tag: Tag | None = None) -> None:
"""Delete an overlay from its map and layer."""
state = find_map_for_overlay(tag, map_tag)
with state.lock:
overlay = state.overlays.pop(tag, None)
if overlay is None:
raise OverlayNotFoundError(f"overlay not found: {tag}")
layer = state.layers.get(overlay.layer)
if layer is not None:
layer.overlay_tags.discard(tag)
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.DELETE_OVERLAY, {"tag": tag})
def _cache_target_states(map_tag: Tag | None) -> list[Any]:
if map_tag is not None:
return [get_map_state(map_tag)]
try:
return [get_map_state(None)]
except MapNotFoundError:
return list_map_states()
def add_layer(
name: str,
*,
z_index: int | None = None,
show: bool = True,
map_tag: Tag | None = None,
) -> None:
"""Create or update a logical overlay layer."""
state = get_map_state(map_tag)
with state.lock:
layer = _ensure_layer(
state,
name,
z_index=len(state.layers) if z_index is None else int(z_index),
show=show,
)
if z_index is not None:
layer.z_index = int(z_index)
layer.show = show
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(
state,
CommandKind.ADD_LAYER,
{"name": name, "show": show, "z_index": layer.z_index},
)
def show_layer(name: str, *, map_tag: Tag | None = None) -> None:
"""Show all overlays assigned to a layer."""
state = get_map_state(map_tag)
with state.lock:
_ensure_layer(state, name).show = True
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.SET_LAYER_VISIBILITY, {"name": name, "show": True})
def hide_layer(name: str, *, map_tag: Tag | None = None) -> None:
"""Hide all overlays assigned to a layer."""
state = get_map_state(map_tag)
with state.lock:
_ensure_layer(state, name).show = False
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.SET_LAYER_VISIBILITY, {"name": name, "show": False})
def clear_layer(name: str, *, map_tag: Tag | None = None) -> None:
"""Delete all overlays assigned to a layer."""
state = get_map_state(map_tag)
with state.lock:
layer = _ensure_layer(state, name)
for overlay_tag in tuple(layer.overlay_tags):
state.overlays.pop(overlay_tag, None)
layer.overlay_tags.clear()
mark_dirty(state, DirtyFlags.OVERLAYS)
_queue(state, CommandKind.CLEAR_LAYER, {"name": name})
def clear_map(*, map_tag: Tag | None = None) -> None:
"""Delete all overlays and invalidate map tile resources."""
state = get_map_state(map_tag)
with state.lock:
state.overlays.clear()
for layer in state.layers.values():
layer.overlay_tags.clear()
state.generation += 1
mark_dirty(state, DirtyFlags.OVERLAYS | DirtyFlags.TILES)
_queue(state, CommandKind.CLEAR_MAP, {})
def set_provider(provider: str | TileProvider, *, map_tag: Tag | None = None) -> None:
"""Switch a map to another tile provider while preserving overlays."""
if isinstance(provider, str):
provider_obj = get_provider(provider)
elif isinstance(provider, TileProvider):
provider_obj = provider
else:
raise InvalidProviderError("provider must be a provider name or TileProvider")
state = get_map_state(map_tag)
with state.lock:
if state.provider == provider_obj:
return
state.provider = provider_obj
state.min_zoom = provider_obj.min_zoom
state.max_zoom = provider_obj.max_zoom
state.zoom = max(state.min_zoom, min(state.max_zoom, state.zoom))
state.generation += 1
mark_dirty(state, DirtyFlags.PROVIDER | DirtyFlags.TILES | DirtyFlags.OVERLAYS)
_queue(state, CommandKind.SET_PROVIDER, {"provider": provider_obj.name})
def clear_memory_cache(*, map_tag: Tag | None = None) -> None:
"""Clear decoded in-memory tile data through the renderer command queue."""
for state in _cache_target_states(map_tag):
with state.lock:
state.generation += 1
mark_dirty(state, DirtyFlags.TILES)
_queue(state, CommandKind.CLEAR_MEMORY_CACHE, {})
def clear_disk_cache(provider: str | None = None, *, map_tag: Tag | None = None) -> None:
"""Clear persistent tile cache data globally or for one map/provider."""
if provider is not None:
get_provider(provider)
if map_tag is None:
clear_disk_cache_path(get_config().cache_dir, provider=provider)
return
state = get_map_state(map_tag)
with state.lock:
state.generation += 1
mark_dirty(state, DirtyFlags.TILES)
_queue(state, CommandKind.CLEAR_DISK_CACHE, {"provider": provider})
def get_cache_stats(*, map_tag: Tag | None = None) -> CacheStats:
"""Return memory and disk cache diagnostics."""
config = get_config()
if map_tag is None:
cache_dir = config.cache_dir
return CacheStats(
memory_max_tiles=config.memory_cache_max_tiles,
disk_bytes=disk_cache_size_bytes(cache_dir),
disk_max_bytes=config.disk_cache_max_bytes,
disk_path=disk_cache_root(cache_dir),
)
state = get_map_state(map_tag)
with state.lock:
tile_snapshot = state.tile_manager.snapshot()
return CacheStats(
memory_tiles=tile_snapshot.memory_tiles,
memory_max_tiles=config.memory_cache_max_tiles,
memory_hits=tile_snapshot.memory_hits,
memory_misses=tile_snapshot.memory_misses,
disk_bytes=disk_cache_size_bytes(state.cache_dir),
disk_max_bytes=config.disk_cache_max_bytes,
disk_hits=tile_snapshot.disk_hits,
disk_misses=tile_snapshot.disk_misses,
disk_path=disk_cache_root(state.cache_dir),
)
def get_map_debug_state(*, map_tag: Tag | None = None) -> dict[str, Any]:
"""Return a diagnostic snapshot for a map."""
state = get_map_state(map_tag)
with state.lock:
return {
"tag": state.tag,
"center": state.center,
"zoom": state.zoom,
"requested_size": (state.requested_width, state.requested_height),
"measured_size": (state.measured_width, state.measured_height),
"visible": state.is_visible,
"provider": state.provider.name,
"overlay_count": len(state.overlays),
"layers": {
name: {
"show": layer.show,
"z_index": layer.z_index,
"overlay_count": len(layer.overlay_tags),
}
for name, layer in state.layers.items()
},
"dirty_flags": int(state.dirty),
"pending_command_count": len(state.command_queue),
"generation": state.generation,
"active_drag": state.interaction.active_drag,
"last_mouse_position": state.interaction.last_mouse_position,
"tiles": asdict(state.tile_manager.snapshot()),
}

301
src/dpg_map/cache.py Normal file
View File

@@ -0,0 +1,301 @@
"""Memory and disk cache helpers."""
from __future__ import annotations
import json
import shutil
from dataclasses import asdict, dataclass, field
from pathlib import Path
from time import time
from typing import Any
from platformdirs import user_cache_dir
from .exceptions import CacheError
@dataclass(frozen=True, slots=True)
class CacheStats:
"""Public cache statistics snapshot."""
memory_tiles: int = 0
memory_max_tiles: int = 0
memory_hits: int = 0
memory_misses: int = 0
disk_bytes: int = 0
disk_max_bytes: int | None = None
disk_hits: int = 0
disk_misses: int = 0
disk_path: Path | None = None
@dataclass(slots=True)
class MemoryCacheConfig:
"""Initial memory cache configuration."""
max_tiles: int = 512
@dataclass(slots=True)
class DiskCacheConfig:
"""Initial persistent disk cache configuration."""
path: Path | None = None
max_bytes: int | None = 2_000_000_000
@dataclass(slots=True)
class MemoryCacheEntry:
"""Metadata for one in-memory tile."""
tile_id: object
size_bytes: int = 0
last_accessed_at: float = field(default_factory=time)
protected: bool = False
texture_tag: object | None = None
@dataclass(slots=True)
class MemoryCacheModel:
"""Small LRU metadata model for decoded/runtime tiles."""
max_tiles: int = 512
entries: dict[object, MemoryCacheEntry] = field(default_factory=dict)
hits: int = 0
misses: int = 0
def record_access(self, tile_id: object) -> MemoryCacheEntry | None:
"""Mark an entry as recently used and return it if present."""
entry = self.entries.get(tile_id)
if entry is None:
self.misses += 1
return None
self.hits += 1
entry.last_accessed_at = time()
return entry
def put(self, entry: MemoryCacheEntry) -> None:
"""Insert or replace entry metadata."""
entry.last_accessed_at = time()
self.entries[entry.tile_id] = entry
def plan_evictions(self) -> list[object]:
"""Return tile IDs that can be evicted without touching GUI resources."""
overflow = len(self.entries) - self.max_tiles
if overflow <= 0:
return []
candidates = [entry for entry in self.entries.values() if not entry.protected]
candidates.sort(key=lambda entry: entry.last_accessed_at)
return [entry.tile_id for entry in candidates[:overflow]]
@dataclass(frozen=True, slots=True)
class DiskCacheMetadata:
"""Persistent metadata stored next to a tile file."""
url: str = ""
etag: str | None = None
last_modified: str | None = None
expires: str | None = None
downloaded_at: float = 0.0
last_accessed_at: float = 0.0
size_bytes: int = 0
@dataclass(frozen=True, slots=True)
class DiskCacheEntry:
"""Scanned disk cache file plus metadata."""
tile_path: Path
metadata_path: Path
metadata: DiskCacheMetadata
def default_cache_dir() -> Path:
"""Return the default persistent cache directory."""
return Path(user_cache_dir("dpg-map", appauthor=False))
def disk_cache_root(cache_dir: str | Path | None = None) -> Path:
"""Resolve the disk cache root path."""
return Path(cache_dir).expanduser() if cache_dir is not None else default_cache_dir()
def tile_cache_path(
cache_dir: str | Path | None,
provider_name: str,
z: int,
x: int,
y: int,
extension: str | None = None,
) -> Path:
"""Return the provider-namespaced persistent tile path."""
ext = (extension or "png").lstrip(".")
safe_provider = provider_name.replace("/", "_")
return disk_cache_root(cache_dir) / safe_provider / str(z) / str(x) / f"{y}.{ext}"
def tile_metadata_path(tile_path: Path) -> Path:
"""Return the metadata path for a tile path."""
return tile_path.with_suffix(".json")
def read_disk_metadata(path: Path) -> DiskCacheMetadata:
"""Read a metadata JSON file, returning defaults for missing metadata."""
if not path.exists():
return DiskCacheMetadata()
try:
raw: dict[str, Any] = json.loads(path.read_text(encoding="utf-8"))
except OSError as exc:
raise CacheError(f"could not read cache metadata: {path}") from exc
except json.JSONDecodeError as exc:
raise CacheError(f"invalid cache metadata JSON: {path}") from exc
return DiskCacheMetadata(
url=str(raw.get("url", "")),
etag=raw.get("etag"),
last_modified=raw.get("last_modified"),
expires=raw.get("expires"),
downloaded_at=float(raw.get("downloaded_at", 0.0)),
last_accessed_at=float(raw.get("last_accessed_at", 0.0)),
size_bytes=int(raw.get("size_bytes", 0)),
)
def write_disk_metadata(path: Path, metadata: DiskCacheMetadata) -> None:
"""Write metadata JSON next to a tile file."""
try:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(asdict(metadata), sort_keys=True), encoding="utf-8")
except OSError as exc:
raise CacheError(f"could not write cache metadata: {path}") from exc
def touch_disk_metadata(path: Path, *, accessed_at: float | None = None) -> None:
"""Update only the last access timestamp for a metadata file."""
metadata = read_disk_metadata(path)
write_disk_metadata(
path,
DiskCacheMetadata(
url=metadata.url,
etag=metadata.etag,
last_modified=metadata.last_modified,
expires=metadata.expires,
downloaded_at=metadata.downloaded_at,
last_accessed_at=time() if accessed_at is None else accessed_at,
size_bytes=metadata.size_bytes,
),
)
def scan_disk_cache(cache_dir: str | Path | None) -> list[DiskCacheEntry]:
"""Scan tile files under a disk cache root."""
root = disk_cache_root(cache_dir)
if not root.exists():
return []
entries: list[DiskCacheEntry] = []
for path in root.rglob("*"):
if not path.is_file() or path.suffix == ".json":
continue
metadata_path = tile_metadata_path(path)
metadata = read_disk_metadata(metadata_path)
size_bytes = metadata.size_bytes or path.stat().st_size
if size_bytes != metadata.size_bytes:
metadata = DiskCacheMetadata(
url=metadata.url,
etag=metadata.etag,
last_modified=metadata.last_modified,
expires=metadata.expires,
downloaded_at=metadata.downloaded_at,
last_accessed_at=metadata.last_accessed_at,
size_bytes=size_bytes,
)
entries.append(DiskCacheEntry(path, metadata_path, metadata))
return entries
def disk_cache_size_bytes(
cache_dir: str | Path | None,
*,
provider: str | None = None,
) -> int:
"""Return total bytes for cached tile files, optionally scoped to one provider."""
if provider is None:
return sum(entry.metadata.size_bytes for entry in scan_disk_cache(cache_dir))
safe_provider = provider.replace("/", "_")
provider_root = disk_cache_root(cache_dir) / safe_provider
if not provider_root.exists():
return 0
return sum(entry.metadata.size_bytes for entry in scan_disk_cache(provider_root))
def plan_disk_prune(
cache_dir: str | Path | None,
max_bytes: int | None,
*,
protected_paths: set[Path] | None = None,
) -> list[Path]:
"""Return tile paths that should be pruned by LRU order without deleting them."""
if max_bytes is None:
return []
protected = {path.resolve() for path in protected_paths or set()}
entries = scan_disk_cache(cache_dir)
total = sum(entry.metadata.size_bytes for entry in entries)
if total <= max_bytes:
return []
candidates = [entry for entry in entries if entry.tile_path.resolve() not in protected]
candidates.sort(key=lambda entry: entry.metadata.last_accessed_at)
prune: list[Path] = []
for entry in candidates:
if total <= max_bytes:
break
prune.append(entry.tile_path)
total -= entry.metadata.size_bytes
return prune
def prune_disk_cache(
cache_dir: str | Path | None,
max_bytes: int | None,
*,
protected_paths: set[Path] | None = None,
) -> list[Path]:
"""Delete LRU tile files until the cache fits the configured limit."""
planned = plan_disk_prune(cache_dir, max_bytes, protected_paths=protected_paths)
for path in planned:
metadata_path = tile_metadata_path(path)
try:
path.unlink(missing_ok=True)
metadata_path.unlink(missing_ok=True)
except OSError as exc:
raise CacheError(f"could not prune cached tile: {path}") from exc
return planned
def clear_disk_cache_path(cache_dir: str | Path | None, *, provider: str | None = None) -> None:
"""Remove persistent tile cache files under a cache root."""
root = disk_cache_root(cache_dir)
if provider is not None:
root = root / provider.replace("/", "_")
if not root.exists():
return
try:
shutil.rmtree(root)
except OSError as exc:
raise CacheError(f"could not clear disk cache: {root}") from exc

113
src/dpg_map/commands.py Normal file
View File

@@ -0,0 +1,113 @@
"""Command models for GUI-thread rendering work."""
from __future__ import annotations
from collections import OrderedDict, deque
from dataclasses import dataclass, field
from enum import Enum
from threading import RLock
from time import monotonic
from typing import Any
from .types import Tag
class CommandKind(Enum):
"""Commands that the GUI thread can apply in order."""
SET_VIEW = "set_view"
SET_PROVIDER = "set_provider"
ADD_OVERLAY = "add_overlay"
UPDATE_OVERLAY = "update_overlay"
DELETE_OVERLAY = "delete_overlay"
SET_LAYER_VISIBILITY = "set_layer_visibility"
ADD_LAYER = "add_layer"
CLEAR_LAYER = "clear_layer"
CLEAR_MAP = "clear_map"
CLEAR_MEMORY_CACHE = "clear_memory_cache"
CLEAR_DISK_CACHE = "clear_disk_cache"
@dataclass(frozen=True, slots=True)
class MapCommand:
"""A command submitted from public API calls to the GUI-thread renderer."""
kind: CommandKind
map_tag: Tag
payload: dict[str, Any] = field(default_factory=dict)
created_at: float = field(default_factory=monotonic)
class MapCommandQueue:
"""Thread-safe command queue with bounded coalescing for high-rate updates."""
_VIEW_KEY = "__view__"
def __init__(self) -> None:
self._lock = RLock()
self._ordered: deque[MapCommand] = deque()
self._overlay_updates: OrderedDict[tuple[Tag, Tag], MapCommand] = OrderedDict()
self._view_updates: OrderedDict[Tag, MapCommand] = OrderedDict()
def put(self, command: MapCommand) -> None:
"""Queue a command, coalescing update commands where ordering permits."""
with self._lock:
if command.kind is CommandKind.UPDATE_OVERLAY:
overlay_tag = command.payload.get("tag")
if overlay_tag is None:
self._ordered.append(command)
return
key = (command.map_tag, overlay_tag)
self._ordered.append(command)
self._overlay_updates[key] = command
return
if command.kind is CommandKind.SET_VIEW:
self._ordered.append(command)
self._view_updates[command.map_tag] = command
return
self._ordered.append(command)
def drain(self) -> list[MapCommand]:
"""Return pending commands in render order and clear the queue."""
with self._lock:
drained: list[MapCommand] = []
while self._ordered:
command = self._ordered.popleft()
if command.kind is CommandKind.UPDATE_OVERLAY:
overlay_tag = command.payload.get("tag")
if not isinstance(overlay_tag, str | int):
drained.append(command)
continue
key = (command.map_tag, overlay_tag)
latest = self._overlay_updates.get(key)
if latest is command:
drained.append(command)
del self._overlay_updates[key]
continue
if command.kind is CommandKind.SET_VIEW:
latest = self._view_updates.get(command.map_tag)
if latest is command:
drained.append(command)
del self._view_updates[command.map_tag]
continue
drained.append(command)
return drained
def __len__(self) -> int:
with self._lock:
return len(self._ordered)
def clear(self) -> None:
"""Drop all pending commands."""
with self._lock:
self._ordered.clear()
self._overlay_updates.clear()
self._view_updates.clear()

View File

@@ -0,0 +1 @@
"""Diagnostics and debug state helpers."""

View File

@@ -0,0 +1,46 @@
"""Draw layer bookkeeping helpers."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from .types import Tag
@dataclass(frozen=True, slots=True)
class DrawLayerTags:
"""Internal Dear PyGui draw layer tags for one map."""
background: str
tiles: str
overlays: str
attribution: str
def draw_layer_tags(map_tag: Tag) -> DrawLayerTags:
"""Return stable internal draw layer tags for a map."""
return DrawLayerTags(
background=f"{map_tag}##layer-background",
tiles=f"{map_tag}##layer-tiles",
overlays=f"{map_tag}##layer-overlays",
attribution=f"{map_tag}##layer-attribution",
)
def ensure_draw_layers(dpg: Any, *, drawlist_tag: Tag, map_tag: Tag) -> DrawLayerTags:
"""Create draw layers if needed and return their tags."""
tags = draw_layer_tags(map_tag)
for layer_tag in (tags.background, tags.tiles, tags.overlays, tags.attribution):
if not dpg.does_item_exist(layer_tag):
dpg.add_draw_layer(parent=drawlist_tag, tag=layer_tag)
return tags
def clear_draw_layer(dpg: Any, layer_tag: Tag) -> None:
"""Clear one draw layer without touching sibling layers."""
if dpg.does_item_exist(layer_tag):
dpg.delete_item(layer_tag, children_only=True)

49
src/dpg_map/exceptions.py Normal file
View File

@@ -0,0 +1,49 @@
"""Public exception types for dpg-map."""
class DpgMapError(Exception):
"""Base exception for all public dpg-map errors."""
class DpgMapNotImplementedError(DpgMapError, NotImplementedError):
"""Raised by public APIs that are intentionally stubbed during the rebuild."""
class ProviderError(DpgMapError):
"""Base exception for tile provider errors."""
class ProviderExistsError(ProviderError):
"""Raised when registering a provider name that already exists."""
class ProviderNotFoundError(ProviderError):
"""Raised when a requested tile provider is not registered."""
class InvalidProviderError(ProviderError, ValueError):
"""Raised when a tile provider definition is invalid."""
class ProjectionError(DpgMapError, ValueError):
"""Raised when geographic projection input is invalid."""
class MapNotFoundError(DpgMapError, KeyError):
"""Raised when a requested map tag is not registered."""
class OverlayNotFoundError(DpgMapError, KeyError):
"""Raised when a requested overlay tag is not registered."""
class CoordinateError(DpgMapError, ValueError):
"""Raised when geographic coordinate input is invalid."""
class ThreadingError(DpgMapError):
"""Raised when an operation violates dpg-map threading rules."""
class CacheError(DpgMapError):
"""Raised when cache metadata or paths cannot be handled."""

233
src/dpg_map/interaction.py Normal file
View File

@@ -0,0 +1,233 @@
"""Map interaction state and handlers."""
from __future__ import annotations
from dataclasses import dataclass
from math import isfinite
from typing import Any
from .commands import CommandKind, MapCommand
from .projection import latlon_to_world, screen_to_world, world_to_latlon
from .sizing import effective_draw_size
from .state import DirtyFlags, MapState, mark_dirty
from .types import LatLon, Point
@dataclass(frozen=True, slots=True)
class HitRect:
"""Screen-space rectangle used for map interaction tests."""
x: float
y: float
width: float
height: float
@property
def right(self) -> float:
return self.x + self.width
@property
def bottom(self) -> float:
return self.y + self.height
def contains(self, x: float, y: float) -> bool:
return self.x <= x <= self.right and self.y <= y <= self.bottom
def calculate_hit_rect(state: MapState, drawlist_pos: tuple[float, float]) -> HitRect:
"""Return the map interaction rectangle for a drawlist position."""
width, height = effective_draw_size(state)
return HitRect(float(drawlist_pos[0]), float(drawlist_pos[1]), float(width), float(height))
def screen_to_latlon_in_state(state: MapState, x: float, y: float) -> LatLon:
"""Convert map-local screen coordinates to latitude/longitude."""
with state.lock:
width, height = effective_draw_size(state)
center = state.center
zoom = state.zoom
tile_size = state.provider.tile_size
world_x, world_y = screen_to_world(
float(x),
float(y),
center=center,
zoom=zoom,
width=width,
height=height,
tile_size=tile_size,
)
return world_to_latlon(world_x, world_y, zoom, tile_size)
def latlon_to_screen_in_state(state: MapState, lat: float, lon: float) -> Point:
"""Convert latitude/longitude to map-local screen coordinates."""
with state.lock:
width, height = effective_draw_size(state)
center = state.center
zoom = state.zoom
tile_size = state.provider.tile_size
world_x, world_y = latlon_to_world(lat, lon, zoom, tile_size)
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
return (world_x - center_x + width / 2.0, world_y - center_y + height / 2.0)
def pan_state_by_pixels(state: MapState, dx: float, dy: float) -> LatLon:
"""Pan the map by a mouse drag delta in screen pixels."""
with state.lock:
if dx == 0 and dy == 0:
return state.center
center = state.center
zoom = state.zoom
tile_size = state.provider.tile_size
world_size = tile_size * (2**zoom)
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
new_x = (center_x - dx) % world_size
new_y = min(max(center_y - dy, 0.0), float(world_size))
state.center = world_to_latlon(new_x, new_y, zoom, tile_size)
mark_dirty(state, DirtyFlags.VIEW | DirtyFlags.TILES | DirtyFlags.OVERLAYS)
state.command_queue.put(
MapCommand(
kind=CommandKind.SET_VIEW,
map_tag=state.tag,
payload={"center": state.center},
)
)
return state.center
def zoom_state_at_screen_point(
state: MapState,
*,
screen_x: float,
screen_y: float,
delta: float,
) -> int:
"""Zoom the map around a map-local screen point where possible."""
if delta == 0 or not isfinite(delta):
with state.lock:
return state.zoom
with state.lock:
old_zoom = state.zoom
new_zoom = max(state.min_zoom, min(state.max_zoom, old_zoom + (1 if delta > 0 else -1)))
if new_zoom == old_zoom:
return old_zoom
width, height = effective_draw_size(state)
tile_size = state.provider.tile_size
anchor_latlon = screen_to_latlon_in_state(state, screen_x, screen_y)
anchor_x, anchor_y = latlon_to_world(
anchor_latlon[0],
anchor_latlon[1],
new_zoom,
tile_size,
)
world_size = tile_size * (2**new_zoom)
center_x = (anchor_x - (screen_x - width / 2.0)) % world_size
center_y = min(max(anchor_y - (screen_y - height / 2.0), 0.0), float(world_size))
state.zoom = new_zoom
state.center = world_to_latlon(center_x, center_y, new_zoom, tile_size)
mark_dirty(state, DirtyFlags.VIEW | DirtyFlags.TILES | DirtyFlags.OVERLAYS)
state.command_queue.put(
MapCommand(
kind=CommandKind.SET_VIEW,
map_tag=state.tag,
payload={"center": state.center, "zoom": state.zoom},
)
)
return state.zoom
def handle_mouse_down(state: MapState, mouse_pos: tuple[float, float], hit_rect: HitRect) -> None:
"""Begin a drag if the left mouse button starts inside the map rectangle."""
with state.lock:
if not state.is_visible or not hit_rect.contains(mouse_pos[0], mouse_pos[1]):
state.interaction.active_drag = False
state.interaction.last_mouse_position = None
return
state.interaction.active_drag = True
state.interaction.last_mouse_position = mouse_pos
def handle_mouse_drag(state: MapState, mouse_pos: tuple[float, float]) -> None:
"""Update center from a mouse drag event."""
with state.lock:
if not state.interaction.active_drag:
return
last_pos = state.interaction.last_mouse_position
state.interaction.last_mouse_position = mouse_pos
if last_pos is None:
return
pan_state_by_pixels(state, mouse_pos[0] - last_pos[0], mouse_pos[1] - last_pos[1])
def handle_mouse_release(state: MapState) -> None:
"""End any active drag."""
with state.lock:
state.interaction.active_drag = False
state.interaction.last_mouse_position = None
def handle_mouse_wheel(
state: MapState,
*,
mouse_pos: tuple[float, float],
wheel_delta: float,
hit_rect: HitRect,
) -> None:
"""Apply wheel zoom when the cursor is over the concrete map rectangle."""
if not hit_rect.contains(mouse_pos[0], mouse_pos[1]):
return
zoom_state_at_screen_point(
state,
screen_x=mouse_pos[0] - hit_rect.x,
screen_y=mouse_pos[1] - hit_rect.y,
delta=wheel_delta,
)
def update_drag_from_button_state(
state: MapState,
*,
mouse_pos: tuple[float, float],
hit_rect: HitRect,
is_down: bool,
) -> None:
"""Poll left-button state and keep drag interaction moving."""
with state.lock:
active_drag = state.interaction.active_drag
if not is_down:
if active_drag:
handle_mouse_release(state)
return
if active_drag:
handle_mouse_drag(state, mouse_pos)
return
if hit_rect.contains(mouse_pos[0], mouse_pos[1]):
handle_mouse_down(state, mouse_pos, hit_rect)
def wheel_delta_from_app_data(app_data: Any) -> float:
"""Normalize Dear PyGui mouse wheel callback data."""
if isinstance(app_data, int | float):
return float(app_data)
if isinstance(app_data, (list, tuple)) and app_data:
value = app_data[-1]
if isinstance(value, int | float):
return float(value)
return 0.0

72
src/dpg_map/overlays.py Normal file
View File

@@ -0,0 +1,72 @@
"""Logical overlay models."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
from .types import Color, LatLon, Tag
@dataclass(slots=True)
class Overlay:
"""Base logical overlay state."""
tag: Tag
map_tag: Tag
layer: str
show: bool = True
user_data: Any = None
revision: int = 0
def touch(self) -> None:
"""Mark this overlay as changed."""
self.revision += 1
@dataclass(slots=True)
class MarkerOverlay(Overlay):
"""Logical marker overlay."""
lat: float = 0.0
lon: float = 0.0
label: str | None = None
color: Color = (255, 80, 80, 255)
radius: float = 5.0
show_label: bool = False
callback: Callable[..., Any] | None = None
@dataclass(slots=True)
class PolylineOverlay(Overlay):
"""Logical polyline overlay."""
points: tuple[LatLon, ...] = ()
color: Color = (80, 180, 255, 255)
thickness: float = 2.0
closed: bool = False
simplify: bool = True
@dataclass(slots=True)
class TrajectoryOverlay(Overlay):
"""Logical trajectory overlay."""
points: tuple[LatLon, ...] = ()
timestamps: tuple[float, ...] | None = None
color: Color = (255, 180, 60, 255)
thickness: float = 2.0
show_points: bool = False
point_stride: int = 1
@dataclass(slots=True)
class LayerState:
"""Logical layer visibility and ordering state."""
name: str
z_index: int = 0
show: bool = True
overlay_tags: set[Tag] = field(default_factory=set)

92
src/dpg_map/projection.py Normal file
View File

@@ -0,0 +1,92 @@
"""Web Mercator projection helpers."""
from __future__ import annotations
import math
from .exceptions import ProjectionError
from .types import LatLon, Point
WEB_MERCATOR_MAX_LAT = 85.05112878
DEFAULT_TILE_SIZE = 256
def clamp_latitude(lat: float) -> float:
"""Clamp latitude to the Web Mercator representable range."""
return min(max(lat, -WEB_MERCATOR_MAX_LAT), WEB_MERCATOR_MAX_LAT)
def map_size(zoom: int, tile_size: int = DEFAULT_TILE_SIZE) -> int:
"""Return square pixel size of the full world at a zoom level."""
if zoom < 0:
raise ProjectionError("zoom must be >= 0")
if tile_size <= 0:
raise ProjectionError("tile_size must be > 0")
return tile_size * (2**zoom)
def latlon_to_world(lat: float, lon: float, zoom: int, tile_size: int = DEFAULT_TILE_SIZE) -> Point:
"""Project latitude/longitude to world pixel coordinates."""
size = map_size(zoom, tile_size)
lat = clamp_latitude(lat)
lon = ((lon + 180.0) % 360.0) - 180.0
sin_lat = math.sin(math.radians(lat))
x = (lon + 180.0) / 360.0 * size
y = (0.5 - math.log((1.0 + sin_lat) / (1.0 - sin_lat)) / (4.0 * math.pi)) * size
return (x, y)
def world_to_latlon(x: float, y: float, zoom: int, tile_size: int = DEFAULT_TILE_SIZE) -> LatLon:
"""Unproject world pixel coordinates to latitude/longitude."""
size = map_size(zoom, tile_size)
lon = x / size * 360.0 - 180.0
n = math.pi - 2.0 * math.pi * y / size
lat = math.degrees(math.atan(math.sinh(n)))
return (lat, lon)
def latlon_to_tile(lat: float, lon: float, zoom: int) -> tuple[int, int, int]:
"""Return the XYZ tile coordinate containing a latitude/longitude point."""
x, y = latlon_to_world(lat, lon, zoom)
scale = 2**zoom
tile_x = min(max(int(x // DEFAULT_TILE_SIZE), 0), scale - 1)
tile_y = min(max(int(y // DEFAULT_TILE_SIZE), 0), scale - 1)
return (tile_x, tile_y, zoom)
def world_to_screen(
world_x: float,
world_y: float,
*,
center: LatLon,
zoom: int,
width: int,
height: int,
tile_size: int = DEFAULT_TILE_SIZE,
) -> Point:
"""Convert world pixels to screen pixels for a centered viewport."""
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
return (world_x - center_x + width / 2.0, world_y - center_y + height / 2.0)
def screen_to_world(
screen_x: float,
screen_y: float,
*,
center: LatLon,
zoom: int,
width: int,
height: int,
tile_size: int = DEFAULT_TILE_SIZE,
) -> Point:
"""Convert screen pixels to world pixels for a centered viewport."""
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
return (screen_x + center_x - width / 2.0, screen_y + center_y - height / 2.0)

145
src/dpg_map/providers.py Normal file
View File

@@ -0,0 +1,145 @@
"""Tile provider definitions and registry."""
from __future__ import annotations
from dataclasses import dataclass, field
from string import Formatter
from threading import RLock
from .exceptions import InvalidProviderError, ProviderExistsError, ProviderNotFoundError
_REQUIRED_TEMPLATE_FIELDS = frozenset({"x", "y", "z"})
_OPTIONAL_TEMPLATE_FIELDS = frozenset({"s", "r", "ext"})
@dataclass(frozen=True, slots=True)
class TileProvider:
"""XYZ raster tile provider definition."""
name: str
url_template: str
min_zoom: int = 0
max_zoom: int = 19
tile_size: int = 256
attribution: str = ""
headers: dict[str, str] = field(default_factory=dict)
subdomains: tuple[str, ...] = ()
retina: bool = False
file_extension: str | None = None
def __post_init__(self) -> None:
name = self.name.strip()
if not name:
raise InvalidProviderError("provider name must not be empty")
if name != self.name:
object.__setattr__(self, "name", name)
if not self.url_template.strip():
raise InvalidProviderError("provider url_template must not be empty")
fields = {
field_name
for _, field_name, _, _ in Formatter().parse(self.url_template)
if field_name is not None and field_name != ""
}
missing = _REQUIRED_TEMPLATE_FIELDS - fields
if missing:
missing_text = ", ".join(sorted(missing))
raise InvalidProviderError(f"provider url_template missing field(s): {missing_text}")
unknown = fields - _REQUIRED_TEMPLATE_FIELDS - _OPTIONAL_TEMPLATE_FIELDS
if unknown:
unknown_text = ", ".join(sorted(unknown))
raise InvalidProviderError(
f"provider url_template contains unknown field(s): {unknown_text}"
)
if self.min_zoom < 0:
raise InvalidProviderError("provider min_zoom must be >= 0")
if self.max_zoom < self.min_zoom:
raise InvalidProviderError("provider max_zoom must be >= min_zoom")
if self.tile_size <= 0:
raise InvalidProviderError("provider tile_size must be > 0")
object.__setattr__(self, "headers", dict(self.headers))
object.__setattr__(self, "subdomains", tuple(self.subdomains))
def build_url(self, *, x: int, y: int, z: int) -> str:
"""Build a concrete tile URL for an XYZ tile coordinate."""
if z < self.min_zoom or z > self.max_zoom:
raise InvalidProviderError(
f"zoom {z} is outside provider range {self.min_zoom}-{self.max_zoom}"
)
subdomain = ""
if self.subdomains:
subdomain = self.subdomains[(x + y + z) % len(self.subdomains)]
retina_suffix = "@2x" if self.retina else ""
extension = self.file_extension or ""
return self.url_template.format(
x=x,
y=y,
z=z,
s=subdomain,
r=retina_suffix,
ext=extension,
)
OSM = TileProvider(
name="osm",
url_template="https://tile.openstreetmap.org/{z}/{x}/{y}.png",
min_zoom=0,
max_zoom=19,
tile_size=256,
attribution="\u00a9 OpenStreetMap contributors",
)
_providers: dict[str, TileProvider] = {OSM.name: OSM}
_providers_lock = RLock()
def register_provider(provider: TileProvider, *, replace: bool = False) -> None:
"""Register a tile provider by name."""
if not isinstance(provider, TileProvider):
raise InvalidProviderError("provider must be a TileProvider")
with _providers_lock:
if provider.name in _providers and not replace:
raise ProviderExistsError(f"provider already registered: {provider.name}")
_providers[provider.name] = provider
def unregister_provider(name: str) -> None:
"""Remove a registered tile provider."""
with _providers_lock:
if name not in _providers:
raise ProviderNotFoundError(f"provider not registered: {name}")
del _providers[name]
def get_provider(name: str) -> TileProvider:
"""Return a registered tile provider."""
with _providers_lock:
try:
return _providers[name]
except KeyError as exc:
raise ProviderNotFoundError(f"provider not registered: {name}") from exc
def get_default_provider() -> TileProvider:
"""Return the default OpenStreetMap provider."""
return get_provider(OSM.name)
def list_providers() -> list[str]:
"""List registered provider names in stable sorted order."""
with _providers_lock:
return sorted(_providers)

490
src/dpg_map/renderer.py Normal file
View File

@@ -0,0 +1,490 @@
"""GUI-thread renderer implementation."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import replace
from typing import Any
from .commands import CommandKind, MapCommand
from .draw_layers import DrawLayerTags, clear_draw_layer, ensure_draw_layers
from .interaction import HitRect, calculate_hit_rect, update_drag_from_button_state
from .overlays import MarkerOverlay, Overlay, PolylineOverlay, TrajectoryOverlay
from .projection import latlon_to_world
from .sizing import SizeMeasurement, apply_size_measurement
from .state import DirtyFlags, MapState
from .tiles import Tile, VisibleTile
from .types import Color, LatLon
class MapRenderer:
"""GUI-thread renderer for the map widget shell and tile layer."""
def __init__(self, state: MapState, dpg: Any) -> None:
self.state = state
self._dpg = dpg
self._layers: DrawLayerTags | None = None
self.last_drained_commands: tuple[MapCommand, ...] = ()
self.last_hit_rect: HitRect | None = None
self.last_overlay_count: int = 0
def schedule_next_frame(self) -> None:
"""Schedule this renderer to run on the next Dear PyGui frame."""
with self.state.lock:
if self.state.frame_scheduled:
return
self.state.frame_scheduled = True
frame = self._dpg.get_frame_count() + 1
self._dpg.set_frame_callback(frame, self._frame_callback)
def _frame_callback(self, sender: Any | None = None, app_data: Any | None = None) -> None:
_ = (sender, app_data)
with self.state.lock:
self.state.frame_scheduled = False
if not self._dpg.does_item_exist(self.state.drawlist_tag):
return
self.render_frame()
self.schedule_next_frame()
def render_frame(self) -> None:
"""Drain pending commands, refresh size, process tiles, and redraw."""
commands = drain_renderer_commands(self.state)
self.last_drained_commands = tuple(commands)
self._update_size_from_dpg()
self._poll_mouse_drag()
with self.state.lock:
dirty = self.state.dirty
draw_tiles = bool(dirty & (DirtyFlags.SIZE | DirtyFlags.TILES | DirtyFlags.PROVIDER))
draw_overlays = bool(dirty & (DirtyFlags.SIZE | DirtyFlags.OVERLAYS))
visible = self.state.is_visible
width = self.state.measured_width or self.state.last_nonzero_width
height = self.state.measured_height or self.state.last_nonzero_height
provider_attribution = self.state.provider.attribution
provider = self.state.provider
center = self.state.center
zoom = self.state.zoom
generation = self.state.generation
cache_dir = self.state.cache_dir
overlays = tuple(
_copy_overlay_for_render(overlay) for overlay in self.state.overlays.values()
)
layers = {
name: (layer.show, layer.z_index) for name, layer in self.state.layers.items()
}
self.state.dirty = DirtyFlags.NONE
accepted_tiles = self.state.tile_manager.drain_results(
generation=generation,
provider_name=provider.name,
)
self._delete_evicted_textures()
for tile in accepted_tiles:
self._ensure_texture(tile)
visible_tiles: list[VisibleTile] = []
if visible and width > 0 and height > 0:
visible_tiles = self.state.tile_manager.request_visible_tiles(
center=center,
zoom=zoom,
width=width,
height=height,
provider=provider,
generation=generation,
cache_dir=cache_dir,
margin=self._prefetch_margin(),
)
if visible and (draw_tiles or accepted_tiles):
self._draw_tile_layer(
visible_tiles=visible_tiles,
width=width,
height=height,
attribution=provider_attribution,
tile_size=provider.tile_size,
)
if visible and draw_overlays:
self._draw_overlay_layer(
overlays=overlays,
layers=layers,
center=center,
zoom=zoom,
width=width,
height=height,
tile_size=provider.tile_size,
)
def _update_size_from_dpg(self) -> None:
width, height = self._measure_child_content()
visible = bool(self._dpg.is_item_shown(self.state.child_window_tag))
with self.state.lock:
update = apply_size_measurement(
self.state,
SizeMeasurement(width=width, height=height, visible=visible),
)
draw_width = update.effective_width
draw_height = update.effective_height
self._dpg.configure_item(self.state.drawlist_tag, width=draw_width, height=draw_height)
draw_pos = tuple(
float(value) for value in self._dpg.get_item_rect_min(self.state.drawlist_tag)
)
with self.state.lock:
self.last_hit_rect = calculate_hit_rect(self.state, (draw_pos[0], draw_pos[1]))
def _poll_mouse_drag(self) -> None:
if self.last_hit_rect is None:
return
try:
is_down = bool(self._dpg.is_mouse_button_down(self._dpg.mvMouseButton_Left))
mouse_pos = self._dpg.get_mouse_pos(local=False)
except Exception:
return
update_drag_from_button_state(
self.state,
mouse_pos=(float(mouse_pos[0]), float(mouse_pos[1])),
hit_rect=self.last_hit_rect,
is_down=is_down,
)
def _measure_child_content(self) -> tuple[int, int]:
try:
width, height = self._dpg.get_item_rect_size(self.state.child_window_tag)
except Exception:
return (0, 0)
return (max(0, int(width)), max(0, int(height)))
def _draw_tile_layer(
self,
*,
visible_tiles: list[VisibleTile],
width: int,
height: int,
attribution: str,
tile_size: int,
) -> None:
width = max(1, int(width))
height = max(1, int(height))
layers = self._ensure_draw_layers()
clear_draw_layer(self._dpg, layers.background)
clear_draw_layer(self._dpg, layers.tiles)
clear_draw_layer(self._dpg, layers.attribution)
self._dpg.draw_rectangle(
(0, 0),
(width, height),
parent=layers.background,
color=(54, 68, 78, 255),
fill=(29, 38, 45, 255),
)
for visible_tile in visible_tiles:
tile = self.state.tile_manager.get_ready_tile(visible_tile.tile_id)
if tile is None or tile.texture_tag is None:
continue
screen_x = visible_tile.screen_x
screen_y = visible_tile.screen_y
self._dpg.draw_image(
tile.texture_tag,
(screen_x, screen_y),
(screen_x + tile_size, screen_y + tile_size),
parent=layers.tiles,
)
label = attribution or "Map tiles"
text_y = max(28, height - 24)
self._dpg.draw_text(
(12, text_y),
label,
parent=layers.attribution,
color=(172, 184, 192, 255),
size=12,
)
def _draw_overlay_layer(
self,
*,
overlays: tuple[Overlay, ...],
layers: dict[str, tuple[bool, int]],
center: LatLon,
zoom: int,
width: int,
height: int,
tile_size: int,
) -> None:
draw_layers = self._ensure_draw_layers()
clear_draw_layer(self._dpg, draw_layers.overlays)
if width <= 0 or height <= 0:
self.last_overlay_count = 0
return
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
visible_overlays = [
overlay
for overlay in overlays
if overlay.show and layers.get(overlay.layer, (True, 0))[0]
]
visible_overlays.sort(key=lambda overlay: layers.get(overlay.layer, (True, 0))[1])
drawn = 0
for overlay in visible_overlays:
if isinstance(overlay, MarkerOverlay):
self._draw_marker_overlay(
overlay, center_x, center_y, zoom, width, height, tile_size
)
drawn += 1
elif isinstance(overlay, PolylineOverlay):
self._draw_polyline_overlay(
overlay,
center_x,
center_y,
zoom,
width,
height,
tile_size,
draw_layers.overlays,
)
drawn += 1
elif isinstance(overlay, TrajectoryOverlay):
self._draw_trajectory_overlay(
overlay,
center_x,
center_y,
zoom,
width,
height,
tile_size,
draw_layers.overlays,
)
drawn += 1
self.last_overlay_count = drawn
def _draw_marker_overlay(
self,
overlay: MarkerOverlay,
center_x: float,
center_y: float,
zoom: int,
width: int,
height: int,
tile_size: int,
) -> None:
layers = self._ensure_draw_layers()
x, y = _latlon_to_screen(
overlay.lat,
overlay.lon,
center_x,
center_y,
zoom,
width,
height,
tile_size,
)
radius = max(1.0, float(overlay.radius))
self._dpg.draw_circle(
(x, y),
radius,
parent=layers.overlays,
color=(255, 255, 255, 230),
fill=_rgba(overlay.color),
thickness=1.5,
segments=20,
)
if overlay.show_label and overlay.label:
self._dpg.draw_text(
(x + radius + 4.0, y - 7.0),
overlay.label,
parent=layers.overlays,
color=(245, 248, 250, 255),
size=12,
)
def _draw_polyline_overlay(
self,
overlay: PolylineOverlay,
center_x: float,
center_y: float,
zoom: int,
width: int,
height: int,
tile_size: int,
parent: str,
) -> None:
points = _screen_points(
overlay.points,
center_x=center_x,
center_y=center_y,
zoom=zoom,
width=width,
height=height,
tile_size=tile_size,
)
if len(points) < 2:
return
self._dpg.draw_polyline(
points,
parent=parent,
closed=overlay.closed,
color=_rgba(overlay.color),
thickness=max(1.0, float(overlay.thickness)),
)
def _draw_trajectory_overlay(
self,
overlay: TrajectoryOverlay,
center_x: float,
center_y: float,
zoom: int,
width: int,
height: int,
tile_size: int,
parent: str,
) -> None:
points = _screen_points(
overlay.points,
center_x=center_x,
center_y=center_y,
zoom=zoom,
width=width,
height=height,
tile_size=tile_size,
)
if len(points) >= 2:
self._dpg.draw_polyline(
points,
parent=parent,
color=_rgba(overlay.color),
thickness=max(1.0, float(overlay.thickness)),
)
if overlay.show_points and points:
stride = max(1, int(overlay.point_stride))
for point in points[::stride]:
self._dpg.draw_circle(
point,
2.5,
parent=parent,
color=_rgba(overlay.color),
fill=_rgba(overlay.color),
segments=8,
)
def _ensure_draw_layers(self) -> DrawLayerTags:
if self._layers is None or not self._dpg.does_item_exist(self._layers.overlays):
self._layers = ensure_draw_layers(
self._dpg,
drawlist_tag=self.state.drawlist_tag,
map_tag=self.state.tag,
)
return self._layers
def _ensure_texture(self, tile: Tile) -> None:
if tile.texture_tag is not None:
return
texture_tag = (
f"{self.state.tag}##tile-{tile.tile_id.provider_name}-"
f"{tile.tile_id.z}-{tile.tile_id.x}-{tile.tile_id.y}"
)
if not self._dpg.does_item_exist(texture_tag):
self._dpg.add_static_texture(
tile.width,
tile.height,
tile.pixels,
tag=texture_tag,
parent=self.state.texture_registry_tag,
)
self.state.tile_manager.set_texture_tag(tile.tile_id, texture_tag)
def _delete_evicted_textures(self) -> None:
for texture_tag in self.state.tile_manager.take_texture_deletions():
if self._dpg.does_item_exist(texture_tag):
self._dpg.delete_item(texture_tag)
def _prefetch_margin(self) -> int:
from .state import get_config
return get_config().prefetch_margin_tiles
def drain_renderer_commands(state: MapState) -> list[MapCommand]:
"""Drain and apply GUI-thread command side effects."""
commands = state.command_queue.drain()
if not commands:
return []
with state.lock:
for command in commands:
if command.kind is CommandKind.SET_VIEW:
state.dirty |= DirtyFlags.VIEW | DirtyFlags.TILES | DirtyFlags.OVERLAYS
elif command.kind is CommandKind.SET_PROVIDER:
state.tile_manager.clear_memory_cache()
state.dirty |= DirtyFlags.PROVIDER | DirtyFlags.TILES | DirtyFlags.OVERLAYS
elif command.kind in {
CommandKind.ADD_OVERLAY,
CommandKind.UPDATE_OVERLAY,
CommandKind.DELETE_OVERLAY,
CommandKind.SET_LAYER_VISIBILITY,
CommandKind.ADD_LAYER,
CommandKind.CLEAR_LAYER,
}:
state.dirty |= DirtyFlags.OVERLAYS
elif command.kind is CommandKind.CLEAR_MAP:
state.tile_manager.clear_memory_cache()
state.dirty |= DirtyFlags.TILES | DirtyFlags.OVERLAYS
elif command.kind is CommandKind.CLEAR_MEMORY_CACHE:
state.tile_manager.clear_memory_cache()
state.dirty |= DirtyFlags.TILES
elif command.kind is CommandKind.CLEAR_DISK_CACHE:
provider = command.payload.get("provider")
if not isinstance(provider, str):
provider = None
state.tile_manager.clear_disk_cache(state.cache_dir, provider=provider)
state.dirty |= DirtyFlags.TILES
return commands
def make_frame_pump(state: MapState, dpg: Any) -> Callable[[], None]:
"""Create and attach a renderer frame pump for a map state."""
renderer = MapRenderer(state, dpg)
with state.lock:
state.renderer = renderer
renderer.schedule_next_frame()
return renderer.render_frame
def _rgba(color: Color) -> tuple[int, int, int, int]:
if len(color) == 3:
return (int(color[0]), int(color[1]), int(color[2]), 255)
return (int(color[0]), int(color[1]), int(color[2]), int(color[3]))
def _latlon_to_screen(
lat: float,
lon: float,
center_x: float,
center_y: float,
zoom: int,
width: int,
height: int,
tile_size: int,
) -> tuple[float, float]:
world_x, world_y = latlon_to_world(lat, lon, zoom, tile_size)
return (world_x - center_x + width / 2.0, world_y - center_y + height / 2.0)
def _screen_points(
points: tuple[LatLon, ...],
*,
center_x: float,
center_y: float,
zoom: int,
width: int,
height: int,
tile_size: int,
) -> list[tuple[float, float]]:
return [
_latlon_to_screen(lat, lon, center_x, center_y, zoom, width, height, tile_size)
for lat, lon in points
]
def _copy_overlay_for_render(overlay: Overlay) -> Overlay:
return replace(overlay)

87
src/dpg_map/sizing.py Normal file
View File

@@ -0,0 +1,87 @@
"""Map sizing measurement helpers."""
from __future__ import annotations
from dataclasses import dataclass
from .state import DirtyFlags, MapState, mark_dirty
@dataclass(frozen=True, slots=True)
class SizeMeasurement:
"""Concrete size and visibility reported by the GUI thread."""
width: int
height: int
visible: bool
@dataclass(frozen=True, slots=True)
class SizeUpdate:
"""Result of applying one GUI size measurement."""
changed: bool
became_visible: bool
became_hidden: bool
effective_width: int
effective_height: int
def normalize_dimension(value: int | float | None) -> int:
"""Convert Dear PyGui size values to stable integer pixels."""
if value is None:
return 0
return max(0, int(value))
def effective_draw_size(state: MapState) -> tuple[int, int]:
"""Return the size the drawlist should use for the current frame."""
width = state.measured_width or state.last_nonzero_width or 1
height = state.measured_height or state.last_nonzero_height or 1
return (max(1, width), max(1, height))
def apply_size_measurement(
state: MapState,
measurement: SizeMeasurement,
*,
mark: bool = True,
) -> SizeUpdate:
"""Apply a GUI-thread size measurement to logical state.
A zero measurement is preserved as the current measured size, but it does not
erase the last non-zero size. This keeps hidden maps from permanently
collapsing when Dear PyGui reports a temporary zero content region.
"""
width = normalize_dimension(measurement.width)
height = normalize_dimension(measurement.height)
visible = bool(measurement.visible and width > 0 and height > 0)
previous_width = state.measured_width
previous_height = state.measured_height
previous_visible = state.is_visible
changed = previous_width != width or previous_height != height or previous_visible != visible
state.measured_width = width
state.measured_height = height
state.is_visible = visible
if width > 0:
state.last_nonzero_width = width
if height > 0:
state.last_nonzero_height = height
if changed and mark:
mark_dirty(state, DirtyFlags.SIZE | DirtyFlags.TILES | DirtyFlags.OVERLAYS)
effective_width, effective_height = effective_draw_size(state)
return SizeUpdate(
changed=changed,
became_visible=visible and not previous_visible,
became_hidden=previous_visible and not visible,
effective_width=effective_width,
effective_height=effective_height,
)

332
src/dpg_map/state.py Normal file
View File

@@ -0,0 +1,332 @@
"""Thread-safe state models and registries."""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass, field
from enum import IntFlag, auto
from pathlib import Path
from threading import RLock
from uuid import uuid4
from .commands import MapCommandQueue
from .exceptions import InvalidProviderError, MapNotFoundError
from .overlays import LayerState, Overlay
from .providers import TileProvider, get_default_provider, get_provider
from .tiles import TileManager
from .types import LatLon, Tag
class DirtyFlags(IntFlag):
"""Reasons the GUI renderer needs to refresh part of a map."""
NONE = 0
VIEW = auto()
TILES = auto()
OVERLAYS = auto()
SIZE = auto()
PROVIDER = auto()
FULL = VIEW | TILES | OVERLAYS | SIZE | PROVIDER
@dataclass(slots=True)
class DpgMapConfig:
"""Global package configuration."""
user_agent: str | None = None
cache_dir: Path | None = None
default_provider: str | TileProvider = "osm"
memory_cache_max_tiles: int = 512
disk_cache_max_bytes: int | None = 2_000_000_000
prefetch_margin_tiles: int = 1
tile_worker_count: int = 4
overlay_update_policy: str = "coalesce"
debug: bool = False
@dataclass(slots=True)
class InteractionState:
"""Logical mouse interaction state."""
active_drag: bool = False
last_mouse_position: tuple[float, float] | None = None
def default_layers() -> dict[str, LayerState]:
"""Return the default logical map layers."""
layers = [
LayerState("background", z_index=0),
LayerState("tiles", z_index=10),
LayerState("default", z_index=50),
LayerState("markers", z_index=60),
LayerState("lines", z_index=70),
LayerState("trajectories", z_index=80),
LayerState("attribution", z_index=100),
]
return {layer.name: layer for layer in layers}
@dataclass(slots=True)
class MapState:
"""Thread-safe logical state for one map widget."""
tag: Tag
child_window_tag: Tag
drawlist_tag: Tag
texture_registry_tag: Tag
handler_registry_tag: Tag
requested_width: int = 0
requested_height: int = 0
requested_autosize_x: bool = False
requested_autosize_y: bool = False
measured_width: int = 0
measured_height: int = 0
last_nonzero_width: int = 0
last_nonzero_height: int = 0
is_visible: bool = False
center: LatLon = (0.0, 0.0)
zoom: int = 2
min_zoom: int = 0
max_zoom: int = 19
provider: TileProvider = field(default_factory=get_default_provider)
overlays: dict[Tag, Overlay] = field(default_factory=dict)
layers: dict[str, LayerState] = field(default_factory=default_layers)
command_queue: MapCommandQueue = field(default_factory=MapCommandQueue)
tile_manager: TileManager = field(default_factory=TileManager)
renderer: object | None = None
interaction: InteractionState = field(default_factory=InteractionState)
lock: RLock = field(default_factory=RLock)
dirty: DirtyFlags = DirtyFlags.FULL
frame_scheduled: bool = False
generation: int = 0
cache_dir: Path | None = None
user_agent: str | None = None
_config = DpgMapConfig()
_config_lock = RLock()
_maps: dict[Tag, MapState] = {}
_maps_lock = RLock()
_current_map_stack: list[Tag] = []
_current_map_lock = RLock()
def _resolve_provider(provider: str | TileProvider | None) -> TileProvider:
if provider is None:
with _config_lock:
provider = _config.default_provider
if isinstance(provider, TileProvider):
return provider
if isinstance(provider, str):
return get_provider(provider)
raise InvalidProviderError("provider must be a provider name or TileProvider")
def configure_state(
*,
user_agent: str | None = None,
cache_dir: str | Path | None = None,
default_provider: str | TileProvider = "osm",
memory_cache_max_tiles: int = 512,
disk_cache_max_bytes: int | None = 2_000_000_000,
prefetch_margin_tiles: int = 1,
tile_worker_count: int = 4,
overlay_update_policy: str = "coalesce",
debug: bool = False,
) -> None:
"""Replace global dpg-map configuration."""
if memory_cache_max_tiles < 0:
raise ValueError("memory_cache_max_tiles must be >= 0")
if disk_cache_max_bytes is not None and disk_cache_max_bytes < 0:
raise ValueError("disk_cache_max_bytes must be >= 0 or None")
if prefetch_margin_tiles < 0:
raise ValueError("prefetch_margin_tiles must be >= 0")
if tile_worker_count < 1:
raise ValueError("tile_worker_count must be >= 1")
if overlay_update_policy != "coalesce":
raise ValueError('overlay_update_policy must be "coalesce"')
resolved_cache_dir = Path(cache_dir).expanduser() if cache_dir is not None else None
_resolve_provider(default_provider)
with _config_lock:
_config.user_agent = user_agent
_config.cache_dir = resolved_cache_dir
_config.default_provider = default_provider
_config.memory_cache_max_tiles = memory_cache_max_tiles
_config.disk_cache_max_bytes = disk_cache_max_bytes
_config.prefetch_margin_tiles = prefetch_margin_tiles
_config.tile_worker_count = tile_worker_count
_config.overlay_update_policy = overlay_update_policy
_config.debug = debug
def get_config() -> DpgMapConfig:
"""Return a copy of current global configuration."""
with _config_lock:
return DpgMapConfig(
user_agent=_config.user_agent,
cache_dir=_config.cache_dir,
default_provider=_config.default_provider,
memory_cache_max_tiles=_config.memory_cache_max_tiles,
disk_cache_max_bytes=_config.disk_cache_max_bytes,
prefetch_margin_tiles=_config.prefetch_margin_tiles,
tile_worker_count=_config.tile_worker_count,
overlay_update_policy=_config.overlay_update_policy,
debug=_config.debug,
)
def create_map_state(
*,
tag: Tag | None = None,
center: LatLon = (0.0, 0.0),
zoom: int = 2,
min_zoom: int | None = None,
max_zoom: int | None = None,
width: int = 0,
height: int = 0,
autosize_x: bool = False,
autosize_y: bool = False,
provider: str | TileProvider | None = None,
cache_dir: str | Path | None = None,
user_agent: str | None = None,
) -> MapState:
"""Create and register a logical map state."""
map_tag = tag if tag is not None else f"dpg_map_{uuid4().hex}"
provider_obj = _resolve_provider(provider)
min_zoom_value = provider_obj.min_zoom if min_zoom is None else min_zoom
max_zoom_value = provider_obj.max_zoom if max_zoom is None else max_zoom
if min_zoom_value < provider_obj.min_zoom:
min_zoom_value = provider_obj.min_zoom
if max_zoom_value > provider_obj.max_zoom:
max_zoom_value = provider_obj.max_zoom
if max_zoom_value < min_zoom_value:
raise ValueError("max_zoom must be >= min_zoom")
zoom_value = max(min_zoom_value, min(max_zoom_value, int(zoom)))
config = get_config()
resolved_cache_dir = Path(cache_dir).expanduser() if cache_dir is not None else config.cache_dir
state = MapState(
tag=map_tag,
child_window_tag=f"{map_tag}##child",
drawlist_tag=f"{map_tag}##drawlist",
texture_registry_tag=f"{map_tag}##textures",
handler_registry_tag=f"{map_tag}##handlers",
requested_width=width,
requested_height=height,
requested_autosize_x=autosize_x,
requested_autosize_y=autosize_y,
center=(float(center[0]), float(center[1])),
zoom=zoom_value,
min_zoom=min_zoom_value,
max_zoom=max_zoom_value,
provider=provider_obj,
cache_dir=resolved_cache_dir,
user_agent=user_agent if user_agent is not None else config.user_agent,
)
state.tile_manager = TileManager(
memory_cache_max_tiles=config.memory_cache_max_tiles,
disk_cache_max_bytes=config.disk_cache_max_bytes,
worker_count=config.tile_worker_count,
user_agent=state.user_agent,
)
with _maps_lock:
_maps[map_tag] = state
return state
def register_map_state(state: MapState) -> None:
"""Register a prebuilt map state."""
with _maps_lock:
_maps[state.tag] = state
def unregister_map_state(tag: Tag) -> None:
"""Remove a map state from the registry."""
with _maps_lock:
_maps.pop(tag, None)
def get_map_state(map_tag: Tag | None = None) -> MapState:
"""Resolve a map by explicit tag or current map context."""
resolved_tag = resolve_map_tag(map_tag)
with _maps_lock:
try:
return _maps[resolved_tag]
except KeyError as exc:
raise MapNotFoundError(f"map not registered: {resolved_tag}") from exc
def resolve_map_tag(map_tag: Tag | None = None) -> Tag:
"""Resolve an explicit tag or the current context map tag."""
if map_tag is not None:
return map_tag
with _current_map_lock:
if _current_map_stack:
return _current_map_stack[-1]
raise MapNotFoundError("map_tag is required outside a map_widget context")
def list_map_states() -> list[MapState]:
"""Return registered map states as a snapshot."""
with _maps_lock:
return list(_maps.values())
def find_map_for_overlay(tag: Tag, map_tag: Tag | None = None) -> MapState:
"""Find the map containing an overlay, optionally scoped by map tag."""
if map_tag is not None:
return get_map_state(map_tag)
with _current_map_lock:
if _current_map_stack:
current = get_map_state(_current_map_stack[-1])
with current.lock:
if tag in current.overlays:
return current
matches: list[MapState] = []
with _maps_lock:
states = list(_maps.values())
for state in states:
with state.lock:
if tag in state.overlays:
matches.append(state)
if len(matches) == 1:
return matches[0]
if not matches:
raise MapNotFoundError(f"no map contains overlay: {tag}")
raise MapNotFoundError(f"overlay tag is ambiguous across maps: {tag}")
@contextmanager
def current_map_context(map_tag: Tag) -> Iterator[None]:
"""Push a current map tag for context-style overlay creation."""
with _current_map_lock:
_current_map_stack.append(map_tag)
try:
yield
finally:
with _current_map_lock:
if _current_map_stack and _current_map_stack[-1] == map_tag:
_current_map_stack.pop()
elif map_tag in _current_map_stack:
_current_map_stack.remove(map_tag)
def mark_dirty(state: MapState, flags: DirtyFlags) -> None:
"""Mark a map dirty while holding or acquiring its state lock."""
state.dirty |= flags

557
src/dpg_map/tiles.py Normal file
View File

@@ -0,0 +1,557 @@
"""Tile identity, lifecycle, cache, and worker coordination."""
from __future__ import annotations
import warnings
from dataclasses import dataclass, field
from enum import Enum
from io import BytesIO
from pathlib import Path
from queue import Empty, Queue
from threading import Lock, Thread
from time import time
from typing import Literal, cast
import requests
from PIL import Image, UnidentifiedImageError
from .cache import (
DiskCacheMetadata,
MemoryCacheEntry,
MemoryCacheModel,
clear_disk_cache_path,
prune_disk_cache,
tile_cache_path,
tile_metadata_path,
touch_disk_metadata,
write_disk_metadata,
)
from .exceptions import CacheError
from .projection import latlon_to_world, map_size
from .providers import TileProvider
from .types import LatLon
class TileStatus(Enum):
"""Lifecycle status for a tile."""
QUEUED = "queued"
LOADING = "loading"
READY = "ready"
FAILED = "failed"
@dataclass(frozen=True, slots=True)
class TileID:
"""Provider-namespaced XYZ tile identity."""
provider_name: str
z: int
x: int
y: int
@dataclass(slots=True)
class Tile:
"""Decoded tile data plus GUI-thread texture metadata."""
tile_id: TileID
status: TileStatus
generation: int
width: int = 0
height: int = 0
pixels: tuple[float, ...] = ()
texture_tag: object | None = None
source: Literal["memory", "disk", "network"] | None = None
error: str | None = None
last_accessed_at: float = field(default_factory=time)
@dataclass(frozen=True, slots=True)
class VisibleTile:
"""A visible tile and its screen-space top-left point."""
tile_id: TileID
screen_x: float
screen_y: float
@dataclass(frozen=True, slots=True)
class TileRequest:
"""Worker-thread tile request."""
tile_id: TileID
generation: int
url: str
path: Path
headers: dict[str, str]
disk_cache_max_bytes: int | None
protected_paths: frozenset[Path] = frozenset()
@dataclass(frozen=True, slots=True)
class TileResult:
"""Worker-thread tile result consumed by the GUI thread."""
tile_id: TileID
generation: int
status: TileStatus
width: int = 0
height: int = 0
pixels: tuple[float, ...] = ()
source: Literal["disk", "network"] | None = None
error: str | None = None
@dataclass(frozen=True, slots=True)
class TileManagerSnapshot:
"""Thread-safe counters for diagnostics."""
queued_tiles: int
loading_tiles: int
failed_tiles: int
visible_tile_count: int
memory_tiles: int
memory_hits: int
memory_misses: int
disk_hits: int
disk_misses: int
stale_results: int
def calculate_visible_tiles(
*,
center: LatLon,
zoom: int,
width: int,
height: int,
provider: TileProvider,
margin: int = 0,
) -> list[VisibleTile]:
"""Return visible XYZ tiles plus a margin, with wrapped X and clamped Y."""
if width <= 0 or height <= 0:
return []
tile_size = provider.tile_size
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
left = center_x - width / 2.0
top = center_y - height / 2.0
right = center_x + width / 2.0
bottom = center_y + height / 2.0
max_tile = (2**zoom) - 1
start_x = int(left // tile_size) - margin
end_x = int(right // tile_size) + margin
start_y = max(0, int(top // tile_size) - margin)
end_y = min(max_tile, int(bottom // tile_size) + margin)
world_size = map_size(zoom, tile_size)
tiles: list[VisibleTile] = []
seen: set[TileID] = set()
for y in range(start_y, end_y + 1):
for raw_x in range(start_x, end_x + 1):
x = raw_x % (max_tile + 1)
tile_id = TileID(provider.name, zoom, x, y)
if tile_id in seen:
continue
seen.add(tile_id)
tile_left = raw_x * tile_size
if tile_left < -tile_size:
tile_left += world_size
screen_x = tile_left - left
screen_y = y * tile_size - top
tiles.append(VisibleTile(tile_id, screen_x, screen_y))
return tiles
class TileManager:
"""Asynchronous tile loader with memory and persistent disk caches."""
def __init__(
self,
*,
memory_cache_max_tiles: int = 512,
disk_cache_max_bytes: int | None = 2_000_000_000,
worker_count: int = 4,
user_agent: str | None = None,
) -> None:
self.memory = MemoryCacheModel(max_tiles=memory_cache_max_tiles)
self.disk_cache_max_bytes = disk_cache_max_bytes
self.worker_count = worker_count
self.user_agent = user_agent
self._tiles: dict[TileID, Tile] = {}
self._visible_tile_ids: set[TileID] = set()
self._queued: set[TileID] = set()
self._loading: set[TileID] = set()
self._failed: set[TileID] = set()
self._request_queue: Queue[TileRequest | None] = Queue()
self._result_queue: Queue[TileResult] = Queue()
self._threads: list[Thread] = []
self._lock = Lock()
self._delete_texture_tags: list[object] = []
self._disk_hits = 0
self._disk_misses = 0
self._stale_results = 0
self._warned_osm_user_agent = False
def start(self) -> None:
"""Start worker threads once."""
with self._lock:
if self._threads:
return
count = max(1, self.worker_count)
for index in range(count):
thread = Thread(
target=self._worker_loop,
name=f"dpg-map-tile-worker-{index + 1}",
daemon=True,
)
self._threads.append(thread)
thread.start()
def stop(self) -> None:
"""Ask workers to exit."""
with self._lock:
threads = list(self._threads)
self._threads.clear()
for _ in threads:
self._request_queue.put(None)
def request_visible_tiles(
self,
*,
center: LatLon,
zoom: int,
width: int,
height: int,
provider: TileProvider,
generation: int,
cache_dir: str | Path | None,
margin: int,
) -> list[VisibleTile]:
"""Queue missing visible tiles and return their screen positions."""
visible = calculate_visible_tiles(
center=center,
zoom=zoom,
width=width,
height=height,
provider=provider,
margin=margin,
)
visible_ids = {tile.tile_id for tile in visible}
with self._lock:
self._visible_tile_ids = visible_ids
for entry in self.memory.entries.values():
entry.protected = entry.tile_id in visible_ids
self.start()
for visible_tile in visible:
self._queue_tile(
visible_tile.tile_id,
provider=provider,
generation=generation,
cache_dir=cache_dir,
)
return visible
def _queue_tile(
self,
tile_id: TileID,
*,
provider: TileProvider,
generation: int,
cache_dir: str | Path | None,
) -> None:
with self._lock:
tile = self._tiles.get(tile_id)
if tile is not None and tile.status is TileStatus.READY:
self.memory.record_access(tile_id)
tile.last_accessed_at = time()
return
entry = self.memory.record_access(tile_id)
if entry is not None:
return
if tile_id in self._queued or tile_id in self._loading:
return
self._queued.add(tile_id)
self._tiles[tile_id] = Tile(tile_id, TileStatus.QUEUED, generation=generation)
headers = dict(provider.headers)
if provider.name == "osm":
headers = self._headers_with_osm_user_agent(headers)
path = tile_cache_path(
cache_dir,
provider.name,
tile_id.z,
tile_id.x,
tile_id.y,
provider.file_extension or "png",
)
request = TileRequest(
tile_id=tile_id,
generation=generation,
url=provider.build_url(x=tile_id.x, y=tile_id.y, z=tile_id.z),
path=path,
headers=headers,
disk_cache_max_bytes=self.disk_cache_max_bytes,
protected_paths=frozenset(path for path in self._visible_disk_paths(cache_dir)),
)
self._request_queue.put(request)
def _headers_with_osm_user_agent(self, headers: dict[str, str]) -> dict[str, str]:
if any(key.lower() == "user-agent" for key in headers):
return headers
if self.user_agent:
headers["User-Agent"] = self.user_agent
return headers
with self._lock:
should_warn = not self._warned_osm_user_agent
self._warned_osm_user_agent = True
if should_warn:
warnings.warn(
"OpenStreetMap tile usage should configure an application-specific user_agent",
RuntimeWarning,
stacklevel=3,
)
headers["User-Agent"] = "dpg-map/0.3.0b1"
return headers
def _visible_disk_paths(self, cache_dir: str | Path | None) -> list[Path]:
paths: list[Path] = []
with self._lock:
visible = tuple(self._visible_tile_ids)
for tile_id in visible:
paths.append(
tile_cache_path(
cache_dir,
tile_id.provider_name,
tile_id.z,
tile_id.x,
tile_id.y,
)
)
return paths
def get_ready_tile(self, tile_id: TileID) -> Tile | None:
"""Return a ready tile, updating memory LRU metadata."""
with self._lock:
tile = self._tiles.get(tile_id)
if tile is None or tile.status is not TileStatus.READY:
return None
tile.last_accessed_at = time()
self.memory.record_access(tile_id)
return tile
def drain_results(self, *, generation: int, provider_name: str) -> list[Tile]:
"""Accept current-generation results and return ready tiles."""
accepted: list[Tile] = []
while True:
try:
result = self._result_queue.get_nowait()
except Empty:
break
with self._lock:
self._queued.discard(result.tile_id)
self._loading.discard(result.tile_id)
if result.generation != generation or result.tile_id.provider_name != provider_name:
self._stale_results += 1
continue
if result.status is TileStatus.FAILED:
self._failed.add(result.tile_id)
self._tiles[result.tile_id] = Tile(
result.tile_id,
TileStatus.FAILED,
generation=result.generation,
error=result.error,
)
continue
tile = Tile(
result.tile_id,
TileStatus.READY,
generation=result.generation,
width=result.width,
height=result.height,
pixels=result.pixels,
source=result.source,
)
self._tiles[result.tile_id] = tile
self.memory.put(
MemoryCacheEntry(
tile_id=result.tile_id,
size_bytes=len(result.pixels) * 4,
protected=result.tile_id in self._visible_tile_ids,
texture_tag=None,
)
)
accepted.append(tile)
self._evict_memory_if_needed()
return accepted
def set_texture_tag(self, tile_id: TileID, texture_tag: object) -> None:
"""Record a GUI-thread texture tag for a ready tile."""
with self._lock:
tile = self._tiles.get(tile_id)
if tile is not None:
tile.texture_tag = texture_tag
entry = self.memory.entries.get(tile_id)
if entry is not None:
entry.texture_tag = texture_tag
def take_texture_deletions(self) -> list[object]:
"""Return texture tags that must be deleted by the GUI thread."""
with self._lock:
tags = list(self._delete_texture_tags)
self._delete_texture_tags.clear()
return tags
def clear_memory_cache(self) -> list[object]:
"""Clear decoded memory tiles and return texture tags for GUI deletion."""
with self._lock:
tags = [
entry.texture_tag
for entry in self.memory.entries.values()
if entry.texture_tag is not None
]
self._delete_texture_tags.extend(tags)
self.memory.entries.clear()
self._tiles.clear()
self._queued.clear()
self._loading.clear()
self._failed.clear()
return tags
def clear_disk_cache(
self, cache_dir: str | Path | None, *, provider: str | None = None
) -> None:
"""Clear the persistent cache root or one provider namespace."""
clear_disk_cache_path(cache_dir, provider=provider)
def snapshot(self) -> TileManagerSnapshot:
"""Return diagnostic counters."""
with self._lock:
return TileManagerSnapshot(
queued_tiles=len(self._queued),
loading_tiles=len(self._loading),
failed_tiles=len(self._failed),
visible_tile_count=len(self._visible_tile_ids),
memory_tiles=len(self.memory.entries),
memory_hits=self.memory.hits,
memory_misses=self.memory.misses,
disk_hits=self._disk_hits,
disk_misses=self._disk_misses,
stale_results=self._stale_results,
)
def _evict_memory_if_needed(self) -> None:
with self._lock:
evict_ids = self.memory.plan_evictions()
for raw_tile_id in evict_ids:
tile_id = cast(TileID, raw_tile_id)
entry = self.memory.entries.pop(tile_id, None)
tile = self._tiles.pop(tile_id, None)
texture_tag = None
if entry is not None:
texture_tag = entry.texture_tag
if texture_tag is None and tile is not None:
texture_tag = tile.texture_tag
if texture_tag is not None:
self._delete_texture_tags.append(texture_tag)
def _worker_loop(self) -> None:
while True:
request = self._request_queue.get()
if request is None:
return
with self._lock:
self._queued.discard(request.tile_id)
self._loading.add(request.tile_id)
tile = self._tiles.get(request.tile_id)
if tile is not None:
tile.status = TileStatus.LOADING
result = self._load_tile(request)
self._result_queue.put(result)
def _load_tile(self, request: TileRequest) -> TileResult:
try:
raw, source = self._read_or_fetch(request)
width, height, pixels = decode_tile_image(raw)
return TileResult(
request.tile_id,
request.generation,
TileStatus.READY,
width=width,
height=height,
pixels=pixels,
source=source,
)
except Exception as exc:
return TileResult(
request.tile_id,
request.generation,
TileStatus.FAILED,
error=str(exc),
)
def _read_or_fetch(self, request: TileRequest) -> tuple[bytes, Literal["disk", "network"]]:
if request.path.exists():
try:
raw = request.path.read_bytes()
touch_disk_metadata(tile_metadata_path(request.path))
except (OSError, CacheError):
raw = b""
if raw:
with self._lock:
self._disk_hits += 1
return raw, "disk"
with self._lock:
self._disk_misses += 1
response = requests.get(request.url, headers=request.headers, timeout=20)
response.raise_for_status()
raw = response.content
downloaded_at = time()
try:
request.path.parent.mkdir(parents=True, exist_ok=True)
request.path.write_bytes(raw)
write_disk_metadata(
tile_metadata_path(request.path),
DiskCacheMetadata(
url=request.url,
etag=response.headers.get("ETag"),
last_modified=response.headers.get("Last-Modified"),
expires=response.headers.get("Expires"),
downloaded_at=downloaded_at,
last_accessed_at=downloaded_at,
size_bytes=len(raw),
),
)
prune_disk_cache(
request.path.parents[3],
request.disk_cache_max_bytes,
protected_paths=set(request.protected_paths),
)
except (OSError, CacheError):
pass
return raw, "network"
def decode_tile_image(raw: bytes) -> tuple[int, int, tuple[float, ...]]:
"""Decode an image into Dear PyGui-compatible RGBA float data."""
try:
image = Image.open(BytesIO(raw)).convert("RGBA")
except UnidentifiedImageError as exc:
raise ValueError("tile image data could not be decoded") from exc
width, height = image.size
pixels = tuple(channel / 255.0 for channel in image.tobytes())
return width, height, pixels

31
src/dpg_map/types.py Normal file
View File

@@ -0,0 +1,31 @@
"""Shared type aliases and small value objects."""
from __future__ import annotations
from dataclasses import dataclass
from typing import TypeAlias
Tag: TypeAlias = str | int
LatLon: TypeAlias = tuple[float, float]
Point: TypeAlias = tuple[float, float]
Bounds: TypeAlias = tuple[LatLon, LatLon]
TileCoord: TypeAlias = tuple[int, int, int]
Color: TypeAlias = tuple[int, int, int] | tuple[int, int, int, int]
@dataclass(frozen=True, slots=True)
class ScreenPoint:
"""A pixel position in map widget coordinates."""
x: float
y: float
@dataclass(frozen=True, slots=True)
class GeoBounds:
"""Latitude/longitude bounds with south-west and north-east corners."""
south: float
west: float
north: float
east: float

129
src/dpg_map/widget.py Normal file
View File

@@ -0,0 +1,129 @@
"""Dear PyGui map widget construction."""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import Any
from .interaction import (
calculate_hit_rect,
handle_mouse_down,
handle_mouse_drag,
handle_mouse_release,
handle_mouse_wheel,
wheel_delta_from_app_data,
)
from .providers import TileProvider
from .renderer import MapRenderer
from .state import create_map_state, current_map_context
from .types import LatLon, Tag
@contextmanager
def map_widget(
*,
tag: Tag | None = None,
center: LatLon = (0.0, 0.0),
zoom: int = 2,
provider: str | TileProvider | None = None,
width: int = 0,
height: int = 0,
autosize_x: bool = False,
autosize_y: bool = False,
**kwargs: object,
) -> Iterator[Tag | None]:
"""Create a Dear PyGui child-window map shell and logical map context."""
cache_dir_value = kwargs.get("cache_dir")
cache_dir = cache_dir_value if isinstance(cache_dir_value, str | Path) else None
user_agent_value = kwargs.get("user_agent")
user_agent = user_agent_value if isinstance(user_agent_value, str) else None
state = create_map_state(
tag=tag,
center=center,
zoom=zoom,
provider=provider,
width=width,
height=height,
autosize_x=autosize_x,
autosize_y=autosize_y,
cache_dir=cache_dir,
user_agent=user_agent,
)
import dearpygui.dearpygui as dpg
child_kwargs: dict[str, Any] = dict(kwargs)
child_kwargs.pop("cache_dir", None)
child_kwargs.pop("user_agent", None)
child_kwargs.setdefault("border", False)
child_kwargs.setdefault("no_scrollbar", True)
child_kwargs.setdefault("no_scroll_with_mouse", True)
dpg.add_child_window(
tag=state.child_window_tag,
width=width,
height=height,
autosize_x=autosize_x,
autosize_y=autosize_y,
**child_kwargs,
)
dpg.add_texture_registry(tag=state.texture_registry_tag)
dpg.add_drawlist(1, 1, tag=state.drawlist_tag, parent=state.child_window_tag)
dpg.add_handler_registry(tag=state.handler_registry_tag)
def _mouse_pos() -> tuple[float, float]:
pos = dpg.get_mouse_pos(local=False)
return (float(pos[0]), float(pos[1]))
def _hit_rect() -> Any:
draw_pos = tuple(float(value) for value in dpg.get_item_rect_min(state.drawlist_tag))
return calculate_hit_rect(state, (draw_pos[0], draw_pos[1]))
def _on_mouse_down(sender: Any, app_data: Any, user_data: Any) -> None:
_ = (sender, app_data, user_data)
handle_mouse_down(state, _mouse_pos(), _hit_rect())
def _on_mouse_drag(sender: Any, app_data: Any, user_data: Any) -> None:
_ = (sender, app_data, user_data)
handle_mouse_drag(state, _mouse_pos())
def _on_mouse_release(sender: Any, app_data: Any, user_data: Any) -> None:
_ = (sender, app_data, user_data)
handle_mouse_release(state)
def _on_mouse_wheel(sender: Any, app_data: Any, user_data: Any) -> None:
_ = (sender, user_data)
handle_mouse_wheel(
state,
mouse_pos=_mouse_pos(),
wheel_delta=wheel_delta_from_app_data(app_data),
hit_rect=_hit_rect(),
)
dpg.add_mouse_down_handler(
button=dpg.mvMouseButton_Left,
callback=_on_mouse_down,
parent=state.handler_registry_tag,
)
dpg.add_mouse_drag_handler(
button=dpg.mvMouseButton_Left,
threshold=0.0,
callback=_on_mouse_drag,
parent=state.handler_registry_tag,
)
dpg.add_mouse_release_handler(
button=dpg.mvMouseButton_Left,
callback=_on_mouse_release,
parent=state.handler_registry_tag,
)
dpg.add_mouse_wheel_handler(callback=_on_mouse_wheel, parent=state.handler_registry_tag)
renderer = MapRenderer(state, dpg)
with state.lock:
state.renderer = renderer
renderer.schedule_next_frame()
with current_map_context(state.tag):
yield state.tag

1
tests/.gitkeep Normal file
View File

@@ -0,0 +1 @@

90
tests/test_cache.py Normal file
View File

@@ -0,0 +1,90 @@
from __future__ import annotations
from pathlib import Path
from dpg_map.cache import (
CacheStats,
DiskCacheConfig,
DiskCacheMetadata,
MemoryCacheConfig,
clear_disk_cache_path,
disk_cache_size_bytes,
plan_disk_prune,
tile_cache_path,
write_disk_metadata,
)
def test_cache_stats_dataclass_construction() -> None:
stats = CacheStats(
memory_tiles=3,
memory_max_tiles=512,
memory_hits=10,
memory_misses=2,
disk_bytes=1024,
disk_max_bytes=None,
disk_hits=7,
disk_misses=1,
disk_path=Path("/tmp/dpg-map-cache"),
)
assert stats.memory_tiles == 3
assert stats.disk_max_bytes is None
assert stats.disk_path == Path("/tmp/dpg-map-cache")
def test_initial_cache_config_dataclasses() -> None:
memory_config = MemoryCacheConfig()
disk_config = DiskCacheConfig()
assert memory_config.max_tiles == 512
assert disk_config.max_bytes == 2_000_000_000
def test_disk_cache_path_generation(tmp_path: Path) -> None:
path = tile_cache_path(tmp_path, "osm", 4, 8, 9, "jpg")
assert path == tmp_path / "osm" / "4" / "8" / "9.jpg"
def test_disk_cache_prune_ordering(tmp_path: Path) -> None:
first = tile_cache_path(tmp_path, "osm", 1, 1, 1)
second = tile_cache_path(tmp_path, "osm", 1, 1, 2)
protected = tile_cache_path(tmp_path, "osm", 1, 1, 3)
for path, accessed_at in [(first, 1.0), (second, 2.0), (protected, 0.0)]:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(b"abcde")
write_disk_metadata(
path.with_suffix(".json"),
DiskCacheMetadata(
url=str(path),
downloaded_at=accessed_at,
last_accessed_at=accessed_at,
size_bytes=5,
),
)
planned = plan_disk_prune(tmp_path, 5, protected_paths={protected})
assert planned == [first, second]
def test_provider_scoped_disk_cache_clear(tmp_path: Path) -> None:
osm = tile_cache_path(tmp_path, "osm", 1, 1, 1)
custom = tile_cache_path(tmp_path, "custom", 1, 1, 1)
for path in (osm, custom):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(b"abcde")
write_disk_metadata(
path.with_suffix(".json"),
DiskCacheMetadata(url=str(path), last_accessed_at=1.0, size_bytes=5),
)
assert disk_cache_size_bytes(tmp_path, provider="osm") == 5
clear_disk_cache_path(tmp_path, provider="osm")
assert not osm.exists()
assert custom.exists()
assert disk_cache_size_bytes(tmp_path) == 5

35
tests/test_commands.py Normal file
View File

@@ -0,0 +1,35 @@
from __future__ import annotations
from dpg_map.commands import CommandKind, MapCommand, MapCommandQueue
def test_overlay_updates_coalesce_by_overlay_tag() -> None:
queue = MapCommandQueue()
queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, "map", {"tag": "vehicle", "lat": 1.0}))
queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, "map", {"tag": "vehicle", "lat": 2.0}))
queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, "map", {"tag": "other", "lat": 3.0}))
drained = queue.drain()
assert len(drained) == 2
assert drained[0].payload == {"tag": "vehicle", "lat": 2.0}
assert drained[1].payload == {"tag": "other", "lat": 3.0}
def test_latest_view_update_wins_but_structural_commands_keep_order() -> None:
queue = MapCommandQueue()
queue.put(MapCommand(CommandKind.ADD_OVERLAY, "map", {"tag": "a"}))
queue.put(MapCommand(CommandKind.SET_VIEW, "map", {"zoom": 3}))
queue.put(MapCommand(CommandKind.SET_VIEW, "map", {"zoom": 4}))
queue.put(MapCommand(CommandKind.DELETE_OVERLAY, "map", {"tag": "a"}))
drained = queue.drain()
assert [command.kind for command in drained] == [
CommandKind.ADD_OVERLAY,
CommandKind.SET_VIEW,
CommandKind.DELETE_OVERLAY,
]
assert drained[1].payload == {"zoom": 4}

140
tests/test_hardening.py Normal file
View File

@@ -0,0 +1,140 @@
from __future__ import annotations
from math import nan
import pytest
import dpg_map as dpgm
from dpg_map.commands import CommandKind
from dpg_map.exceptions import (
CoordinateError,
MapNotFoundError,
OverlayNotFoundError,
ProviderNotFoundError,
)
from dpg_map.overlays import TrajectoryOverlay
from dpg_map.providers import TileProvider
from dpg_map.renderer import drain_renderer_commands
from dpg_map.state import DirtyFlags, InteractionState, create_map_state, get_map_state
from dpg_map.tiles import TileID, TileResult, TileStatus
def test_public_callables_have_docstrings() -> None:
for name in dpgm.__all__:
value = getattr(dpgm, name)
if callable(value):
assert value.__doc__, name
def test_unknown_map_raises_public_error() -> None:
with pytest.raises(MapNotFoundError):
dpgm.get_center(map_tag="missing-map")
def test_unknown_overlay_raises_public_error() -> None:
create_map_state(tag="missing-overlay")
with pytest.raises(OverlayNotFoundError):
dpgm.update_marker("vehicle", lat=47.0, lon=2.0, map_tag="missing-overlay")
def test_unknown_provider_raises_public_error() -> None:
create_map_state(tag="missing-provider")
with pytest.raises(ProviderNotFoundError):
dpgm.set_provider("missing-provider-name", map_tag="missing-provider")
def test_invalid_coordinates_raise_public_error() -> None:
create_map_state(tag="invalid-coordinates")
with pytest.raises(CoordinateError):
dpgm.add_marker("bad-lat", lat=91.0, lon=2.0, map_tag="invalid-coordinates")
with pytest.raises(CoordinateError):
dpgm.set_center(nan, 2.0, map_tag="invalid-coordinates")
def test_mismatched_polyline_lat_lon_lengths_raise_public_error() -> None:
create_map_state(tag="mismatched-polyline")
with pytest.raises(CoordinateError):
dpgm.add_polyline("line", lats=[47.0, 47.1], lons=[2.0], map_tag="mismatched-polyline")
def test_empty_trajectory_is_valid_for_live_updates() -> None:
create_map_state(tag="empty-trajectory")
dpgm.add_trajectory("track", points=[], map_tag="empty-trajectory")
state = get_map_state("empty-trajectory")
overlay = state.overlays["track"]
assert isinstance(overlay, TrajectoryOverlay)
assert overlay.points == ()
def test_clear_deleted_overlay_raises_public_error() -> None:
create_map_state(tag="deleted-overlay")
dpgm.add_marker("vehicle", lat=47.0, lon=2.0, map_tag="deleted-overlay")
dpgm.delete_overlay("vehicle", map_tag="deleted-overlay")
with pytest.raises(OverlayNotFoundError):
dpgm.delete_overlay("vehicle", map_tag="deleted-overlay")
def test_provider_switch_ignores_tiles_that_finish_after_switch() -> None:
provider = TileProvider(
name="hardening-switch-provider",
url_template="https://tiles.example.test/{z}/{x}/{y}.png",
min_zoom=0,
max_zoom=4,
)
dpgm.register_provider(provider)
try:
state = create_map_state(tag="provider-switch-loading", zoom=3)
old_tile = TileID("osm", 3, 1, 2)
with state.tile_manager._lock:
state.tile_manager._loading.add(old_tile)
dpgm.set_provider("hardening-switch-provider", map_tag="provider-switch-loading")
state.tile_manager._result_queue.put(
TileResult(
old_tile,
generation=0,
status=TileStatus.READY,
width=1,
height=1,
pixels=(1.0, 1.0, 1.0, 1.0),
source="network",
)
)
commands = drain_renderer_commands(state)
accepted = state.tile_manager.drain_results(
generation=state.generation,
provider_name=state.provider.name,
)
assert [command.kind for command in commands] == [CommandKind.SET_PROVIDER]
assert accepted == []
assert state.tile_manager.snapshot().stale_results == 1
assert state.tile_manager.get_ready_tile(old_tile) is None
finally:
dpgm.unregister_provider("hardening-switch-provider")
def test_overlay_update_preserves_active_drag_model_state() -> None:
create_map_state(tag="update-while-dragging", center=(47.0, 2.0), zoom=9)
dpgm.add_marker("vehicle", lat=47.0, lon=2.0, map_tag="update-while-dragging")
state = get_map_state("update-while-dragging")
state.command_queue.drain()
state.dirty = DirtyFlags.NONE
state.interaction = InteractionState(active_drag=True, last_mouse_position=(20.0, 30.0))
dpgm.update_marker("vehicle", lat=47.1, lon=2.1, map_tag="update-while-dragging")
assert state.interaction.active_drag is True
assert state.interaction.last_mouse_position == (20.0, 30.0)
assert state.center == (47.0, 2.0)
assert state.zoom == 9
assert state.dirty == DirtyFlags.OVERLAYS

109
tests/test_interaction.py Normal file
View File

@@ -0,0 +1,109 @@
from __future__ import annotations
import pytest
import dpg_map as dpgm
from dpg_map.commands import CommandKind
from dpg_map.interaction import (
calculate_hit_rect,
handle_mouse_down,
handle_mouse_drag,
handle_mouse_release,
handle_mouse_wheel,
pan_state_by_pixels,
update_drag_from_button_state,
)
from dpg_map.sizing import SizeMeasurement, apply_size_measurement
from dpg_map.state import DirtyFlags, create_map_state, get_map_state
def test_hit_rect_uses_effective_map_size() -> None:
state = create_map_state(tag="hit-rect")
apply_size_measurement(state, SizeMeasurement(width=400, height=250, visible=True))
rect = calculate_hit_rect(state, (10.0, 20.0))
assert rect.x == 10.0
assert rect.y == 20.0
assert rect.width == 400
assert rect.height == 250
assert rect.contains(410.0, 270.0)
assert not rect.contains(411.0, 270.0)
def test_pan_updates_center_and_queues_view_command() -> None:
state = create_map_state(tag="pan", center=(0.0, 0.0), zoom=3)
apply_size_measurement(state, SizeMeasurement(width=400, height=300, visible=True))
old_center = state.center
pan_state_by_pixels(state, 40.0, 0.0)
assert state.center != old_center
assert state.center[1] < old_center[1]
assert state.dirty & DirtyFlags.VIEW
drained = state.command_queue.drain()
assert drained[-1].kind is CommandKind.SET_VIEW
assert drained[-1].payload["center"] == state.center
def test_mouse_drag_uses_active_drag_state() -> None:
state = create_map_state(tag="drag", center=(0.0, 0.0), zoom=3)
apply_size_measurement(state, SizeMeasurement(width=400, height=300, visible=True))
rect = calculate_hit_rect(state, (10.0, 20.0))
handle_mouse_down(state, (20.0, 30.0), rect)
assert state.interaction.active_drag is True
handle_mouse_drag(state, (45.0, 30.0))
handle_mouse_release(state)
assert state.interaction.active_drag is False
assert state.interaction.last_mouse_position is None
assert state.center[1] < 0.0
def test_polled_drag_starts_and_moves_while_button_is_down() -> None:
state = create_map_state(tag="polled-drag", center=(0.0, 0.0), zoom=3)
apply_size_measurement(state, SizeMeasurement(width=400, height=300, visible=True))
rect = calculate_hit_rect(state, (10.0, 20.0))
update_drag_from_button_state(state, mouse_pos=(20.0, 30.0), hit_rect=rect, is_down=True)
update_drag_from_button_state(state, mouse_pos=(45.0, 30.0), hit_rect=rect, is_down=True)
update_drag_from_button_state(state, mouse_pos=(45.0, 30.0), hit_rect=rect, is_down=False)
assert state.interaction.active_drag is False
assert state.center[1] < 0.0
def test_wheel_zoom_keeps_cursor_latlon_stable() -> None:
state = create_map_state(tag="wheel", center=(47.9029, 1.9093), zoom=8)
apply_size_measurement(state, SizeMeasurement(width=800, height=600, visible=True))
rect = calculate_hit_rect(state, (100.0, 50.0))
before = dpgm.screen_to_latlon(300.0, 200.0, map_tag="wheel")
handle_mouse_wheel(state, mouse_pos=(400.0, 250.0), wheel_delta=1.0, hit_rect=rect)
after = dpgm.screen_to_latlon(300.0, 200.0, map_tag="wheel")
assert state.zoom == 9
assert after == pytest.approx(before, abs=1e-7)
def test_view_coordinate_helpers_roundtrip() -> None:
create_map_state(tag="view-roundtrip", center=(47.9029, 1.9093), zoom=12)
state = get_map_state("view-roundtrip")
apply_size_measurement(state, SizeMeasurement(width=800, height=600, visible=True))
screen = dpgm.latlon_to_screen(47.91, 1.92, map_tag="view-roundtrip")
latlon = dpgm.screen_to_latlon(*screen, map_tag="view-roundtrip")
assert latlon == pytest.approx((47.91, 1.92), abs=1e-7)
def test_fit_bounds_sets_center_and_zoom() -> None:
create_map_state(tag="fit", center=(0.0, 0.0), zoom=2)
state = get_map_state("fit")
apply_size_measurement(state, SizeMeasurement(width=800, height=600, visible=True))
dpgm.fit_bounds(((47.8, 1.8), (48.0, 2.0)), map_tag="fit")
assert dpgm.get_center(map_tag="fit") == pytest.approx((47.9, 1.9))
assert dpgm.get_zoom(map_tag="fit") > 2

View File

@@ -0,0 +1,113 @@
from __future__ import annotations
from threading import Thread
import pytest
import dpg_map as dpgm
from dpg_map.commands import CommandKind
from dpg_map.exceptions import CoordinateError
from dpg_map.overlays import TrajectoryOverlay
from dpg_map.state import DirtyFlags, create_map_state, get_map_state
def test_overlay_update_does_not_alter_center_or_zoom() -> None:
create_map_state(tag="overlay-isolation", center=(47.0, 2.0), zoom=8)
dpgm.add_marker("vehicle", lat=47.1, lon=2.1, map_tag="overlay-isolation")
before_center = dpgm.get_center(map_tag="overlay-isolation")
before_zoom = dpgm.get_zoom(map_tag="overlay-isolation")
dpgm.update_marker("vehicle", lat=47.2, lon=2.2, map_tag="overlay-isolation")
assert dpgm.get_center(map_tag="overlay-isolation") == before_center
assert dpgm.get_zoom(map_tag="overlay-isolation") == before_zoom
state = get_map_state("overlay-isolation")
assert state.dirty & DirtyFlags.OVERLAYS
def test_trajectory_inputs_are_copied() -> None:
create_map_state(tag="trajectory-copy")
lats = [1.0, 2.0]
lons = [3.0, 4.0]
dpgm.add_trajectory("track", lats=lats, lons=lons, map_tag="trajectory-copy")
lats[0] = 99.0
lons[0] = 99.0
state = get_map_state("trajectory-copy")
overlay = state.overlays["track"]
assert isinstance(overlay, TrajectoryOverlay)
assert overlay.points == ((1.0, 3.0), (2.0, 4.0))
def test_mismatched_lat_lon_lengths_raise() -> None:
create_map_state(tag="bad-coordinates")
with pytest.raises(CoordinateError):
dpgm.add_trajectory("track", lats=[1.0], lons=[2.0, 3.0], map_tag="bad-coordinates")
def test_layer_state_tracks_visibility_and_overlay_membership() -> None:
create_map_state(tag="layers")
dpgm.add_layer("fleet", map_tag="layers")
dpgm.add_marker("vehicle", lat=1.0, lon=2.0, layer="fleet", map_tag="layers")
dpgm.hide_layer("fleet", map_tag="layers")
state = get_map_state("layers")
assert state.layers["fleet"].show is False
assert state.layers["fleet"].overlay_tags == {"vehicle"}
dpgm.clear_layer("fleet", map_tag="layers")
assert state.layers["fleet"].overlay_tags == set()
assert "vehicle" not in state.overlays
def test_add_layer_can_update_visibility_and_z_index() -> None:
create_map_state(tag="layer-order")
dpgm.add_layer("fleet", z_index=25, show=False, map_tag="layer-order")
dpgm.add_layer("fleet", z_index=30, show=True, map_tag="layer-order")
state = get_map_state("layer-order")
assert state.layers["fleet"].show is True
assert state.layers["fleet"].z_index == 30
def test_threaded_marker_updates_coalesce_without_touching_view_or_drag_state() -> None:
create_map_state(tag="threaded-marker", center=(47.0, 2.0), zoom=9)
dpgm.add_marker("vehicle", lat=47.0, lon=2.0, map_tag="threaded-marker")
state = get_map_state("threaded-marker")
state.command_queue.drain()
state.dirty = DirtyFlags.NONE
state.interaction.active_drag = True
state.interaction.last_mouse_position = (100.0, 100.0)
before_center = state.center
before_zoom = state.zoom
def update_worker(offset: float) -> None:
for index in range(100):
dpgm.update_marker(
"vehicle",
lat=47.0 + offset,
lon=2.0 + index * 0.00001,
map_tag="threaded-marker",
)
threads = [Thread(target=update_worker, args=(worker * 0.0001,)) for worker in range(4)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
commands = state.command_queue.drain()
assert state.center == before_center
assert state.zoom == before_zoom
assert state.interaction.active_drag is True
assert state.interaction.last_mouse_position == (100.0, 100.0)
assert state.dirty == DirtyFlags.OVERLAYS
assert [command.kind for command in commands] == [CommandKind.UPDATE_OVERLAY]

44
tests/test_projection.py Normal file
View File

@@ -0,0 +1,44 @@
from __future__ import annotations
import pytest
from dpg_map.projection import (
WEB_MERCATOR_MAX_LAT,
clamp_latitude,
latlon_to_tile,
latlon_to_world,
screen_to_world,
world_to_latlon,
world_to_screen,
)
@pytest.mark.parametrize(
("lat", "lon", "zoom"),
[
(0.0, 0.0, 0),
(47.9029, 1.9093, 15),
(-33.8688, 151.2093, 10),
(WEB_MERCATOR_MAX_LAT, 179.999, 4),
],
)
def test_projection_roundtrip(lat: float, lon: float, zoom: int) -> None:
x, y = latlon_to_world(lat, lon, zoom)
roundtrip_lat, roundtrip_lon = world_to_latlon(x, y, zoom)
assert roundtrip_lat == pytest.approx(clamp_latitude(lat), abs=1e-7)
assert roundtrip_lon == pytest.approx(lon, abs=1e-7)
def test_latlon_to_tile_returns_xyz_coordinate() -> None:
assert latlon_to_tile(0.0, 0.0, 1) == (1, 1, 1)
def test_world_screen_roundtrip() -> None:
center = (47.9029, 1.9093)
world = latlon_to_world(47.91, 1.92, 14)
screen = world_to_screen(*world, center=center, zoom=14, width=800, height=600)
assert screen_to_world(*screen, center=center, zoom=14, width=800, height=600) == pytest.approx(
world
)

71
tests/test_providers.py Normal file
View File

@@ -0,0 +1,71 @@
from __future__ import annotations
import pytest
import dpg_map as dpgm
from dpg_map.exceptions import InvalidProviderError, ProviderExistsError, ProviderNotFoundError
def test_default_provider_is_registered() -> None:
provider = dpgm.get_provider("osm")
assert provider.name == "osm"
assert "osm" in dpgm.list_providers()
assert provider.build_url(x=1, y=2, z=3) == "https://tile.openstreetmap.org/3/1/2.png"
def test_provider_registration_roundtrip() -> None:
provider = dpgm.TileProvider(
name="unit-test-provider",
url_template="https://tiles.example.test/{z}/{x}/{y}.png",
attribution="Example",
)
dpgm.register_provider(provider)
try:
assert dpgm.get_provider("unit-test-provider") == provider
assert "unit-test-provider" in dpgm.list_providers()
finally:
dpgm.unregister_provider("unit-test-provider")
with pytest.raises(ProviderNotFoundError):
dpgm.get_provider("unit-test-provider")
def test_duplicate_provider_registration_fails() -> None:
provider = dpgm.TileProvider(
name="unit-test-duplicate",
url_template="https://tiles.example.test/{z}/{x}/{y}.png",
)
dpgm.register_provider(provider)
try:
with pytest.raises(ProviderExistsError):
dpgm.register_provider(provider)
finally:
dpgm.unregister_provider("unit-test-duplicate")
def test_provider_url_building_with_subdomains_retina_and_extension() -> None:
provider = dpgm.TileProvider(
name="unit-test-template",
url_template="https://{s}.tiles.example.test/{z}/{x}/{y}{r}.{ext}",
subdomains=("a", "b", "c"),
retina=True,
file_extension="webp",
)
assert provider.build_url(x=1, y=2, z=3) == "https://a.tiles.example.test/3/1/2@2x.webp"
@pytest.mark.parametrize(
"template",
[
"https://tiles.example.test/{z}/{x}.png",
"https://tiles.example.test/{z}/{x}/{y}/{quadkey}.png",
"",
],
)
def test_invalid_provider_templates_fail(template: str) -> None:
with pytest.raises(InvalidProviderError):
dpgm.TileProvider(name="bad-template", url_template=template)

49
tests/test_public_api.py Normal file
View File

@@ -0,0 +1,49 @@
from __future__ import annotations
def test_package_exports_required_public_api() -> None:
import dpg_map as dpgm
expected = {
"configure",
"CacheStats",
"TileProvider",
"register_provider",
"unregister_provider",
"get_provider",
"list_providers",
"map_widget",
"set_center",
"get_center",
"set_zoom",
"get_zoom",
"set_view",
"fit_bounds",
"screen_to_latlon",
"latlon_to_screen",
"add_marker",
"add_polyline",
"add_trajectory",
"update_marker",
"update_polyline",
"update_trajectory",
"set_marker_position",
"set_marker_label",
"set_polyline_points",
"set_overlay_show",
"delete_overlay",
"add_layer",
"show_layer",
"hide_layer",
"clear_layer",
"clear_map",
"set_provider",
"clear_memory_cache",
"clear_disk_cache",
"get_cache_stats",
"get_map_debug_state",
}
assert set(dpgm.__all__) == expected
for name in expected:
assert hasattr(dpgm, name)

176
tests/test_renderer.py Normal file
View File

@@ -0,0 +1,176 @@
from __future__ import annotations
from typing import Any
import dpg_map as dpgm
from dpg_map.commands import CommandKind, MapCommand
from dpg_map.providers import TileProvider
from dpg_map.renderer import MapRenderer, drain_renderer_commands
from dpg_map.state import DirtyFlags, create_map_state
from dpg_map.tiles import TileID, TileResult, TileStatus
class FakeDpg:
def __init__(self) -> None:
self.items: set[str | int] = set()
self.deleted: list[tuple[str | int, bool]] = []
self.drawn: list[tuple[str, str | int]] = []
def does_item_exist(self, tag: str | int) -> bool:
return tag in self.items
def add_draw_layer(self, *, parent: str | int, tag: str | int) -> None:
_ = parent
self.items.add(tag)
def delete_item(self, tag: str | int, *, children_only: bool = False) -> None:
self.deleted.append((tag, children_only))
def draw_rectangle(self, *args: Any, parent: str | int, **kwargs: Any) -> None:
_ = (args, kwargs)
self.drawn.append(("rectangle", parent))
def draw_image(self, *args: Any, parent: str | int, **kwargs: Any) -> None:
_ = (args, kwargs)
self.drawn.append(("image", parent))
def draw_text(self, *args: Any, parent: str | int, **kwargs: Any) -> None:
_ = (args, kwargs)
self.drawn.append(("text", parent))
def draw_circle(self, *args: Any, parent: str | int, **kwargs: Any) -> None:
_ = (args, kwargs)
self.drawn.append(("circle", parent))
def draw_polyline(self, *args: Any, parent: str | int, **kwargs: Any) -> None:
_ = (args, kwargs)
self.drawn.append(("polyline", parent))
def test_renderer_command_drain_preserves_structural_order_and_coalesces() -> None:
state = create_map_state(tag="renderer-drain")
state.dirty = DirtyFlags.NONE
state.command_queue.put(MapCommand(CommandKind.ADD_OVERLAY, state.tag, {"tag": "a"}))
state.command_queue.put(MapCommand(CommandKind.SET_VIEW, state.tag, {"zoom": 3}))
state.command_queue.put(MapCommand(CommandKind.SET_VIEW, state.tag, {"zoom": 4}))
state.command_queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, state.tag, {"tag": "a", "v": 1}))
state.command_queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, state.tag, {"tag": "a", "v": 2}))
state.command_queue.put(MapCommand(CommandKind.DELETE_OVERLAY, state.tag, {"tag": "a"}))
commands = drain_renderer_commands(state)
assert [command.kind for command in commands] == [
CommandKind.ADD_OVERLAY,
CommandKind.SET_VIEW,
CommandKind.UPDATE_OVERLAY,
CommandKind.DELETE_OVERLAY,
]
assert commands[1].payload == {"zoom": 4}
assert commands[2].payload == {"tag": "a", "v": 2}
assert state.dirty & DirtyFlags.VIEW
assert state.dirty & DirtyFlags.OVERLAYS
def test_overlay_draw_clears_only_overlay_layer() -> None:
state = create_map_state(tag="overlay-draw", center=(47.0, 2.0), zoom=8)
dpgm.add_marker(
"vehicle",
lat=47.0,
lon=2.0,
show_label=True,
label="Vehicle",
map_tag="overlay-draw",
)
fake = FakeDpg()
fake.items.add(state.drawlist_tag)
renderer = MapRenderer(state, fake)
renderer._draw_tile_layer(
visible_tiles=[], width=400, height=300, attribution="Tiles", tile_size=256
)
fake.deleted.clear()
with state.lock:
overlays = tuple(state.overlays.values())
layers = {name: (layer.show, layer.z_index) for name, layer in state.layers.items()}
renderer._draw_overlay_layer(
overlays=overlays,
layers=layers,
center=state.center,
zoom=state.zoom,
width=400,
height=300,
tile_size=256,
)
assert fake.deleted == [("overlay-draw##layer-overlays", True)]
assert ("circle", "overlay-draw##layer-overlays") in fake.drawn
assert ("text", "overlay-draw##layer-overlays") in fake.drawn
def test_overlay_update_drain_sets_only_overlay_dirty() -> None:
state = create_map_state(tag="overlay-dirty")
state.dirty = DirtyFlags.NONE
state.command_queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, state.tag, {"tag": "a"}))
drain_renderer_commands(state)
assert state.dirty == DirtyFlags.OVERLAYS
def test_provider_switch_keeps_overlays_and_invalidates_tiles() -> None:
provider = TileProvider(
name="renderer-switch-provider",
url_template="https://tiles.example.test/{z}/{x}/{y}.png",
min_zoom=3,
max_zoom=4,
attribution="Example",
)
dpgm.register_provider(provider)
try:
state = create_map_state(tag="provider-switch", center=(47.0, 2.0), zoom=8)
dpgm.add_marker("vehicle", lat=47.0, lon=2.0, map_tag="provider-switch")
state.command_queue.drain()
state.dirty = DirtyFlags.NONE
tile_id = TileID("osm", 3, 1, 2)
state.tile_manager._result_queue.put(
TileResult(
tile_id,
generation=state.generation,
status=TileStatus.READY,
width=1,
height=1,
pixels=(1.0, 1.0, 1.0, 1.0),
source="disk",
)
)
state.tile_manager.drain_results(generation=state.generation, provider_name="osm")
state.tile_manager.set_texture_tag(tile_id, "old-texture")
dpgm.set_provider("renderer-switch-provider", map_tag="provider-switch")
drain_renderer_commands(state)
assert "vehicle" in state.overlays
assert state.center == (47.0, 2.0)
assert state.zoom == 4
assert state.generation == 1
assert state.provider.name == "renderer-switch-provider"
assert state.dirty & DirtyFlags.PROVIDER
assert state.dirty & DirtyFlags.TILES
assert state.dirty & DirtyFlags.OVERLAYS
assert state.tile_manager.get_ready_tile(tile_id) is None
assert state.tile_manager.take_texture_deletions() == ["old-texture"]
finally:
dpgm.unregister_provider("renderer-switch-provider")
def test_map_scoped_clear_disk_cache_command_keeps_dearpygui_out_of_caller_thread() -> None:
state = create_map_state(tag="clear-disk-command")
state.dirty = DirtyFlags.NONE
dpgm.clear_disk_cache(map_tag="clear-disk-command")
commands = state.command_queue.drain()
assert [command.kind for command in commands] == [CommandKind.CLEAR_DISK_CACHE]
assert state.dirty == DirtyFlags.TILES

44
tests/test_sizing.py Normal file
View File

@@ -0,0 +1,44 @@
from __future__ import annotations
from dpg_map.sizing import SizeMeasurement, apply_size_measurement, effective_draw_size
from dpg_map.state import DirtyFlags, create_map_state
def test_size_measurement_tracks_last_nonzero_size() -> None:
state = create_map_state(tag="size-last-nonzero")
first = apply_size_measurement(state, SizeMeasurement(width=640, height=360, visible=True))
hidden = apply_size_measurement(state, SizeMeasurement(width=0, height=0, visible=False))
assert first.changed is True
assert state.measured_width == 0
assert state.measured_height == 0
assert state.last_nonzero_width == 640
assert state.last_nonzero_height == 360
assert hidden.became_hidden is True
assert effective_draw_size(state) == (640, 360)
def test_zero_size_does_not_permanently_collapse_map() -> None:
state = create_map_state(tag="size-reappears")
apply_size_measurement(state, SizeMeasurement(width=500, height=300, visible=True))
apply_size_measurement(state, SizeMeasurement(width=0, height=0, visible=False))
update = apply_size_measurement(state, SizeMeasurement(width=700, height=450, visible=True))
assert update.became_visible is True
assert update.effective_width == 700
assert update.effective_height == 450
assert state.last_nonzero_width == 700
assert state.last_nonzero_height == 450
def test_resize_marks_size_dirty() -> None:
state = create_map_state(tag="size-dirty")
state.dirty = DirtyFlags.NONE
apply_size_measurement(state, SizeMeasurement(width=320, height=240, visible=True))
assert state.dirty & DirtyFlags.SIZE
assert state.dirty & DirtyFlags.TILES
assert state.dirty & DirtyFlags.OVERLAYS

87
tests/test_tiles.py Normal file
View File

@@ -0,0 +1,87 @@
from __future__ import annotations
from io import BytesIO
from PIL import Image
from dpg_map.providers import OSM
from dpg_map.tiles import (
TileID,
TileManager,
TileResult,
TileStatus,
calculate_visible_tiles,
decode_tile_image,
)
def test_visible_tile_calculation_uses_provider_namespace() -> None:
tiles = calculate_visible_tiles(
center=(0.0, 0.0),
zoom=2,
width=256,
height=256,
provider=OSM,
margin=0,
)
assert tiles
assert {tile.tile_id.provider_name for tile in tiles} == {"osm"}
assert all(0 <= tile.tile_id.x <= 3 for tile in tiles)
assert all(0 <= tile.tile_id.y <= 3 for tile in tiles)
def test_tile_manager_ignores_stale_generation_results() -> None:
manager = TileManager()
stale = TileResult(
TileID("osm", 1, 0, 0),
generation=1,
status=TileStatus.READY,
width=1,
height=1,
pixels=(1.0, 1.0, 1.0, 1.0),
source="disk",
)
manager._result_queue.put(stale)
accepted = manager.drain_results(generation=2, provider_name="osm")
assert accepted == []
assert manager.snapshot().stale_results == 1
def test_memory_eviction_protects_visible_tiles() -> None:
manager = TileManager(memory_cache_max_tiles=1)
protected = TileID("osm", 1, 0, 0)
evictable = TileID("osm", 1, 0, 1)
with manager._lock:
manager._visible_tile_ids = {protected}
for tile_id in (protected, evictable):
manager._result_queue.put(
TileResult(
tile_id,
generation=1,
status=TileStatus.READY,
width=1,
height=1,
pixels=(1.0, 1.0, 1.0, 1.0),
source="disk",
)
)
manager.drain_results(generation=1, provider_name="osm")
assert manager.get_ready_tile(protected) is not None
assert manager.get_ready_tile(evictable) is None
def test_decode_png_tile_image() -> None:
image = Image.new("RGBA", (1, 1), (255, 0, 128, 255))
buffer = BytesIO()
image.save(buffer, format="PNG")
width, height, pixels = decode_tile_image(buffer.getvalue())
assert (width, height) == (1, 1)
assert pixels == (1.0, 0.0, 128 / 255, 1.0)

2
uv.lock generated
View File

@@ -74,7 +74,7 @@ wheels = [
[[package]]
name = "dpg-map"
version = "0.1.0"
version = "0.3.0b1"
source = { editable = "." }
dependencies = [
{ name = "dearpygui" },