From 91d841cf1c8f9fbdd9d5a0cf00883167f418648e Mon Sep 17 00:00:00 2001 From: Hector van der Aa Date: Tue, 19 May 2026 22:56:57 +0200 Subject: [PATCH] Added lap autosave --- src/dataflux/services/serial/__init__.py | 21 +++- src/dataflux/services/telemetry/__init__.py | 116 ++++++++++++++++++-- src/dataflux/state.py | 13 +++ src/dataflux/ui/routines/status.py | 6 +- 4 files changed, 142 insertions(+), 14 deletions(-) diff --git a/src/dataflux/services/serial/__init__.py b/src/dataflux/services/serial/__init__.py index 3c816ea..84e752d 100644 --- a/src/dataflux/services/serial/__init__.py +++ b/src/dataflux/services/serial/__init__.py @@ -12,7 +12,11 @@ from serial import Serial import serial.tools.list_ports import dearpygui.dearpygui as dpg from dataflux import telemetry_common -from dataflux.tags import TEXT_SERIAL_CONSOLE +from dataflux.tags import ( + STATUS_LORA_STATUS_BOX, + STATUS_SERIAL_STATUS_BOX, + TEXT_SERIAL_CONSOLE, +) import dataflux.telemetry_common.telemetry_common import dataflux.ui.routines.status import dataflux.ui.routines @@ -85,7 +89,9 @@ def lora_status_worker(state: AppState) -> None: duration = state.lora_status_queue.get_nowait() except Empty: continue - dataflux.ui.routines.status.flash_status_connection_status(duration) + dataflux.ui.routines.status.flash_status_connection_status( + duration, STATUS_LORA_STATUS_BOX + ) def serial_worker(state: AppState) -> None: @@ -104,6 +110,7 @@ def serial_worker(state: AppState) -> None: if line: text = line.decode("utf-8", errors="replace") state.serial_data_queue.put(text) + state.serial_status_queue.put(0.05) if port.writable(): try: @@ -112,15 +119,21 @@ def serial_worker(state: AppState) -> None: pass else: state.serial_data_queue.put(data + "\n") + state.serial_status_queue.put(0.05) port.write(data.encode("utf-8")) - print("Wrote data: " + data) disconnect_serial(state) dataflux.ui.routines.update_global_connection_status(state) def serial_status_worker(state: AppState) -> None: while state.serial_thread_running: - time.sleep(1) + try: + duration = state.serial_status_queue.get_nowait() + except Empty: + continue + dataflux.ui.routines.status.flash_status_connection_status( + duration, STATUS_SERIAL_STATUS_BOX + ) def lora_reader_worker(state: AppState) -> None: diff --git a/src/dataflux/services/telemetry/__init__.py b/src/dataflux/services/telemetry/__init__.py index d5a2109..6bbbacd 100644 --- a/src/dataflux/services/telemetry/__init__.py +++ b/src/dataflux/services/telemetry/__init__.py @@ -2,14 +2,46 @@ # Copyright (C) 2026 Association Exergie # SPDX-License-Identifier: GPL-3.0-or-later -from datetime import datetime +from bisect import bisect_left +from datetime import datetime, timezone from queue import Empty -from dataflux.state import AppState, Buffers +import stat +from tracemalloc import start +from dataflux.state import AppState, Buffers, LapInfo import time from pathlib import Path import csv +def cs_to_datetime(date_utc: datetime, timestamp_cs: int) -> datetime: + if not 0 <= timestamp_cs < 24 * 60 * 60 * 100: + raise ValueError("timestamp_cs must be within one day") + + hours, rem = divmod(timestamp_cs, 60 * 60 * 100) + minutes, rem = divmod(rem, 60 * 100) + seconds, cs = divmod(rem, 100) + + return datetime( + date_utc.year, + date_utc.month, + date_utc.day, + hours, + minutes, + seconds, + cs * 10_000, + tzinfo=timezone.utc, + ) + + +def datetime_to_cs(dt: datetime) -> int: + return ( + dt.hour * 60 * 60 * 100 + + dt.minute * 60 * 100 + + dt.second * 100 + + dt.microsecond // 10_000 + ) + + def telemetry_worker(state: AppState): while state.telemetry_thread_running: if not state.lora_thread_running: @@ -20,9 +52,15 @@ def telemetry_worker(state: AppState): except Empty: continue - if dataframe["type"] == "packet2": + now = datetime.now(timezone.utc) + time_stamp = datetime_to_cs(now) + if ( + dataframe["type"] == "packet2" + and abs(dataframe["time_stamp"] - time_stamp) <= 60 * 100 + ): state.latest_telemetry = dataframe state.telemetry_valid = True + with state.lock: state.raw_buffers.timestamp.append(dataframe["time_stamp"]) state.raw_buffers.speed.append(dataframe["speed"]) @@ -66,10 +104,29 @@ def telemetry_worker(state: AppState): state.live_buffers.lat.reverse() state.live_buffers.lng.reverse() elif dataframe["type"] == "packet3": - print(dataframe["type"]) - print(dataframe["start_time"]) - print(dataframe["duration"]) - print(dataframe["count"]) + start_time: int = dataframe["start_time"] + end_time: int = dataframe["duration"] + start_time + lap_count = dataframe["count"] + lap: LapInfo = LapInfo(start_time, end_time, lap_count) + state.laps.append(lap) + state.new_laps.put(lap) + + +def save_lap(state: AppState, start_time: int, end_time: int, count: int) -> None: + time_str = cs_to_datetime(datetime.now(timezone.utc), start_time).strftime( + "%m_%d_%Y_%H_%M" + ) + save_path = Path(state.autosave_path) / f"{time_str}_lap_{count}.csv" + data: Buffers = isolate_lap(state, start_time, end_time) + with save_path.open("w", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + + writer.writerow(["timestamp", "speed", "vbat", "teng", "lat", "lng"]) + + for row in zip( + data.timestamp, data.speed, data.vbat, data.teng, data.lat, data.lng + ): + writer.writerow(row) def buffer_dump(state: AppState, path: str) -> None: @@ -107,6 +164,7 @@ def buffer_dump(state: AppState, path: str) -> None: def autosave_worker(state: AppState, path: str) -> None: output_dir = Path(path) + state.autosave_path = output_dir ctr: int = 0 while state.autosave_enabled: date_str = state.start_time.strftime("%m_%d_%Y_%H_%M") @@ -115,4 +173,48 @@ def autosave_worker(state: AppState, path: str) -> None: buffer_dump(state, save_path) print(f"Autosave {ctr} complete") ctr += 1 + + try: + new_lap: LapInfo = state.new_laps.get_nowait() + except Empty: + pass + else: + save_lap(state, new_lap.start_time, new_lap.end_time, new_lap.count) + time.sleep(30) + + +def closest_idx(values: list[int], target: int) -> int: + if not values: + raise ValueError("Cannot find closest index in an empty list") + + pos = bisect_left(values, target) + + if pos == 0: + return 0 + + if pos == len(values): + return len(values) - 1 + + before = pos - 1 + after = pos + + if abs(values[after] - target) < abs(values[before] - target): + return after + + return before + + +def isolate_lap(state: AppState, start_time: int, end_time: int) -> Buffers: + output: Buffers = Buffers() + start_idx = closest_idx(state.raw_buffers.timestamp, start_time) + end_idx = closest_idx(state.raw_buffers.timestamp, end_time) + + output.timestamp = state.raw_buffers.timestamp[start_idx : end_idx + 1] + output.speed = state.raw_buffers.speed[start_idx : end_idx + 1] + output.vbat = state.raw_buffers.vbat[start_idx : end_idx + 1] + output.teng = state.raw_buffers.teng[start_idx : end_idx + 1] + output.lat = state.raw_buffers.lat[start_idx : end_idx + 1] + output.lng = state.raw_buffers.lng[start_idx : end_idx + 1] + + return output diff --git a/src/dataflux/state.py b/src/dataflux/state.py index 95de55a..fb0773d 100644 --- a/src/dataflux/state.py +++ b/src/dataflux/state.py @@ -4,6 +4,7 @@ from dataclasses import dataclass, field from datetime import datetime +from pathlib import Path from threading import Lock, Thread from serial import Serial from queue import Queue @@ -21,6 +22,13 @@ class Buffers: lng: list[float] = field(default_factory=list) +@dataclass +class LapInfo: + start_time: int = field(default_factory=int) + end_time: int = field(default_factory=int) + count: int = field(default_factory=int) + + @dataclass class AppState: running: bool = True @@ -60,8 +68,13 @@ class AppState: live_buffers_updated: bool = False live_buffer_len: int = 30 + lap_lock: Lock = field(default_factory=Lock) + new_laps: Queue = field(default_factory=Queue) + laps: list[LapInfo] = field(default_factory=list) + buffer_dump_thread: Thread | None = None autosave_buffer_thread: Thread | None = None autosave_enabled: bool = False + autosave_path: Path | None = None lock: Lock = field(default_factory=Lock) diff --git a/src/dataflux/ui/routines/status.py b/src/dataflux/ui/routines/status.py index 06f0c27..dc90cce 100644 --- a/src/dataflux/ui/routines/status.py +++ b/src/dataflux/ui/routines/status.py @@ -32,7 +32,7 @@ def update_status_connection_status(state: AppState): dpg.set_value(STATUS_SERIAL_STATUS_TEXT, "Serial: Connected") -def flash_status_connection_status(duration: float) -> None: - dpg.bind_item_theme(STATUS_LORA_STATUS_BOX, THEME_STATUS_CONNECTED_BRIGHT) +def flash_status_connection_status(duration: float, tag: str) -> None: + dpg.bind_item_theme(tag, THEME_STATUS_CONNECTED_BRIGHT) sleep(duration) - dpg.bind_item_theme(STATUS_LORA_STATUS_BOX, THEME_STATUS_CONNECTED) + dpg.bind_item_theme(tag, THEME_STATUS_CONNECTED)