Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2da49b50bf | |||
|
|
4e9277a023 | ||
|
|
018517b662 | ||
|
|
1452d231aa |
66
src/derivatives.py
Normal file
66
src/derivatives.py
Normal file
@@ -0,0 +1,66 @@
|
||||
# Copyright (C) 2026 Hector van der Aa <hector@h3cx.dev>
|
||||
# Copyright (C) 2026 Pierre Barbier <pierrebarbier741@gmail.com>
|
||||
# Copyright (C) 2026 Association Exergie <association.exergie@gmail.com>
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
from tqdm import tqdm
|
||||
|
||||
|
||||
def filter_data(file: Path) -> pd.DataFrame:
|
||||
df = pd.read_csv(file, usecols=["time_us", "crank", "cam"])
|
||||
|
||||
crank_df = df.loc[df["crank"] == 1, ["time_us"]].copy()
|
||||
|
||||
crank_df["d1"] = crank_df["time_us"].diff()
|
||||
crank_df["prev_d1"] = crank_df["d1"].shift(1)
|
||||
crank_df["d2"] = crank_df["d1"] - crank_df["prev_d1"]
|
||||
crank_df["ratio"] = crank_df["d2"] / crank_df["d1"]
|
||||
|
||||
crank_df = crank_df.dropna(subset=["d1", "d2", "ratio"])
|
||||
|
||||
return crank_df[["time_us", "d1", "d2", "ratio"]]
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("directory", type=Path, help="Source data directory")
|
||||
args = parser.parse_args()
|
||||
|
||||
directory: Path = args.directory
|
||||
|
||||
if not directory.is_dir():
|
||||
parser.error(f"{directory} is not a valid directory")
|
||||
|
||||
print(f"Processing data in: {directory}")
|
||||
|
||||
concat_files: list[Path] = []
|
||||
|
||||
for path in directory.glob("*.csv"):
|
||||
stem = path.stem
|
||||
|
||||
try:
|
||||
base_name, channel = stem.rsplit("_", 1)
|
||||
except ValueError:
|
||||
print(f"Skipping badly named file: {path}")
|
||||
continue
|
||||
|
||||
if channel != "trimmed":
|
||||
print(f"Skipping unknown file: {path}")
|
||||
continue
|
||||
|
||||
concat_files.append(path)
|
||||
|
||||
for file in tqdm(concat_files, desc="Files"):
|
||||
base_name, _ = file.stem.rsplit("_", 1)
|
||||
output = file.parent / f"{base_name}_derivative.csv"
|
||||
|
||||
out_df = filter_data(file)
|
||||
out_df.to_csv(output, index=False)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user