Compare commits

...

3 Commits

14 changed files with 995 additions and 31 deletions

View File

@@ -2,7 +2,7 @@
## Current status ## Current status
Step 4 complete. Step 6 complete.
## Completed steps ## Completed steps
@@ -10,10 +10,12 @@ Step 1 - Public API contract and pure core.
Step 2 - Thread-safe state, commands, overlays, and cache model. Step 2 - Thread-safe state, commands, overlays, and cache model.
Step 3 - Widget shell, sizing system, and GUI-thread frame pump. Step 3 - Widget shell, sizing system, and GUI-thread frame pump.
Step 4 - Tile manager, persistent cache, and asynchronous loading. Step 4 - Tile manager, persistent cache, and asynchronous loading.
Step 5 - Interaction: pan, zoom, and view commands.
Step 6 - Overlay rendering and runtime update stress tests.
## Current step ## Current step
Step 5 - Interaction: pan, zoom, and view commands. Step 7 - Layers, provider switching, and clearing APIs.
## Design decisions ## Design decisions
@@ -68,7 +70,31 @@ None yet.
- Ran `uv run ruff format --check .`. - Ran `uv run ruff format --check .`.
- Ran `uv run pyright`. - Ran `uv run pyright`.
- Ran a Dear PyGui context smoke check for `map_widget` child-window/drawlist/texture-registry creation. - Ran a Dear PyGui context smoke check for `map_widget` child-window/drawlist/texture-registry creation.
- Implemented left mouse drag panning using the measured drawlist rectangle.
- Implemented mouse wheel zoom around the cursor.
- Implemented projection-backed `screen_to_latlon`, `latlon_to_screen`, and zoom-fitting `fit_bounds`.
- Attached Dear PyGui mouse handlers through the map handler registry.
- Added interaction debug state for active drag and last mouse position.
- Added Step 5 tests for pan, cursor zoom, view conversion, bounds fitting, and overlay/view isolation.
- Added Pyright virtualenv settings so `uv run pyright` resolves installed dependencies.
- Ran `uv run pytest`.
- Ran `uv run ruff check .`.
- Ran `uv run ruff format --check .`.
- Ran `uv run pyright`.
- Ran a Dear PyGui context smoke check for `map_widget` child-window/drawlist/texture-registry/handler-registry creation.
- Implemented Dear PyGui draw-layer bookkeeping for background, tiles, overlays, and attribution.
- Rendered markers, polylines, and trajectories from GUI-thread overlay snapshots.
- Isolated overlay redraws so they clear only the overlay draw layer and do not clear tile draw commands or textures.
- Added live background-thread marker and trajectory stress examples.
- Added Step 6 tests for overlay draw-layer isolation, overlay-only dirty flags, threaded update coalescing, and view/drag-state isolation.
- Updated `README.md` with Step 6 overlay rendering behavior and examples.
- Ran `uv run ruff format .`.
- Ran `uv run pytest`.
- Ran `uv run pyright`.
- Ran `uv run ruff check .`.
- Ran `uv run ruff format --check .`.
- Ran a Dear PyGui context smoke check for `map_widget` with marker, polyline, and trajectory overlays.
## Next action ## Next action
Implement Step 5. Implement Step 7.

View File

@@ -2,7 +2,7 @@
`dpg-map` is a Dear PyGui map widget package under rebuild. `dpg-map` is a Dear PyGui map widget package under rebuild.
The Step 4 tile manager is in place: The Step 6 overlay rendering layer is in place:
```python ```python
import dpg_map as dpgm import dpg_map as dpgm
@@ -32,12 +32,12 @@ Implemented so far:
- Dear PyGui `child_window` + measured-size `drawlist` widget shell - Dear PyGui `child_window` + measured-size `drawlist` widget shell
- GUI-thread frame pump that drains commands, manages textures, and draws raster tiles - GUI-thread frame pump that drains commands, manages textures, and draws raster tiles
- sizing helpers that preserve the last non-zero size across hidden layouts - sizing helpers that preserve the last non-zero size across hidden layouts
- interaction hit-rectangle calculation for the measured map area
- asynchronous tile workers that read disk cache, fetch HTTP, and decode images - asynchronous tile workers that read disk cache, fetch HTTP, and decode images
- memory cache with visible-tile protection and GUI-thread texture deletion - memory cache with visible-tile protection and GUI-thread texture deletion
- measured-rectangle mouse interaction for left-drag panning and wheel zooming
Overlay drawing is not implemented yet. Step 5 will add pan/zoom interaction and - programmatic view commands and screen/latitude-longitude conversion helpers
view command projection. - marker, polyline, and trajectory rendering on a draw layer separate from tiles
- background-thread overlay updates that coalesce without resetting center or zoom
Examples: Examples:
@@ -48,4 +48,6 @@ uv run python examples/sizing_child.py
uv run python examples/sizing_table.py uv run python examples/sizing_table.py
uv run python examples/hidden_tab.py uv run python examples/hidden_tab.py
uv run python examples/cache_stress.py uv run python examples/cache_stress.py
uv run python examples/markers_live_thread.py
uv run python examples/trajectory_live_thread.py
``` ```

View File

