Fix sequencer.py with noisy chronobox_csv input #35

Merged · 2 commits · Sep 14, 2024
270 changes: 171 additions & 99 deletions bin/sequencer.py
@@ -1,9 +1,10 @@
#!/usr/bin/env python3

-from typing import NamedTuple
+from typing import NamedTuple, Optional
import argparse
import json
import polars as pl
+import sys
import xml.etree.ElementTree as ET


@@ -115,7 +116,7 @@ class ChronoboxChannel(NamedTuple):
board: str
channel: int

-def chronobox_channel(odb: dict, channel_name: str) -> ChronoboxChannel:
+def chronobox_channel(odb: dict, channel_name: str) -> Optional[ChronoboxChannel]:
known_boards = ["cb01", "cb02", "cb03", "cb04"]
found_channels = []
for board in known_boards:
@@ -125,9 +126,12 @@ def chronobox_channel(odb: dict, channel_name: str) -> ChronoboxChannel:
found_channels.append(ChronoboxChannel(board, channel))

if len(found_channels) == 1:
-return found_channels[0]
+# https://github.com/pola-rs/polars/issues/15425
+return found_channels[0]._asdict()
+elif len(found_channels) == 0:
+return None
else:
raise ValueError(f"error finding {channel_name} in ODB")
raise ValueError(f"multiple `{channel_name}` channels in ODB")

chronobox_df = (
pl.read_csv(args.chronobox_csv, comment_prefix="#")
@@ -139,111 +143,179 @@ def chronobox_channel(odb: dict, channel_name: str) -> ChronoboxChannel:
# First 2 lines are comments
json_string = open(args.odb_json).read().split("\n", 2)[2]
odb = json.loads(json_string)
# The sequencer XMLs reliably tell us whether a sequence started running,
# but their timestamps are only good to within a few seconds. On the other
# hand, the Chronobox timestamps are accurate, but they contain some
# noise/false positives (we detect leading edges, and it looks like the
# sequencer can't always keep the "SEQ_RUNNING" signal high, so it falls and
# rises again every now and then).
# Hence we need to match the sequencer XMLs to the Chronobox "SEQ_RUNNING"
# timestamps (filtering out false positives).
sequencer_df = sequencer_df.with_columns(
cb_start=pl.col("sequencer_name")
.map_elements(
lambda x: chronobox_channel(odb, start_dump_channel_name(x)),
return_dtype=pl.Struct({"board": pl.String, "channel": pl.Int64}),
skip_nulls=False,
)
.name.suffix_fields("_start"),
cb_stop=pl.col("sequencer_name")
.map_elements(
lambda x: chronobox_channel(odb, stop_dump_channel_name(x)),
return_dtype=pl.Struct({"board": pl.String, "channel": pl.Int64}),
skip_nulls=False,
)
.name.suffix_fields("_stop"),
cb_running=pl.col("sequencer_name")
.map_elements(
lambda x: chronobox_channel(odb, sequence_running_channel_name(x)),
return_dtype=pl.Struct({"board": pl.String, "channel": pl.Int64}),
skip_nulls=False,
)
.name.suffix_fields("_running"),
)
# Sometimes people randomly run A2 sequencers (e.g. atm, rct, etc.) to do
# things like a random MCP dump. These sequencer signals are usually not
# connected to the Chronoboxes, so instead of crashing, we just ignore them.
# This doesn't affect the other sequences at all.
for (name,) in (
sequencer_df.filter(pl.any_horizontal(pl.all().is_null()))
.select("sequencer_name")
.unique()
.rows()
):
print(
f"Ignoring `{name}` sequencer (chronobox channels not found in ODB).",
file=sys.stderr,
)
sequencer_df = sequencer_df.drop_nulls().unnest("cb_start", "cb_stop", "cb_running")

result = pl.DataFrame()
for (name,), seq_df in sequencer_df.group_by(["sequencer_name"]):
seq_running = chronobox_channel(odb, sequence_running_channel_name(name))
start_dump = chronobox_channel(odb, start_dump_channel_name(name))
stop_dump = chronobox_channel(odb, stop_dump_channel_name(name))

seq_df = seq_df.sort("midas_timestamp")
cb_df = chronobox_df.filter(
(
pl.col("board").eq(seq_running.board)
& pl.col("channel").eq(seq_running.channel)
)
| (
pl.col("board").eq(start_dump.board)
& pl.col("channel").eq(start_dump.channel)
)
| (
pl.col("board").eq(stop_dump.board)
& pl.col("channel").eq(stop_dump.channel)
)
).sort("chronobox_time")
if cb_df["chronobox_time"].null_count() > 0:
raise ValueError(f"found null chronobox time for `{name}`")

seq_df = (
seq_df.with_row_index()
.join(
cb_df.with_row_index("running_index")
.filter(board=seq_running.board, channel=seq_running.channel)
.with_columns(
next_running_index=pl.col("running_index").shift(
-1, fill_value=cb_df.select(pl.len())
)
)
.with_row_index(),
on="index",
how="left",
)
.with_columns(
difference=pl.col("midas_timestamp") - pl.col("chronobox_time"),
)
.select(
"sequencer_name",
"running_index",
"next_running_index",
"event_table",
difference=(pl.col("difference") - pl.first("difference")).abs(),
)
matched_seq_running = False
cb_running_df = chronobox_df.join(
sequencer_df,
left_on=["board", "channel"],
right_on=["board_running", "channel_running"],
how="semi",
)
for (shift,) in (
sequencer_df.select(
pl.first("midas_timestamp", "board_running", "channel_running")
)
# Allow some difference between the MIDAS timestamp of the sequencer
# event and the Chronobox "SEQUENCE_RUNNING" hit.
if seq_df.filter(pl.col("difference") > 5).select(pl.len()).item() > 0:
raise ValueError(f"found large MIDAS timestamp difference for `{name}`")

cb_df = (
cb_df.with_row_index()
.filter(
(pl.col("board") != seq_running.board)
| (pl.col("channel") != seq_running.channel)
)
.join(
seq_df.explode("event_table").with_columns(
index=(
(
pl.col("running_index")
+ pl.col("running_index").cum_count()
).clip(upper_bound=pl.col("next_running_index"))
).over("running_index")
),
on="index",
how="left",
.join(
cb_running_df,
left_on=["board_running", "channel_running"],
right_on=["board", "channel"],
how="inner",
)
.select(shift=pl.col("midas_timestamp") - pl.col("chronobox_time").round())
.rows()
):
temp = (
sequencer_df.with_columns(
shifted_timestamp=pl.col("midas_timestamp") - shift
)
.unnest("event_table")
.select(
"sequencer_name",
pl.col("name").alias("event_name"),
pl.col("description").alias("event_description"),
"chronobox_time",
"board",
"channel",
.sort("shifted_timestamp")
.join_asof(
cb_running_df.sort("chronobox_time"),
left_on="shifted_timestamp",
right_on="chronobox_time",
by_left=["board_running", "channel_running"],
by_right=["board", "channel"],
strategy="nearest",
tolerance=5.0,
)
.rename({"chronobox_time": "start_time"})
)
if cb_df["event_name"].null_count() > 0:
raise ValueError(f"found extra Chronobox hits for `{name}`")
if (
cb_df.filter(board=start_dump.board, channel=start_dump.channel)
.select(pl.col("event_name").eq("startDump").all().not_())
.item()
) or (
cb_df.filter(board=stop_dump.board, channel=stop_dump.channel)
.select(pl.col("event_name").eq("stopDump").all().not_())
.item()
):
raise ValueError(f"found event and Chronobox mismatch for `{name}`")

cb_df = cb_df.select(

if temp.filter(pl.col("start_time").is_null()).height == 0:
matched_seq_running = True
sequencer_df = temp
break
if not matched_seq_running:
# This failure means that we couldn't match all sequencer XMLs to a
# "SEQ_RUNNING" hit in a Chronobox. To debug this, the easiest approach is
# to print the `temp` DataFrame above for all attempted `shift`s and see
# what's going on. The most likely causes are:
# 1. The tolerance is too low. Just increase it. This is expected; the
# XML timestamps are not very accurate.
# 2. The "SEQ_RUNNING" hit for an XML is missing in the Chronobox data.
# Find out why and fix it. Maybe the cable is not connected.
# To fix this for a run that has already been taken, just add a fake
# Chronobox hit to the `chronobox_timestamps.csv` file by hand.
raise ValueError("failed to match `SEQ_RUNNING` signals")

sequencer_df = sequencer_df.with_columns(
next_start_time=pl.col("start_time")
.shift(-1, fill_value=float("inf"))
.over("sequencer_name"),
)

expected_df = (
sequencer_df.explode("event_table")
.unnest("event_table")
.select(
"sequencer_name",
"start_time",
event_name="name",
event_description=pl.col("description").str.strip_chars('"'),
index=pl.int_range(pl.len()).over("sequencer_name", "start_time"),
)
)

observed_df = (
pl.concat(
[
sequencer_df.join_where(
chronobox_df,
pl.col("chronobox_time") >= pl.col("start_time"),
pl.col("chronobox_time") < pl.col("next_start_time"),
pl.col("board") == pl.col("board_start"),
pl.col("channel") == pl.col("channel_start"),
).with_columns(event_name=pl.lit("startDump")),
sequencer_df.join_where(
chronobox_df,
pl.col("chronobox_time") >= pl.col("start_time"),
pl.col("chronobox_time") < pl.col("next_start_time"),
pl.col("board") == pl.col("board_stop"),
pl.col("channel") == pl.col("channel_stop"),
).with_columns(event_name=pl.lit("stopDump")),
]
)
.select(
"sequencer_name",
"start_time",
"event_name",
pl.col("event_description").str.strip_chars('"'),
"chronobox_time",
)
result.vstack(cb_df, in_place=True)
.sort("chronobox_time", maintain_order=True)
.with_columns(index=pl.int_range(pl.len()).over("sequencer_name", "start_time"))
)

result = (
observed_df.join(
expected_df,
on=["sequencer_name", "start_time", "event_name", "index"],
how="left",
)
.with_columns(
pl.when(pl.col("event_description").is_null().cum_sum() == 0)
.then("event_description")
.over("sequencer_name", "start_time")
)
.select("sequencer_name", "event_name", "event_description", "chronobox_time")
)
for (name,) in (
result.filter(pl.col("event_description").is_null())
.select("sequencer_name")
.unique()
.rows()
):
print(
f"Ignoring mismatched dump markers for `{name}` sequencer.",
file=sys.stderr,
)
result = result.drop_nulls()

result = result.sort("chronobox_time")
if args.output:
result.write_csv(args.output)
else:
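
For reference, the core of the new approach described in the comments above is: trust the Chronobox clock but not its edge count, and trust the XML list of sequences but not its clock. A candidate clock offset ("shift") is applied to the MIDAS timestamps, and an as-of join with a small tolerance pairs each sequencer XML with the nearest "SEQ_RUNNING" hit, so spurious extra edges are simply left unmatched. A simplified, self-contained sketch of that matching step, with made-up timestamps and only a single candidate shift (the real script loops over all candidates until every XML is matched):

import polars as pl

# Coarse MIDAS timestamps, one row per sequencer XML (made-up values).
seq = pl.DataFrame({"xml": ["s1", "s2", "s3"], "midas_timestamp": [100.0, 160.0, 220.0]})
# Precise Chronobox "SEQ_RUNNING" hits, including a spurious extra edge at 161.2.
cb = pl.DataFrame({"chronobox_time": [97.4, 157.9, 161.2, 217.6]})

# Candidate clock offset implied by pairing the first XML with the first hit.
shift = seq["midas_timestamp"][0] - cb["chronobox_time"][0]
matched = (
    seq.with_columns(shifted=pl.col("midas_timestamp") - shift)
    .sort("shifted")
    .join_asof(
        cb.sort("chronobox_time"),
        left_on="shifted",
        right_on="chronobox_time",
        strategy="nearest",
        tolerance=5.0,  # same order of magnitude as the tolerance in the diff
    )
)
# Every XML gets a chronobox_time; the noisy 161.2 edge is simply never used.
print(matched)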
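The `._asdict()` call near the top of the diff is the workaround for the linked polars issue: when `map_elements` is given `return_dtype=pl.Struct(...)`, returning a plain dict per row maps the struct fields by name, which (going by the linked issue) a NamedTuple apparently does not do reliably. A minimal sketch of the pattern, using a hypothetical channel name and a fake ODB lookup rather than the real one:

from typing import Optional

import polars as pl

# Hypothetical stand-in for the ODB lookup; the real chronobox_channel()
# searches the ODB for the board/channel wired to a named signal.
FAKE_ODB = {"SEQ_RUNNING_CAT": {"board": "cb01", "channel": 3}}

def lookup(channel_name: str) -> Optional[dict]:
    entry = FAKE_ODB.get(channel_name)
    # A plain dict (not a NamedTuple) so the Struct fields map by name;
    # None means the signal is simply not wired into any Chronobox.
    return dict(entry) if entry is not None else None

df = pl.DataFrame({"channel_name": ["SEQ_RUNNING_CAT", "SEQ_RUNNING_UNKNOWN"]})
df = df.with_columns(
    cb=pl.col("channel_name").map_elements(
        lookup,
        return_dtype=pl.Struct({"board": pl.String, "channel": pl.Int64}),
        skip_nulls=False,  # same flag as the diff; null inputs reach the function
    )
)
print(df.unnest("cb"))  # unknown channels end up with null board/channel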
6 changes: 3 additions & 3 deletions requirements.txt
@@ -6,11 +6,11 @@ cycler==0.12.1
# via matplotlib
fonttools==4.53.1
# via matplotlib
-importlib-resources==6.4.4 ; python_full_version < '3.10'
+importlib-resources==6.4.5 ; python_full_version < '3.10'
# via matplotlib
jsonpointer==3.0.0
# via -r requirements.in
-kiwisolver==1.4.5
+kiwisolver==1.4.7
# via matplotlib
matplotlib==3.9.2
# via -r requirements.in
@@ -23,7 +23,7 @@ packaging==24.1
# via matplotlib
pillow==10.4.0
# via matplotlib
-polars==1.6.0
+polars==1.7.1
# via -r requirements.in
pyparsing==3.1.4
# via matplotlib
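
The polars bump from 1.6.0 to 1.7.1 above is presumably what makes the new `DataFrame.join_where` calls in sequencer.py possible, since inequality joins are a recent addition to polars. A minimal sketch of the windowed join the script uses to collect Chronobox hits between one sequence start and the next, with made-up values:

import polars as pl

# One row per matched sequence start (made-up times).
seq = pl.DataFrame(
    {
        "sequencer_name": ["atm", "atm"],
        "start_time": [100.0, 200.0],
        "next_start_time": [200.0, float("inf")],
    }
)
# All Chronobox hits on the corresponding "start dump" channel (made-up times).
hits = pl.DataFrame({"chronobox_time": [101.3, 150.8, 205.2]})

# Inequality join: attach every hit that falls inside each sequence's window.
windowed = seq.join_where(
    hits,
    pl.col("chronobox_time") >= pl.col("start_time"),
    pl.col("chronobox_time") < pl.col("next_start_time"),
)
print(windowed.sort("chronobox_time"))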