# Copyright (C) 2026 Hector van der Aa # Copyright (C) 2026 Pierre Barbier # Copyright (C) 2026 Association Exergie # SPDX-License-Identifier: GPL-3.0-or-later import pandas as pd from pathlib import Path import argparse from tqdm import tqdm def find_last_crank(df: pd.DataFrame, time_us: int) -> int | None: previous_crank_hits = df.loc[: time_us - 1] previous_crank_hits = previous_crank_hits[previous_crank_hits["crank"] == 1] if previous_crank_hits.empty: return None return previous_crank_hits.index[-1] def find_next_crank(df: pd.DataFrame, time_us: int) -> int | None: next_crank_hits = df.loc[time_us + 1 :] next_crank_hits = next_crank_hits[next_crank_hits["crank"] == 1] if next_crank_hits.empty: return None return next_crank_hits.index[0] def filter_data(file: Path) -> pd.DataFrame: df = pd.read_csv(file).set_index("time_us", drop=False) rows = [] last_crank = -1 last_crank_delta = -1 previous_crank = -1 last_cam = -1 cam_flag = 0 crank_flag = False for _, row in tqdm(df.iterrows(), total=len(df), desc="Derivative"): time_us: int = row["time_us"] crank: int = row["crank"] cam: int = row["cam"] c1 = 0 c2 = 0 if crank==1: d1 = time_us-c1 d2 = d1-(c1-c2) if crank_flag: rows.append({ "time_us": time_us, "d1": d1, "d2": d2, "ratio": d2/d1 }) else: crank_flag = True c2=c1 c1=time_us output = pd.DataFrame(rows) return output parser = argparse.ArgumentParser() parser.add_argument("directory", type=Path, help="Source data directory") args = parser.parse_args() directory: Path = args.directory if not directory.is_dir(): parser.error(f"{directory} is not a valid directory") print(f"Processing data in: {directory}") concat_files: list[Path] = [] for path in directory.glob("*.csv"): stem = path.stem try: base_name, channel = stem.rsplit("_", 1) except ValueError: print(f"Skipping badly named file: {path}") continue if channel != "trimmed": print(f"Skipping unknown file: {path}") continue concat_files.append(path) for file in concat_files: base_name, _ = file.stem.rsplit("_", 1) output = file.parent / f"{base_name}_derivative.csv" out_df = filter_data(file) out_df.to_csv(output)