diff --git a/src/derivatives.py b/src/derivatives.py new file mode 100644 index 0000000..dd18ba2 --- /dev/null +++ b/src/derivatives.py @@ -0,0 +1,95 @@ +# Copyright (C) 2026 Hector van der Aa +# Copyright (C) 2026 Pierre Barbier +# Copyright (C) 2026 Association Exergie +# SPDX-License-Identifier: GPL-3.0-or-later + +import pandas as pd +from pathlib import Path +import argparse +from tqdm import tqdm + + +def find_last_crank(df: pd.DataFrame, time_us: int) -> int | None: + previous_crank_hits = df.loc[: time_us - 1] + previous_crank_hits = previous_crank_hits[previous_crank_hits["crank"] == 1] + + if previous_crank_hits.empty: + return None + + return previous_crank_hits.index[-1] + + +def find_next_crank(df: pd.DataFrame, time_us: int) -> int | None: + next_crank_hits = df.loc[time_us + 1 :] + next_crank_hits = next_crank_hits[next_crank_hits["crank"] == 1] + + if next_crank_hits.empty: + return None + + return next_crank_hits.index[0] + + + + +def filter_data(file: Path) -> pd.DataFrame: + df = pd.read_csv(file).set_index("time_us", drop=False) + rows = [] + last_crank = -1 + last_crank_delta = -1 + previous_crank = -1 + last_cam = -1 + cam_flag = 0 + crank_flag = 0 + + for _, row in tqdm(df.iterrows(), total=len(df), desc="Derivative"): + time_us: int = row["time_us"] + crank: int = row["crank"] + cam: int = row["cam"] + c1 = 0 + c2 = 0 + if crank==1: + rows.append({ + "time_us": time_us, + "d1": time_us-c1, + "d2": (time_us-2*c1+c2)/(c1-c2), + }) + c2=c1 + c1=time_us + output = pd.DataFrame(rows) + return output + + +parser = argparse.ArgumentParser() +parser.add_argument("directory", type=Path, help="Source data directory") + +args = parser.parse_args() + +directory: Path = args.directory + +if not directory.is_dir(): + parser.error(f"{directory} is not a valid directory") + +print(f"Processing data in: {directory}") + +concat_files: list[Path] = [] + +for path in directory.glob("*.csv"): + stem = path.stem + + try: + base_name, channel = stem.rsplit("_", 1) + except ValueError: + print(f"Skipping badly named file: {path}") + continue + + if channel != "trimmed": + print(f"Skipping unknown file: {path}") + continue + + concat_files.append(path) + +for file in concat_files: + base_name, _ = file.stem.rsplit("_", 1) + output = file.parent / f"{base_name}_derivative.csv" + out_df = filter_data(file) + out_df.to_csv(output)