# Copyright (C) 2026 Hector van der Aa # Copyright (C) 2026 Association Exergie # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import annotations import csv import queue import struct import sys import threading from dataclasses import dataclass from datetime import datetime from pathlib import Path import serial from serial.tools import list_ports BAUD_RATE = 115_200 VALID_CHANNELS = frozenset((b"R", b"S")) @dataclass(frozen=True) class Sample: channel: str value: int class SerialReader(threading.Thread): def __init__( self, port: str, recorder: RecorderState, errors: queue.Queue[str], stop_event: threading.Event, ) -> None: super().__init__(daemon=True) self.port = port self.recorder = recorder self.errors = errors self.stop_event = stop_event def run(self) -> None: try: with serial.Serial(self.port, BAUD_RATE, timeout=0.1) as ser: ser.reset_input_buffer() while not self.stop_event.is_set(): channel = ser.read(1) if not channel: continue if channel not in VALID_CHANNELS: continue payload = ser.read(4) if len(payload) != 4: continue value = struct.unpack(" None: started_at = datetime.now().strftime("%Y%m%d_%H%M%S") output_dir.mkdir(parents=True, exist_ok=True) self.files = { "R": (output_dir / f"{started_at}_R.csv").open( "w", newline="", buffering=1024 * 1024 ), "S": (output_dir / f"{started_at}_S.csv").open( "w", newline="", buffering=1024 * 1024 ), } self.writers = { channel: csv.writer(file) for channel, file in self.files.items() } self.counts = {"R": 0, "S": 0} @property def paths(self) -> list[Path]: return [Path(file.name) for file in self.files.values()] def write(self, sample: Sample) -> None: self.writers[sample.channel].writerow((sample.value,)) self.counts[sample.channel] += 1 def close(self) -> None: for file in self.files.values(): file.close() class RecorderState: def __init__(self, output_dir: Path) -> None: self.output_dir = output_dir self.recording: Recording | None = None self.lock = threading.Lock() def start(self) -> Recording | None: with self.lock: if self.recording is not None: return None self.recording = Recording(self.output_dir) return self.recording def stop(self) -> Recording | None: with self.lock: if self.recording is None: return None recording = self.recording self.recording = None recording.close() return recording def write(self, sample: Sample) -> None: with self.lock: if self.recording is not None: self.recording.write(sample) def choose_port() -> str | None: ports = list(list_ports.comports()) if not ports: print("No serial ports found.") return None print("Available serial ports:") for index, port in enumerate(ports, start=1): description = port.description or "serial port" print(f" {index}. {port.device} - {description}") while True: choice = input("Choose a port number, or q to quit: ").strip().lower() if choice == "q": return None try: index = int(choice) except ValueError: print("Please enter a port number.") continue if 1 <= index <= len(ports): return ports[index - 1].device print(f"Please choose a number between 1 and {len(ports)}.") def show_recording_summary(recording: Recording) -> None: print( "Stopped recording: " f"{recording.counts['R']} R samples, {recording.counts['S']} S samples" ) for path in recording.paths: print(f" {path}") def main() -> int: port = choose_port() if port is None: return 0 errors: queue.Queue[str] = queue.Queue() stop_event = threading.Event() recorder = RecorderState(Path("recordings")) reader = SerialReader(port, recorder, errors, stop_event) reader.start() print(f"Reading {port} at {BAUD_RATE} baud.") print("Commands: r = record, s = stop recording, q = quit") try: while True: try: error = errors.get_nowait() except queue.Empty: error = None if error is not None: print(error) return 1 command = input("> ").strip().lower() if command == "r": recording = recorder.start() if recording is None: print("Already recording.") continue print("Recording to:") for path in recording.paths: print(f" {path}") elif command == "s": recording = recorder.stop() if recording is None: print("Not recording.") continue show_recording_summary(recording) elif command == "q": recording = recorder.stop() if recording is not None: show_recording_summary(recording) return 0 elif command: print("Unknown command. Use r, s, or q.") except KeyboardInterrupt: print() recording = recorder.stop() if recording is not None: show_recording_summary(recording) return 130 finally: stop_event.set() reader.join(timeout=1.0) if __name__ == "__main__": sys.exit(main())