Added multiple data graphs and stronger safety with threads
This commit is contained in:
103
src/dataflux/services/telemetry/__init__.py
Normal file
103
src/dataflux/services/telemetry/__init__.py
Normal file
@@ -0,0 +1,103 @@
|
||||
# Copyright (C) 2026 Hector van der Aa <hector@h3cx.dev>
|
||||
# Copyright (C) 2026 Association Exergie <association.exergie@gmail.com>
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
from queue import Empty
|
||||
from threading import local
|
||||
from dataflux.state import AppState, Buffers
|
||||
import time
|
||||
from pathlib import Path
|
||||
import csv
|
||||
|
||||
def hhmmsscc_to_day_seconds(value: int) -> int:
|
||||
hours = value // 1000000
|
||||
minutes = (value // 10000) % 100
|
||||
seconds = (value // 100) % 100
|
||||
|
||||
return (hours * 3600) + (minutes * 60) + seconds
|
||||
|
||||
def telemetry_worker(state: AppState):
|
||||
while state.telemetry_thread_running:
|
||||
if state.serial_thread_running == False:
|
||||
time.sleep(1)
|
||||
continue
|
||||
try:
|
||||
dataframe = state.packet_queue.get(timeout=0.1)
|
||||
except Empty:
|
||||
continue
|
||||
|
||||
state.latest_telemetry = dataframe
|
||||
state.telemetry_valid = True
|
||||
|
||||
|
||||
|
||||
with state.lock:
|
||||
state.raw_buffers.timestamp.append(hhmmsscc_to_day_seconds(dataframe["time_stamp"]))
|
||||
state.raw_buffers.speed.append(dataframe["speed"])
|
||||
state.raw_buffers.vbat.append(dataframe["vbat"])
|
||||
state.raw_buffers.teng.append(dataframe["teng"])
|
||||
state.raw_buffers.lat.append(dataframe["lat"])
|
||||
state.raw_buffers.lng.append(dataframe["lng"])
|
||||
|
||||
state.live_buffers_updated = True
|
||||
state.live_buffers.timestamp.clear()
|
||||
state.live_buffers.speed.clear()
|
||||
state.live_buffers.vbat.clear()
|
||||
state.live_buffers.teng.clear()
|
||||
state.live_buffers.lat.clear()
|
||||
state.live_buffers.lng.clear()
|
||||
|
||||
if not state.raw_buffers.timestamp:
|
||||
return
|
||||
|
||||
last_timestamp = state.raw_buffers.timestamp[-1]
|
||||
cutoff = last_timestamp - 30
|
||||
|
||||
i = len(state.raw_buffers.timestamp) - 1
|
||||
|
||||
while i >= 0 and state.raw_buffers.timestamp[i] >= cutoff:
|
||||
state.live_buffers.timestamp.append(state.raw_buffers.timestamp[i] - last_timestamp)
|
||||
state.live_buffers.speed.append(state.raw_buffers.speed[i])
|
||||
state.live_buffers.vbat.append(state.raw_buffers.vbat[i])
|
||||
state.live_buffers.teng.append(state.raw_buffers.teng[i])
|
||||
state.live_buffers.lat.append(state.raw_buffers.lat[i])
|
||||
state.live_buffers.lng.append(state.raw_buffers.lng[i])
|
||||
i -= 1
|
||||
|
||||
state.live_buffers.timestamp.reverse()
|
||||
state.live_buffers.speed.reverse()
|
||||
state.live_buffers.vbat.reverse()
|
||||
state.live_buffers.teng.reverse()
|
||||
state.live_buffers.lat.reverse()
|
||||
state.live_buffers.lng.reverse()
|
||||
|
||||
def buffer_dump(state: AppState, path: str):
|
||||
print(path)
|
||||
save_path = Path(path)
|
||||
if save_path.is_dir():
|
||||
save_path = save_path / "output.csv"
|
||||
|
||||
with state.lock:
|
||||
local_raw_buffers = Buffers(
|
||||
timestamp=list(state.raw_buffers.timestamp),
|
||||
speed=list(state.raw_buffers.speed),
|
||||
vbat=list(state.raw_buffers.vbat),
|
||||
teng=list(state.raw_buffers.teng),
|
||||
lat=list(state.raw_buffers.lat),
|
||||
lng=list(state.raw_buffers.lng),
|
||||
)
|
||||
|
||||
print(local_raw_buffers.timestamp)
|
||||
|
||||
with save_path.open("w", newline="", encoding="utf-8") as f:
|
||||
writer = csv.writer(f)
|
||||
|
||||
writer.writerow(["timestamp", "speed", "vbat", "teng", "lat", "lng"])
|
||||
|
||||
for row in zip(local_raw_buffers.timestamp, local_raw_buffers.speed, local_raw_buffers.vbat, local_raw_buffers.teng, local_raw_buffers.lat, local_raw_buffers.lng):
|
||||
writer.writerow(row)
|
||||
|
||||
state.buffer_dump_thread = None
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user