@@ -0,0 +1,66 @@
from __future__ import annotations
from math import cos, sin
from threading import Event, Thread
from time import sleep
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpgm.configure(user_agent="dpg-map markers_live_thread example")
stop = Event()
dpg.create_context()
dpg.create_viewport(title="dpg-map live markers", width=1000, height=700)
with (
dpg.window(label="Live Markers", width=-1, height=-1),
dpgm.map_widget(
tag="live-markers-map", center=(47.9029, 1.9093), zoom=15, width=-1, height=-1
),
):
for index in range(12):
dpgm.add_marker(
f"vehicle-{index}",
lat=47.9029,
lon=1.9093,
label=str(index + 1),
show_label=True,
color=(240, 92, 70, 255),
)
def update_markers() -> None:
tick = 0
while not stop.is_set():
for index in range(12):
angle = tick * 0.08 + index * 0.52
radius = 0.0015 + (index % 4) * 0.0002
dpgm.update_marker(
f"vehicle-{index}",
lat=47.9029 + sin(angle) * radius,
lon=1.9093 + cos(angle) * radius,
map_tag="live-markers-map",
)
tick += 1
sleep(1 / 30)
worker = Thread(target=update_markers, name="dpg-map-live-markers", daemon=True)
worker.start()
try:
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
finally:
stop.set()
worker.join(timeout=1.0)
dpg.destroy_context()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,76 @@
from __future__ import annotations
from collections import deque
from math import cos, sin
from threading import Event, Thread
from time import sleep
from typing import Any
import dearpygui.dearpygui as _dpg
import dpg_map as dpgm
dpg: Any = _dpg
def main() -> None:
dpgm.configure(user_agent="dpg-map trajectory_live_thread example")
stop = Event()
points: deque[tuple[float, float]] = deque(maxlen=240)
dpg.create_context()
dpg.create_viewport(title="dpg-map live trajectory", width=1000, height=700)
with (
dpg.window(label="Live Trajectory", width=-1, height=-1),
dpgm.map_widget(
tag="live-trajectory-map",
center=(47.9029, 1.9093),
zoom=15,
width=-1,
height=-1,
),
):
dpgm.add_trajectory(
"track",
points=[],
color=(250, 190, 80, 255),
thickness=3.0,
show_points=True,
point_stride=12,
)
dpgm.add_marker(
"head",
lat=47.9029,
lon=1.9093,
color=(72, 205, 154, 255),
radius=6,
)
def update_trajectory() -> None:
tick = 0
while not stop.is_set():
angle = tick * 0.06
lat = 47.9029 + sin(angle) * 0.0016 + sin(angle * 2.7) * 0.00025
lon = 1.9093 + cos(angle) * 0.0016
points.append((lat, lon))
snapshot = tuple(points)
dpgm.update_trajectory("track", points=snapshot, map_tag="live-trajectory-map")
dpgm.update_marker("head", lat=lat, lon=lon, map_tag="live-trajectory-map")
tick += 1
sleep(1 / 20)
worker = Thread(target=update_trajectory, name="dpg-map-live-trajectory", daemon=True)
worker.start()
try:
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
finally:
stop.set()
worker.join(timeout=1.0)
dpg.destroy_context()
if __name__ == "__main__":
main()

View File

@@ -37,3 +37,5 @@ select = ["E", "F", "I", "UP", "B", "SIM"]
[tool.pyright] [tool.pyright]
typeCheckingMode = "basic" typeCheckingMode = "basic"
venvPath = "."
venv = ".venv"

View File

