derivatives

This commit is contained in:
2026-05-27 14:27:02 +02:00
parent 7484d4e282
commit 1452d231aa

95
src/derivatives.py Normal file
View File

@@ -0,0 +1,95 @@
# Copyright (C) 2026 Hector van der Aa <hector@h3cx.dev>
# Copyright (C) 2026 Pierre Barbier <pierrebarbier741@gmail.com>
# Copyright (C) 2026 Association Exergie <association.exergie@gmail.com>
# SPDX-License-Identifier: GPL-3.0-or-later
import pandas as pd
from pathlib import Path
import argparse
from tqdm import tqdm
def find_last_crank(df: pd.DataFrame, time_us: int) -> int | None:
previous_crank_hits = df.loc[: time_us - 1]
previous_crank_hits = previous_crank_hits[previous_crank_hits["crank"] == 1]
if previous_crank_hits.empty:
return None
return previous_crank_hits.index[-1]
def find_next_crank(df: pd.DataFrame, time_us: int) -> int | None:
next_crank_hits = df.loc[time_us + 1 :]
next_crank_hits = next_crank_hits[next_crank_hits["crank"] == 1]
if next_crank_hits.empty:
return None
return next_crank_hits.index[0]
def filter_data(file: Path) -> pd.DataFrame:
df = pd.read_csv(file).set_index("time_us", drop=False)
rows = []
last_crank = -1
last_crank_delta = -1
previous_crank = -1
last_cam = -1
cam_flag = 0
crank_flag = 0
for _, row in tqdm(df.iterrows(), total=len(df), desc="Derivative"):
time_us: int = row["time_us"]
crank: int = row["crank"]
cam: int = row["cam"]
c1 = 0
c2 = 0
if crank==1:
rows.append({
"time_us": time_us,
"d1": time_us-c1,
"d2": (time_us-2*c1+c2)/(c1-c2),
})
c2=c1
c1=time_us
output = pd.DataFrame(rows)
return output
parser = argparse.ArgumentParser()
parser.add_argument("directory", type=Path, help="Source data directory")
args = parser.parse_args()
directory: Path = args.directory
if not directory.is_dir():
parser.error(f"{directory} is not a valid directory")
print(f"Processing data in: {directory}")
concat_files: list[Path] = []
for path in directory.glob("*.csv"):
stem = path.stem
try:
base_name, channel = stem.rsplit("_", 1)
except ValueError:
print(f"Skipping badly named file: {path}")
continue
if channel != "trimmed":
print(f"Skipping unknown file: {path}")
continue
concat_files.append(path)
for file in concat_files:
base_name, _ = file.stem.rsplit("_", 1)
output = file.parent / f"{base_name}_derivative.csv"
out_df = filter_data(file)
out_df.to_csv(output)