Initial commit
This commit is contained in:
80
src/concat.py
Normal file
80
src/concat.py
Normal file
@@ -0,0 +1,80 @@
|
||||
import pandas as pd
|
||||
from pathlib import Path
|
||||
import argparse
|
||||
|
||||
|
||||
def concat_pair(r_file: Path, s_file: Path) -> pd.DataFrame:
|
||||
crank_df = pd.read_csv(r_file)
|
||||
cam_df = pd.read_csv(s_file)
|
||||
|
||||
crank_values = crank_df.iloc[:, 0]
|
||||
cam_values = cam_df.iloc[:, 0]
|
||||
|
||||
start_time = min(crank_values.iloc[0], cam_values.iloc[0])
|
||||
|
||||
crank_values = crank_values - start_time
|
||||
cam_values = cam_values - start_time
|
||||
|
||||
rows = max(crank_values.max(), cam_values.max()) + 1
|
||||
|
||||
final_df = pd.DataFrame(index=range(rows), columns=["crank", "cam"])
|
||||
|
||||
final_df["crank"] = final_df.index.isin(crank_values).astype(int)
|
||||
final_df["cam"] = final_df.index.isin(cam_values).astype(int)
|
||||
final_df.index.name = "time_us"
|
||||
|
||||
return final_df
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("directory", type=Path, help="Source data directory")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
directory: Path = args.directory
|
||||
|
||||
if not directory.is_dir():
|
||||
parser.error(f"{directory} is not a valid directory")
|
||||
|
||||
print(f"Processing data in: {directory}")
|
||||
|
||||
file_groups: dict[str, dict[str, Path]] = {}
|
||||
|
||||
for path in directory.glob("*.csv"):
|
||||
stem = path.stem
|
||||
|
||||
try:
|
||||
base_name, channel = stem.rsplit("_", 1)
|
||||
except ValueError:
|
||||
print(f"Skipping badly named file: {path}")
|
||||
continue
|
||||
|
||||
if channel not in ("R", "S"):
|
||||
print(f"Skipping unknown file: {path}")
|
||||
continue
|
||||
|
||||
file_groups.setdefault(base_name, {})[channel] = path
|
||||
|
||||
file_pairs: list[tuple[Path, Path]] = []
|
||||
|
||||
for base_name, files in sorted(file_groups.items()):
|
||||
if "R" not in files:
|
||||
print(f"Missing R file for {base_name}")
|
||||
continue
|
||||
|
||||
if "S" not in files:
|
||||
print(f"Missing S file for {base_name}")
|
||||
continue
|
||||
|
||||
file_pairs.append((files["R"], files["S"]))
|
||||
|
||||
for r_file, s_file in file_pairs:
|
||||
df = concat_pair(r_file, s_file)
|
||||
|
||||
base_name, _ = r_file.stem.rsplit("_", 1)
|
||||
output = r_file.parent / f"{base_name}_concat.csv"
|
||||
|
||||
df.to_csv(output)
|
||||
|
||||
|
||||
exit(0)
|
||||
Reference in New Issue
Block a user