@@ -11,8 +11,11 @@ from typing import Any
from .cache import CacheStats, clear_disk_cache_path, disk_cache_root, disk_cache_size_bytes from .cache import CacheStats, clear_disk_cache_path, disk_cache_root, disk_cache_size_bytes
from .commands import CommandKind, MapCommand from .commands import CommandKind, MapCommand
from .exceptions import CoordinateError, OverlayNotFoundError from .exceptions import CoordinateError, OverlayNotFoundError
from .interaction import latlon_to_screen_in_state, screen_to_latlon_in_state
from .overlays import LayerState, MarkerOverlay, Overlay, PolylineOverlay, TrajectoryOverlay from .overlays import LayerState, MarkerOverlay, Overlay, PolylineOverlay, TrajectoryOverlay
from .projection import latlon_to_world
from .providers import TileProvider, get_provider from .providers import TileProvider, get_provider
from .sizing import effective_draw_size
from .state import ( from .state import (
DirtyFlags, DirtyFlags,
configure_state, configure_state,
@@ -145,19 +148,37 @@ def fit_bounds(bounds: Bounds, *, map_tag: Tag | None = None) -> None:
(south_west, north_east) = bounds (south_west, north_east) = bounds
south, west = _validate_latlon(south_west[0], south_west[1]) south, west = _validate_latlon(south_west[0], south_west[1])
north, east = _validate_latlon(north_east[0], north_east[1]) north, east = _validate_latlon(north_east[0], north_east[1])
set_center((south + north) / 2.0, (west + east) / 2.0, map_tag=map_tag) south, north = min(south, north), max(south, north)
state = get_map_state(map_tag)
with state.lock:
width, height = effective_draw_size(state)
padding = 32
usable_width = max(1, width - padding * 2)
usable_height = max(1, height - padding * 2)
tile_size = state.provider.tile_size
target_zoom = state.min_zoom
for candidate_zoom in range(state.max_zoom, state.min_zoom - 1, -1):
west_x, north_y = latlon_to_world(north, west, candidate_zoom, tile_size)
east_x, south_y = latlon_to_world(south, east, candidate_zoom, tile_size)
world_size = tile_size * (2**candidate_zoom)
x_span = abs(east_x - west_x)
x_span = min(x_span, world_size - x_span)
y_span = abs(south_y - north_y)
if x_span <= usable_width and y_span <= usable_height:
target_zoom = candidate_zoom
break
set_view(center=((south + north) / 2.0, (west + east) / 2.0), zoom=target_zoom, map_tag=map_tag)
def screen_to_latlon(x: float, y: float, *, map_tag: Tag | None = None) -> LatLon: def screen_to_latlon(x: float, y: float, *, map_tag: Tag | None = None) -> LatLon:
_ = (x, y) state = get_map_state(map_tag)
return get_center(map_tag=map_tag) return screen_to_latlon_in_state(state, float(x), float(y))
def latlon_to_screen(lat: float, lon: float, *, map_tag: Tag | None = None) -> Point: def latlon_to_screen(lat: float, lon: float, *, map_tag: Tag | None = None) -> Point:
_validate_latlon(lat, lon) lat_value, lon_value = _validate_latlon(lat, lon)
state = get_map_state(map_tag) state = get_map_state(map_tag)
with state.lock: return latlon_to_screen_in_state(state, lat_value, lon_value)
return (state.measured_width / 2.0, state.measured_height / 2.0)
def add_marker( def add_marker(
@@ -533,5 +554,6 @@ def get_map_debug_state(*, map_tag: Tag | None = None) -> dict[str, Any]:
"pending_command_count": len(state.command_queue), "pending_command_count": len(state.command_queue),
"generation": state.generation, "generation": state.generation,
"active_drag": state.interaction.active_drag, "active_drag": state.interaction.active_drag,
"last_mouse_position": state.interaction.last_mouse_position,
"tiles": asdict(state.tile_manager.snapshot()), "tiles": asdict(state.tile_manager.snapshot()),
} }

View File

@@ -1 +1,46 @@
"""Draw layer bookkeeping helpers.""" """Draw layer bookkeeping helpers."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from .types import Tag
@dataclass(frozen=True, slots=True)
class DrawLayerTags:
"""Internal Dear PyGui draw layer tags for one map."""
background: str
tiles: str
overlays: str
attribution: str
def draw_layer_tags(map_tag: Tag) -> DrawLayerTags:
"""Return stable internal draw layer tags for a map."""
return DrawLayerTags(
background=f"{map_tag}##layer-background",
tiles=f"{map_tag}##layer-tiles",
overlays=f"{map_tag}##layer-overlays",
attribution=f"{map_tag}##layer-attribution",
)
def ensure_draw_layers(dpg: Any, *, drawlist_tag: Tag, map_tag: Tag) -> DrawLayerTags:
"""Create draw layers if needed and return their tags."""
tags = draw_layer_tags(map_tag)
for layer_tag in (tags.background, tags.tiles, tags.overlays, tags.attribution):
if not dpg.does_item_exist(layer_tag):
dpg.add_draw_layer(parent=drawlist_tag, tag=layer_tag)
return tags
def clear_draw_layer(dpg: Any, layer_tag: Tag) -> None:
"""Clear one draw layer without touching sibling layers."""
if dpg.does_item_exist(layer_tag):
dpg.delete_item(layer_tag, children_only=True)

View File

@@ -3,9 +3,14 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from math import isfinite
from typing import Any
from .commands import CommandKind, MapCommand
from .projection import latlon_to_world, screen_to_world, world_to_latlon
from .sizing import effective_draw_size from .sizing import effective_draw_size
from .state import MapState from .state import DirtyFlags, MapState, mark_dirty
from .types import LatLon, Point
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
@@ -34,3 +39,195 @@ def calculate_hit_rect(state: MapState, drawlist_pos: tuple[float, float]) -> Hi
width, height = effective_draw_size(state) width, height = effective_draw_size(state)
return HitRect(float(drawlist_pos[0]), float(drawlist_pos[1]), float(width), float(height)) return HitRect(float(drawlist_pos[0]), float(drawlist_pos[1]), float(width), float(height))
def screen_to_latlon_in_state(state: MapState, x: float, y: float) -> LatLon:
"""Convert map-local screen coordinates to latitude/longitude."""
with state.lock:
width, height = effective_draw_size(state)
center = state.center
zoom = state.zoom
tile_size = state.provider.tile_size
world_x, world_y = screen_to_world(
float(x),
float(y),
center=center,
zoom=zoom,
width=width,
height=height,
tile_size=tile_size,
)
return world_to_latlon(world_x, world_y, zoom, tile_size)
def latlon_to_screen_in_state(state: MapState, lat: float, lon: float) -> Point:
"""Convert latitude/longitude to map-local screen coordinates."""
with state.lock:
width, height = effective_draw_size(state)
center = state.center
zoom = state.zoom
tile_size = state.provider.tile_size
world_x, world_y = latlon_to_world(lat, lon, zoom, tile_size)
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
return (world_x - center_x + width / 2.0, world_y - center_y + height / 2.0)
def pan_state_by_pixels(state: MapState, dx: float, dy: float) -> LatLon:
"""Pan the map by a mouse drag delta in screen pixels."""
with state.lock:
if dx == 0 and dy == 0:
return state.center
center = state.center
zoom = state.zoom
tile_size = state.provider.tile_size
world_size = tile_size * (2**zoom)
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
new_x = (center_x - dx) % world_size
new_y = min(max(center_y - dy, 0.0), float(world_size))
state.center = world_to_latlon(new_x, new_y, zoom, tile_size)
mark_dirty(state, DirtyFlags.VIEW | DirtyFlags.TILES | DirtyFlags.OVERLAYS)
state.command_queue.put(
MapCommand(
kind=CommandKind.SET_VIEW,
map_tag=state.tag,
payload={"center": state.center},
)
)
return state.center
def zoom_state_at_screen_point(
state: MapState,
*,
screen_x: float,
screen_y: float,
delta: float,
) -> int:
"""Zoom the map around a map-local screen point where possible."""
if delta == 0 or not isfinite(delta):
with state.lock:
return state.zoom
with state.lock:
old_zoom = state.zoom
new_zoom = max(state.min_zoom, min(state.max_zoom, old_zoom + (1 if delta > 0 else -1)))
if new_zoom == old_zoom:
return old_zoom
width, height = effective_draw_size(state)
tile_size = state.provider.tile_size
anchor_latlon = screen_to_latlon_in_state(state, screen_x, screen_y)
anchor_x, anchor_y = latlon_to_world(
anchor_latlon[0],
anchor_latlon[1],
new_zoom,
tile_size,
)
world_size = tile_size * (2**new_zoom)
center_x = (anchor_x - (screen_x - width / 2.0)) % world_size
center_y = min(max(anchor_y - (screen_y - height / 2.0), 0.0), float(world_size))
state.zoom = new_zoom
state.center = world_to_latlon(center_x, center_y, new_zoom, tile_size)
mark_dirty(state, DirtyFlags.VIEW | DirtyFlags.TILES | DirtyFlags.OVERLAYS)
state.command_queue.put(
MapCommand(
kind=CommandKind.SET_VIEW,
map_tag=state.tag,
payload={"center": state.center, "zoom": state.zoom},
)
)
return state.zoom
def handle_mouse_down(state: MapState, mouse_pos: tuple[float, float], hit_rect: HitRect) -> None:
"""Begin a drag if the left mouse button starts inside the map rectangle."""
with state.lock:
if not state.is_visible or not hit_rect.contains(mouse_pos[0], mouse_pos[1]):
state.interaction.active_drag = False
state.interaction.last_mouse_position = None
return
state.interaction.active_drag = True
state.interaction.last_mouse_position = mouse_pos
def handle_mouse_drag(state: MapState, mouse_pos: tuple[float, float]) -> None:
"""Update center from a mouse drag event."""
with state.lock:
if not state.interaction.active_drag:
return
last_pos = state.interaction.last_mouse_position
state.interaction.last_mouse_position = mouse_pos
if last_pos is None:
return
pan_state_by_pixels(state, mouse_pos[0] - last_pos[0], mouse_pos[1] - last_pos[1])
def handle_mouse_release(state: MapState) -> None:
"""End any active drag."""
with state.lock:
state.interaction.active_drag = False
state.interaction.last_mouse_position = None
def handle_mouse_wheel(
state: MapState,
*,
mouse_pos: tuple[float, float],
wheel_delta: float,
hit_rect: HitRect,
) -> None:
"""Apply wheel zoom when the cursor is over the concrete map rectangle."""
if not hit_rect.contains(mouse_pos[0], mouse_pos[1]):
return
zoom_state_at_screen_point(
state,
screen_x=mouse_pos[0] - hit_rect.x,
screen_y=mouse_pos[1] - hit_rect.y,
delta=wheel_delta,
)
def update_drag_from_button_state(
state: MapState,
*,
mouse_pos: tuple[float, float],
hit_rect: HitRect,
is_down: bool,
) -> None:
"""Poll left-button state and keep drag interaction moving."""
with state.lock:
active_drag = state.interaction.active_drag
if not is_down:
if active_drag:
handle_mouse_release(state)
return
if active_drag:
handle_mouse_drag(state, mouse_pos)
return
if hit_rect.contains(mouse_pos[0], mouse_pos[1]):
handle_mouse_down(state, mouse_pos, hit_rect)
def wheel_delta_from_app_data(app_data: Any) -> float:
"""Normalize Dear PyGui mouse wheel callback data."""
if isinstance(app_data, int | float):
return float(app_data)
if isinstance(app_data, (list, tuple)) and app_data:
value = app_data[-1]
if isinstance(value, int | float):
return float(value)
return 0.0

View File

@@ -3,13 +3,18 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Callable from collections.abc import Callable
from dataclasses import replace
from typing import Any from typing import Any
from .commands import CommandKind, MapCommand from .commands import CommandKind, MapCommand
from .interaction import HitRect, calculate_hit_rect from .draw_layers import DrawLayerTags, clear_draw_layer, ensure_draw_layers
from .interaction import HitRect, calculate_hit_rect, update_drag_from_button_state
from .overlays import MarkerOverlay, Overlay, PolylineOverlay, TrajectoryOverlay
from .projection import latlon_to_world
from .sizing import SizeMeasurement, apply_size_measurement from .sizing import SizeMeasurement, apply_size_measurement
from .state import DirtyFlags, MapState from .state import DirtyFlags, MapState
from .tiles import Tile, VisibleTile from .tiles import Tile, VisibleTile
from .types import Color, LatLon
class MapRenderer: class MapRenderer:
@@ -18,10 +23,10 @@ class MapRenderer:
def __init__(self, state: MapState, dpg: Any) -> None: def __init__(self, state: MapState, dpg: Any) -> None:
self.state = state self.state = state
self._dpg = dpg self._dpg = dpg
self._background_tag = f"{state.tag}##background" self._layers: DrawLayerTags | None = None
self._attribution_tag = f"{state.tag}##attribution"
self.last_drained_commands: tuple[MapCommand, ...] = () self.last_drained_commands: tuple[MapCommand, ...] = ()
self.last_hit_rect: HitRect | None = None self.last_hit_rect: HitRect | None = None
self.last_overlay_count: int = 0
def schedule_next_frame(self) -> None: def schedule_next_frame(self) -> None:
"""Schedule this renderer to run on the next Dear PyGui frame.""" """Schedule this renderer to run on the next Dear PyGui frame."""
@@ -48,10 +53,12 @@ class MapRenderer:
commands = drain_renderer_commands(self.state) commands = drain_renderer_commands(self.state)
self.last_drained_commands = tuple(commands) self.last_drained_commands = tuple(commands)
self._update_size_from_dpg() self._update_size_from_dpg()
self._poll_mouse_drag()
with self.state.lock: with self.state.lock:
dirty = self.state.dirty dirty = self.state.dirty
should_draw = bool(dirty & (DirtyFlags.FULL | DirtyFlags.SIZE | DirtyFlags.TILES)) draw_tiles = bool(dirty & (DirtyFlags.SIZE | DirtyFlags.TILES | DirtyFlags.PROVIDER))
draw_overlays = bool(dirty & (DirtyFlags.SIZE | DirtyFlags.OVERLAYS))
visible = self.state.is_visible visible = self.state.is_visible
width = self.state.measured_width or self.state.last_nonzero_width width = self.state.measured_width or self.state.last_nonzero_width
height = self.state.measured_height or self.state.last_nonzero_height height = self.state.measured_height or self.state.last_nonzero_height
@@ -61,6 +68,12 @@ class MapRenderer:
zoom = self.state.zoom zoom = self.state.zoom
generation = self.state.generation generation = self.state.generation
cache_dir = self.state.cache_dir cache_dir = self.state.cache_dir
overlays = tuple(
_copy_overlay_for_render(overlay) for overlay in self.state.overlays.values()
)
layers = {
name: (layer.show, layer.z_index) for name, layer in self.state.layers.items()
}
self.state.dirty = DirtyFlags.NONE self.state.dirty = DirtyFlags.NONE
accepted_tiles = self.state.tile_manager.drain_results( accepted_tiles = self.state.tile_manager.drain_results(
@@ -84,7 +97,7 @@ class MapRenderer:
margin=self._prefetch_margin(), margin=self._prefetch_margin(),
) )
if visible and (should_draw or accepted_tiles): if visible and (draw_tiles or accepted_tiles):
self._draw_tile_layer( self._draw_tile_layer(
visible_tiles=visible_tiles, visible_tiles=visible_tiles,
width=width, width=width,
@@ -92,6 +105,16 @@ class MapRenderer:
attribution=provider_attribution, attribution=provider_attribution,
tile_size=provider.tile_size, tile_size=provider.tile_size,
) )
if visible and draw_overlays:
self._draw_overlay_layer(
overlays=overlays,
layers=layers,
center=center,
zoom=zoom,
width=width,
height=height,
tile_size=provider.tile_size,
)
def _update_size_from_dpg(self) -> None: def _update_size_from_dpg(self) -> None:
width, height = self._measure_child_content() width, height = self._measure_child_content()
@@ -104,10 +127,27 @@ class MapRenderer:
draw_width = update.effective_width draw_width = update.effective_width
draw_height = update.effective_height draw_height = update.effective_height
self._dpg.configure_item(self.state.drawlist_tag, width=draw_width, height=draw_height) self._dpg.configure_item(self.state.drawlist_tag, width=draw_width, height=draw_height)
draw_pos = tuple(float(value) for value in self._dpg.get_item_pos(self.state.drawlist_tag)) draw_pos = tuple(
float(value) for value in self._dpg.get_item_rect_min(self.state.drawlist_tag)
)
with self.state.lock: with self.state.lock:
self.last_hit_rect = calculate_hit_rect(self.state, (draw_pos[0], draw_pos[1])) self.last_hit_rect = calculate_hit_rect(self.state, (draw_pos[0], draw_pos[1]))
def _poll_mouse_drag(self) -> None:
if self.last_hit_rect is None:
return
try:
is_down = bool(self._dpg.is_mouse_button_down(self._dpg.mvMouseButton_Left))
mouse_pos = self._dpg.get_mouse_pos(local=False)
except Exception:
return
update_drag_from_button_state(
self.state,
mouse_pos=(float(mouse_pos[0]), float(mouse_pos[1])),
hit_rect=self.last_hit_rect,
is_down=is_down,
)
def _measure_child_content(self) -> tuple[int, int]: def _measure_child_content(self) -> tuple[int, int]:
try: try:
width, height = self._dpg.get_item_rect_size(self.state.child_window_tag) width, height = self._dpg.get_item_rect_size(self.state.child_window_tag)
@@ -126,12 +166,14 @@ class MapRenderer:
) -> None: ) -> None:
width = max(1, int(width)) width = max(1, int(width))
height = max(1, int(height)) height = max(1, int(height))
self._dpg.delete_item(self.state.drawlist_tag, children_only=True) layers = self._ensure_draw_layers()
clear_draw_layer(self._dpg, layers.background)
clear_draw_layer(self._dpg, layers.tiles)
clear_draw_layer(self._dpg, layers.attribution)
self._dpg.draw_rectangle( self._dpg.draw_rectangle(
(0, 0), (0, 0),
(width, height), (width, height),
parent=self.state.drawlist_tag, parent=layers.background,
tag=self._background_tag,
color=(54, 68, 78, 255), color=(54, 68, 78, 255),
fill=(29, 38, 45, 255), fill=(29, 38, 45, 255),
) )
@@ -145,19 +187,193 @@ class MapRenderer:
tile.texture_tag, tile.texture_tag,
(screen_x, screen_y), (screen_x, screen_y),
(screen_x + tile_size, screen_y + tile_size), (screen_x + tile_size, screen_y + tile_size),
parent=self.state.drawlist_tag, parent=layers.tiles,
) )
label = attribution or "Map tiles" label = attribution or "Map tiles"
text_y = max(28, height - 24) text_y = max(28, height - 24)
self._dpg.draw_text( self._dpg.draw_text(
(12, text_y), (12, text_y),
label, label,
parent=self.state.drawlist_tag, parent=layers.attribution,
tag=self._attribution_tag,
color=(172, 184, 192, 255), color=(172, 184, 192, 255),
size=12, size=12,
) )
def _draw_overlay_layer(
self,
*,
overlays: tuple[Overlay, ...],
layers: dict[str, tuple[bool, int]],
center: LatLon,
zoom: int,
width: int,
height: int,
tile_size: int,
) -> None:
draw_layers = self._ensure_draw_layers()
clear_draw_layer(self._dpg, draw_layers.overlays)
if width <= 0 or height <= 0:
self.last_overlay_count = 0
return
center_x, center_y = latlon_to_world(center[0], center[1], zoom, tile_size)
visible_overlays = [
overlay
for overlay in overlays
if overlay.show and layers.get(overlay.layer, (True, 0))[0]
]
visible_overlays.sort(key=lambda overlay: layers.get(overlay.layer, (True, 0))[1])
drawn = 0
for overlay in visible_overlays:
if isinstance(overlay, MarkerOverlay):
self._draw_marker_overlay(
overlay, center_x, center_y, zoom, width, height, tile_size
)
drawn += 1
elif isinstance(overlay, PolylineOverlay):
self._draw_polyline_overlay(
overlay,
center_x,
center_y,
zoom,
width,
height,
tile_size,
draw_layers.overlays,
)
drawn += 1
elif isinstance(overlay, TrajectoryOverlay):
self._draw_trajectory_overlay(
overlay,
center_x,
center_y,
zoom,
width,
height,
tile_size,
draw_layers.overlays,
)
drawn += 1
self.last_overlay_count = drawn
def _draw_marker_overlay(
self,
overlay: MarkerOverlay,
center_x: float,
center_y: float,
zoom: int,
width: int,
height: int,
tile_size: int,
) -> None:
layers = self._ensure_draw_layers()
x, y = _latlon_to_screen(
overlay.lat,
overlay.lon,
center_x,
center_y,
zoom,
width,
height,
tile_size,
)
radius = max(1.0, float(overlay.radius))
self._dpg.draw_circle(
(x, y),
radius,
parent=layers.overlays,
color=(255, 255, 255, 230),
fill=_rgba(overlay.color),
thickness=1.5,
segments=20,
)
if overlay.show_label and overlay.label:
self._dpg.draw_text(
(x + radius + 4.0, y - 7.0),
overlay.label,
parent=layers.overlays,
color=(245, 248, 250, 255),
size=12,
)
def _draw_polyline_overlay(
self,
overlay: PolylineOverlay,
center_x: float,
center_y: float,
zoom: int,
width: int,
height: int,
tile_size: int,
parent: str,
) -> None:
points = _screen_points(
overlay.points,
center_x=center_x,
center_y=center_y,
zoom=zoom,
width=width,
height=height,
tile_size=tile_size,
)
if len(points) < 2:
return
self._dpg.draw_polyline(
points,
parent=parent,
closed=overlay.closed,
color=_rgba(overlay.color),
thickness=max(1.0, float(overlay.thickness)),
)
def _draw_trajectory_overlay(
self,
overlay: TrajectoryOverlay,
center_x: float,
center_y: float,
zoom: int,
width: int,
height: int,
tile_size: int,
parent: str,
) -> None:
points = _screen_points(
overlay.points,
center_x=center_x,
center_y=center_y,
zoom=zoom,
width=width,
height=height,
tile_size=tile_size,
)
if len(points) >= 2:
self._dpg.draw_polyline(
points,
parent=parent,
color=_rgba(overlay.color),
thickness=max(1.0, float(overlay.thickness)),
)
if overlay.show_points and points:
stride = max(1, int(overlay.point_stride))
for point in points[::stride]:
self._dpg.draw_circle(
point,
2.5,
parent=parent,
color=_rgba(overlay.color),
fill=_rgba(overlay.color),
segments=8,
)
def _ensure_draw_layers(self) -> DrawLayerTags:
if self._layers is None or not self._dpg.does_item_exist(self._layers.overlays):
self._layers = ensure_draw_layers(
self._dpg,
drawlist_tag=self.state.drawlist_tag,
map_tag=self.state.tag,
)
return self._layers
def _ensure_texture(self, tile: Tile) -> None: def _ensure_texture(self, tile: Tile) -> None:
if tile.texture_tag is not None: if tile.texture_tag is not None:
return return
@@ -229,3 +445,43 @@ def make_frame_pump(state: MapState, dpg: Any) -> Callable[[], None]:
state.renderer = renderer state.renderer = renderer
renderer.schedule_next_frame() renderer.schedule_next_frame()
return renderer.render_frame return renderer.render_frame
def _rgba(color: Color) -> tuple[int, int, int, int]:
if len(color) == 3:
return (int(color[0]), int(color[1]), int(color[2]), 255)
return (int(color[0]), int(color[1]), int(color[2]), int(color[3]))
def _latlon_to_screen(
lat: float,
lon: float,
center_x: float,
center_y: float,
zoom: int,
width: int,
height: int,
tile_size: int,
) -> tuple[float, float]:
world_x, world_y = latlon_to_world(lat, lon, zoom, tile_size)
return (world_x - center_x + width / 2.0, world_y - center_y + height / 2.0)
def _screen_points(
points: tuple[LatLon, ...],
*,
center_x: float,
center_y: float,
zoom: int,
width: int,
height: int,
tile_size: int,
) -> list[tuple[float, float]]:
return [
_latlon_to_screen(lat, lon, center_x, center_y, zoom, width, height, tile_size)
for lat, lon in points
]
def _copy_overlay_for_render(overlay: Overlay) -> Overlay:
return replace(overlay)

View File

@@ -47,7 +47,7 @@ class DpgMapConfig:
@dataclass(slots=True) @dataclass(slots=True)
class InteractionState: class InteractionState:
"""Logical interaction state until GUI interaction is implemented.""" """Logical mouse interaction state."""
active_drag: bool = False active_drag: bool = False
last_mouse_position: tuple[float, float] | None = None last_mouse_position: tuple[float, float] | None = None

View File

@@ -7,6 +7,14 @@ from contextlib import contextmanager
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from .interaction import (
calculate_hit_rect,
handle_mouse_down,
handle_mouse_drag,
handle_mouse_release,
handle_mouse_wheel,
wheel_delta_from_app_data,
)
from .providers import TileProvider from .providers import TileProvider
from .renderer import MapRenderer from .renderer import MapRenderer
from .state import create_map_state, current_map_context from .state import create_map_state, current_map_context
@@ -63,6 +71,54 @@ def map_widget(
) )
dpg.add_texture_registry(tag=state.texture_registry_tag) dpg.add_texture_registry(tag=state.texture_registry_tag)
dpg.add_drawlist(1, 1, tag=state.drawlist_tag, parent=state.child_window_tag) dpg.add_drawlist(1, 1, tag=state.drawlist_tag, parent=state.child_window_tag)
dpg.add_handler_registry(tag=state.handler_registry_tag)
def _mouse_pos() -> tuple[float, float]:
pos = dpg.get_mouse_pos(local=False)
return (float(pos[0]), float(pos[1]))
def _hit_rect() -> Any:
draw_pos = tuple(float(value) for value in dpg.get_item_rect_min(state.drawlist_tag))
return calculate_hit_rect(state, (draw_pos[0], draw_pos[1]))
def _on_mouse_down(sender: Any, app_data: Any, user_data: Any) -> None:
_ = (sender, app_data, user_data)
handle_mouse_down(state, _mouse_pos(), _hit_rect())
def _on_mouse_drag(sender: Any, app_data: Any, user_data: Any) -> None:
_ = (sender, app_data, user_data)
handle_mouse_drag(state, _mouse_pos())
def _on_mouse_release(sender: Any, app_data: Any, user_data: Any) -> None:
_ = (sender, app_data, user_data)
handle_mouse_release(state)
def _on_mouse_wheel(sender: Any, app_data: Any, user_data: Any) -> None:
_ = (sender, user_data)
handle_mouse_wheel(
state,
mouse_pos=_mouse_pos(),
wheel_delta=wheel_delta_from_app_data(app_data),
hit_rect=_hit_rect(),
)
dpg.add_mouse_down_handler(
button=dpg.mvMouseButton_Left,
callback=_on_mouse_down,
parent=state.handler_registry_tag,
)
dpg.add_mouse_drag_handler(
button=dpg.mvMouseButton_Left,
threshold=0.0,
callback=_on_mouse_drag,
parent=state.handler_registry_tag,
)
dpg.add_mouse_release_handler(
button=dpg.mvMouseButton_Left,
callback=_on_mouse_release,
parent=state.handler_registry_tag,
)
dpg.add_mouse_wheel_handler(callback=_on_mouse_wheel, parent=state.handler_registry_tag)
renderer = MapRenderer(state, dpg) renderer = MapRenderer(state, dpg)
with state.lock: with state.lock:

View File

@@ -1,8 +1,20 @@
from __future__ import annotations from __future__ import annotations
from dpg_map.interaction import calculate_hit_rect import pytest
import dpg_map as dpgm
from dpg_map.commands import CommandKind
from dpg_map.interaction import (
calculate_hit_rect,
handle_mouse_down,
handle_mouse_drag,
handle_mouse_release,
handle_mouse_wheel,
pan_state_by_pixels,
update_drag_from_button_state,
)
from dpg_map.sizing import SizeMeasurement, apply_size_measurement from dpg_map.sizing import SizeMeasurement, apply_size_measurement
from dpg_map.state import create_map_state from dpg_map.state import DirtyFlags, create_map_state, get_map_state
def test_hit_rect_uses_effective_map_size() -> None: def test_hit_rect_uses_effective_map_size() -> None:
@@ -17,3 +29,81 @@ def test_hit_rect_uses_effective_map_size() -> None:
assert rect.height == 250 assert rect.height == 250
assert rect.contains(410.0, 270.0) assert rect.contains(410.0, 270.0)
assert not rect.contains(411.0, 270.0) assert not rect.contains(411.0, 270.0)
def test_pan_updates_center_and_queues_view_command() -> None:
state = create_map_state(tag="pan", center=(0.0, 0.0), zoom=3)
apply_size_measurement(state, SizeMeasurement(width=400, height=300, visible=True))
old_center = state.center
pan_state_by_pixels(state, 40.0, 0.0)
assert state.center != old_center
assert state.center[1] < old_center[1]
assert state.dirty & DirtyFlags.VIEW
drained = state.command_queue.drain()
assert drained[-1].kind is CommandKind.SET_VIEW
assert drained[-1].payload["center"] == state.center
def test_mouse_drag_uses_active_drag_state() -> None:
state = create_map_state(tag="drag", center=(0.0, 0.0), zoom=3)
apply_size_measurement(state, SizeMeasurement(width=400, height=300, visible=True))
rect = calculate_hit_rect(state, (10.0, 20.0))
handle_mouse_down(state, (20.0, 30.0), rect)
assert state.interaction.active_drag is True
handle_mouse_drag(state, (45.0, 30.0))
handle_mouse_release(state)
assert state.interaction.active_drag is False
assert state.interaction.last_mouse_position is None
assert state.center[1] < 0.0
def test_polled_drag_starts_and_moves_while_button_is_down() -> None:
state = create_map_state(tag="polled-drag", center=(0.0, 0.0), zoom=3)
apply_size_measurement(state, SizeMeasurement(width=400, height=300, visible=True))
rect = calculate_hit_rect(state, (10.0, 20.0))
update_drag_from_button_state(state, mouse_pos=(20.0, 30.0), hit_rect=rect, is_down=True)
update_drag_from_button_state(state, mouse_pos=(45.0, 30.0), hit_rect=rect, is_down=True)
update_drag_from_button_state(state, mouse_pos=(45.0, 30.0), hit_rect=rect, is_down=False)
assert state.interaction.active_drag is False
assert state.center[1] < 0.0
def test_wheel_zoom_keeps_cursor_latlon_stable() -> None:
state = create_map_state(tag="wheel", center=(47.9029, 1.9093), zoom=8)
apply_size_measurement(state, SizeMeasurement(width=800, height=600, visible=True))
rect = calculate_hit_rect(state, (100.0, 50.0))
before = dpgm.screen_to_latlon(300.0, 200.0, map_tag="wheel")
handle_mouse_wheel(state, mouse_pos=(400.0, 250.0), wheel_delta=1.0, hit_rect=rect)
after = dpgm.screen_to_latlon(300.0, 200.0, map_tag="wheel")
assert state.zoom == 9
assert after == pytest.approx(before, abs=1e-7)
def test_view_coordinate_helpers_roundtrip() -> None:
create_map_state(tag="view-roundtrip", center=(47.9029, 1.9093), zoom=12)
state = get_map_state("view-roundtrip")
apply_size_measurement(state, SizeMeasurement(width=800, height=600, visible=True))
screen = dpgm.latlon_to_screen(47.91, 1.92, map_tag="view-roundtrip")
latlon = dpgm.screen_to_latlon(*screen, map_tag="view-roundtrip")
assert latlon == pytest.approx((47.91, 1.92), abs=1e-7)
def test_fit_bounds_sets_center_and_zoom() -> None:
create_map_state(tag="fit", center=(0.0, 0.0), zoom=2)
state = get_map_state("fit")
apply_size_measurement(state, SizeMeasurement(width=800, height=600, visible=True))
dpgm.fit_bounds(((47.8, 1.8), (48.0, 2.0)), map_tag="fit")
assert dpgm.get_center(map_tag="fit") == pytest.approx((47.9, 1.9))
assert dpgm.get_zoom(map_tag="fit") > 2

View File

@@ -1,8 +1,11 @@
from __future__ import annotations from __future__ import annotations
from threading import Thread
import pytest import pytest
import dpg_map as dpgm import dpg_map as dpgm
from dpg_map.commands import CommandKind
from dpg_map.exceptions import CoordinateError from dpg_map.exceptions import CoordinateError
from dpg_map.overlays import TrajectoryOverlay from dpg_map.overlays import TrajectoryOverlay
from dpg_map.state import DirtyFlags, create_map_state, get_map_state from dpg_map.state import DirtyFlags, create_map_state, get_map_state
@@ -61,3 +64,39 @@ def test_layer_state_tracks_visibility_and_overlay_membership() -> None:
assert state.layers["fleet"].overlay_tags == set() assert state.layers["fleet"].overlay_tags == set()
assert "vehicle" not in state.overlays assert "vehicle" not in state.overlays
def test_threaded_marker_updates_coalesce_without_touching_view_or_drag_state() -> None:
create_map_state(tag="threaded-marker", center=(47.0, 2.0), zoom=9)
dpgm.add_marker("vehicle", lat=47.0, lon=2.0, map_tag="threaded-marker")
state = get_map_state("threaded-marker")
state.command_queue.drain()
state.dirty = DirtyFlags.NONE
state.interaction.active_drag = True
state.interaction.last_mouse_position = (100.0, 100.0)
before_center = state.center
before_zoom = state.zoom
def update_worker(offset: float) -> None:
for index in range(100):
dpgm.update_marker(
"vehicle",
lat=47.0 + offset,
lon=2.0 + index * 0.00001,
map_tag="threaded-marker",
)
threads = [Thread(target=update_worker, args=(worker * 0.0001,)) for worker in range(4)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
commands = state.command_queue.drain()
assert state.center == before_center
assert state.zoom == before_zoom
assert state.interaction.active_drag is True
assert state.interaction.last_mouse_position == (100.0, 100.0)
assert state.dirty == DirtyFlags.OVERLAYS
assert [command.kind for command in commands] == [CommandKind.UPDATE_OVERLAY]

View File

@@ -1,10 +1,50 @@
from __future__ import annotations from __future__ import annotations
from typing import Any
import dpg_map as dpgm
from dpg_map.commands import CommandKind, MapCommand from dpg_map.commands import CommandKind, MapCommand
from dpg_map.renderer import drain_renderer_commands from dpg_map.renderer import MapRenderer, drain_renderer_commands
from dpg_map.state import DirtyFlags, create_map_state from dpg_map.state import DirtyFlags, create_map_state
class FakeDpg:
def __init__(self) -> None:
self.items: set[str | int] = set()
self.deleted: list[tuple[str | int, bool]] = []
self.drawn: list[tuple[str, str | int]] = []
def does_item_exist(self, tag: str | int) -> bool:
return tag in self.items
def add_draw_layer(self, *, parent: str | int, tag: str | int) -> None:
_ = parent
self.items.add(tag)
def delete_item(self, tag: str | int, *, children_only: bool = False) -> None:
self.deleted.append((tag, children_only))
def draw_rectangle(self, *args: Any, parent: str | int, **kwargs: Any) -> None:
_ = (args, kwargs)
self.drawn.append(("rectangle", parent))
def draw_image(self, *args: Any, parent: str | int, **kwargs: Any) -> None:
_ = (args, kwargs)
self.drawn.append(("image", parent))
def draw_text(self, *args: Any, parent: str | int, **kwargs: Any) -> None:
_ = (args, kwargs)
self.drawn.append(("text", parent))
def draw_circle(self, *args: Any, parent: str | int, **kwargs: Any) -> None:
_ = (args, kwargs)
self.drawn.append(("circle", parent))
def draw_polyline(self, *args: Any, parent: str | int, **kwargs: Any) -> None:
_ = (args, kwargs)
self.drawn.append(("polyline", parent))
def test_renderer_command_drain_preserves_structural_order_and_coalesces() -> None: def test_renderer_command_drain_preserves_structural_order_and_coalesces() -> None:
state = create_map_state(tag="renderer-drain") state = create_map_state(tag="renderer-drain")
state.dirty = DirtyFlags.NONE state.dirty = DirtyFlags.NONE
@@ -28,3 +68,50 @@ def test_renderer_command_drain_preserves_structural_order_and_coalesces() -> No
assert commands[2].payload == {"tag": "a", "v": 2} assert commands[2].payload == {"tag": "a", "v": 2}
assert state.dirty & DirtyFlags.VIEW assert state.dirty & DirtyFlags.VIEW
assert state.dirty & DirtyFlags.OVERLAYS assert state.dirty & DirtyFlags.OVERLAYS
def test_overlay_draw_clears_only_overlay_layer() -> None:
state = create_map_state(tag="overlay-draw", center=(47.0, 2.0), zoom=8)
dpgm.add_marker(
"vehicle",
lat=47.0,
lon=2.0,
show_label=True,
label="Vehicle",
map_tag="overlay-draw",
)
fake = FakeDpg()
fake.items.add(state.drawlist_tag)
renderer = MapRenderer(state, fake)
renderer._draw_tile_layer(
visible_tiles=[], width=400, height=300, attribution="Tiles", tile_size=256
)
fake.deleted.clear()
with state.lock:
overlays = tuple(state.overlays.values())
layers = {name: (layer.show, layer.z_index) for name, layer in state.layers.items()}
renderer._draw_overlay_layer(
overlays=overlays,
layers=layers,
center=state.center,
zoom=state.zoom,
width=400,
height=300,
tile_size=256,
)
assert fake.deleted == [("overlay-draw##layer-overlays", True)]
assert ("circle", "overlay-draw##layer-overlays") in fake.drawn
assert ("text", "overlay-draw##layer-overlays") in fake.drawn
def test_overlay_update_drain_sets_only_overlay_dirty() -> None:
state = create_map_state(tag="overlay-dirty")
state.dirty = DirtyFlags.NONE
state.command_queue.put(MapCommand(CommandKind.UPDATE_OVERLAY, state.tag, {"tag": "a"}))
drain_renderer_commands(state)
assert state.dirty == DirtyFlags.OVERLAYS