Initial commit

This commit is contained in:
2026-05-27 08:44:49 +02:00
commit 7e44092d3f
12 changed files with 1101 additions and 0 deletions

11
Python/.gitignore vendored Normal file
View File

@@ -0,0 +1,11 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
recordings/

1
Python/.python-version Normal file
View File

@@ -0,0 +1 @@
3.14

226
Python/main.py Normal file
View File

@@ -0,0 +1,226 @@
# Copyright (C) 2026 Hector van der Aa <hector@h3cx.dev>
# Copyright (C) 2026 Association Exergie <association.exergie@gmail.com>
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
import csv
import queue
import struct
import sys
import threading
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import serial
from serial.tools import list_ports
BAUD_RATE = 115_200
VALID_CHANNELS = frozenset((b"R", b"S"))
@dataclass(frozen=True)
class Sample:
channel: str
value: int
class SerialReader(threading.Thread):
def __init__(
self,
port: str,
recorder: RecorderState,
errors: queue.Queue[str],
stop_event: threading.Event,
) -> None:
super().__init__(daemon=True)
self.port = port
self.recorder = recorder
self.errors = errors
self.stop_event = stop_event
def run(self) -> None:
try:
with serial.Serial(self.port, BAUD_RATE, timeout=0.1) as ser:
ser.reset_input_buffer()
while not self.stop_event.is_set():
channel = ser.read(1)
if not channel:
continue
if channel not in VALID_CHANNELS:
continue
payload = ser.read(4)
if len(payload) != 4:
continue
value = struct.unpack("<I", payload)[0]
self.recorder.write(Sample(channel.decode("ascii"), value))
except serial.SerialException as exc:
self.errors.put(f"Serial error: {exc}")
class Recording:
def __init__(self, output_dir: Path) -> None:
started_at = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir.mkdir(parents=True, exist_ok=True)
self.files = {
"R": (output_dir / f"{started_at}_R.csv").open(
"w", newline="", buffering=1024 * 1024
),
"S": (output_dir / f"{started_at}_S.csv").open(
"w", newline="", buffering=1024 * 1024
),
}
self.writers = {
channel: csv.writer(file) for channel, file in self.files.items()
}
self.counts = {"R": 0, "S": 0}
@property
def paths(self) -> list[Path]:
return [Path(file.name) for file in self.files.values()]
def write(self, sample: Sample) -> None:
self.writers[sample.channel].writerow((sample.value,))
self.counts[sample.channel] += 1
def close(self) -> None:
for file in self.files.values():
file.close()
class RecorderState:
def __init__(self, output_dir: Path) -> None:
self.output_dir = output_dir
self.recording: Recording | None = None
self.lock = threading.Lock()
def start(self) -> Recording | None:
with self.lock:
if self.recording is not None:
return None
self.recording = Recording(self.output_dir)
return self.recording
def stop(self) -> Recording | None:
with self.lock:
if self.recording is None:
return None
recording = self.recording
self.recording = None
recording.close()
return recording
def write(self, sample: Sample) -> None:
with self.lock:
if self.recording is not None:
self.recording.write(sample)
def choose_port() -> str | None:
ports = list(list_ports.comports())
if not ports:
print("No serial ports found.")
return None
print("Available serial ports:")
for index, port in enumerate(ports, start=1):
description = port.description or "serial port"
print(f" {index}. {port.device} - {description}")
while True:
choice = input("Choose a port number, or q to quit: ").strip().lower()
if choice == "q":
return None
try:
index = int(choice)
except ValueError:
print("Please enter a port number.")
continue
if 1 <= index <= len(ports):
return ports[index - 1].device
print(f"Please choose a number between 1 and {len(ports)}.")
def show_recording_summary(recording: Recording) -> None:
print(
"Stopped recording: "
f"{recording.counts['R']} R samples, {recording.counts['S']} S samples"
)
for path in recording.paths:
print(f" {path}")
def main() -> int:
port = choose_port()
if port is None:
return 0
errors: queue.Queue[str] = queue.Queue()
stop_event = threading.Event()
recorder = RecorderState(Path("recordings"))
reader = SerialReader(port, recorder, errors, stop_event)
reader.start()
print(f"Reading {port} at {BAUD_RATE} baud.")
print("Commands: r = record, s = stop recording, q = quit")
try:
while True:
try:
error = errors.get_nowait()
except queue.Empty:
error = None
if error is not None:
print(error)
return 1
command = input("> ").strip().lower()
if command == "r":
recording = recorder.start()
if recording is None:
print("Already recording.")
continue
print("Recording to:")
for path in recording.paths:
print(f" {path}")
elif command == "s":
recording = recorder.stop()
if recording is None:
print("Not recording.")
continue
show_recording_summary(recording)
elif command == "q":
recording = recorder.stop()
if recording is not None:
show_recording_summary(recording)
return 0
elif command:
print("Unknown command. Use r, s, or q.")
except KeyboardInterrupt:
print()
recording = recorder.stop()
if recording is not None:
show_recording_summary(recording)
return 130
finally:
stop_event.set()
reader.join(timeout=1.0)
if __name__ == "__main__":
sys.exit(main())

9
Python/pyproject.toml Normal file
View File

@@ -0,0 +1,9 @@
[project]
name = "engine-recorder-dump"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"pyserial>=3.5",
]

23
Python/uv.lock generated Normal file
View File

@@ -0,0 +1,23 @@
version = 1
revision = 3
requires-python = ">=3.14"
[[package]]
name = "engine-recorder-dump"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "pyserial" },
]
[package.metadata]
requires-dist = [{ name = "pyserial", specifier = ">=3.5" }]
[[package]]
name = "pyserial"
version = "3.5"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/1e/7d/ae3f0a63f41e4d2f6cb66a5b57197850f919f59e558159a4dd3a818f5082/pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb", size = 159125, upload-time = "2020-11-23T03:59:15.045Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/07/bc/587a445451b253b285629263eb51c2d8e9bcea4fc97826266d186f96f558/pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0", size = 90585, upload-time = "2020-11-23T03:59:13.41Z" },
]