from __future__ import annotations from io import BytesIO from PIL import Image from dpg_map.providers import OSM from dpg_map.tiles import ( TileID, TileManager, TileResult, TileStatus, calculate_visible_tiles, decode_tile_image, ) def test_visible_tile_calculation_uses_provider_namespace() -> None: tiles = calculate_visible_tiles( center=(0.0, 0.0), zoom=2, width=256, height=256, provider=OSM, margin=0, ) assert tiles assert {tile.tile_id.provider_name for tile in tiles} == {"osm"} assert all(0 <= tile.tile_id.x <= 3 for tile in tiles) assert all(0 <= tile.tile_id.y <= 3 for tile in tiles) def test_tile_manager_ignores_stale_generation_results() -> None: manager = TileManager() stale = TileResult( TileID("osm", 1, 0, 0), generation=1, status=TileStatus.READY, width=1, height=1, pixels=(1.0, 1.0, 1.0, 1.0), source="disk", ) manager._result_queue.put(stale) accepted = manager.drain_results(generation=2, provider_name="osm") assert accepted == [] assert manager.snapshot().stale_results == 1 def test_memory_eviction_protects_visible_tiles() -> None: manager = TileManager(memory_cache_max_tiles=1) protected = TileID("osm", 1, 0, 0) evictable = TileID("osm", 1, 0, 1) with manager._lock: manager._visible_tile_ids = {protected} for tile_id in (protected, evictable): manager._result_queue.put( TileResult( tile_id, generation=1, status=TileStatus.READY, width=1, height=1, pixels=(1.0, 1.0, 1.0, 1.0), source="disk", ) ) manager.drain_results(generation=1, provider_name="osm") assert manager.get_ready_tile(protected) is not None assert manager.get_ready_tile(evictable) is None def test_decode_png_tile_image() -> None: image = Image.new("RGBA", (1, 1), (255, 0, 128, 255)) buffer = BytesIO() image.save(buffer, format="PNG") width, height, pixels = decode_tile_image(buffer.getvalue()) assert (width, height) == (1, 1) assert pixels == (1.0, 0.0, 128 / 255, 1.0)