Datajoint pipeline PR into main branch (#351)
Conversation
- modify `subject` schema to align with pyrat - added pyrat sync routine
- …re/aeon_mecha into datajoint_pipeline
- Minor sciviz fix & add pyrat worker to docker-compose
- …oint_pipeline_pyrat
- Co-authored-by: JaerongA <jaerong.ahn@datajoint.com>
- …7/aeon_mecha into datajoint_pipeline_pyrat
- Added `lab_id` and `available` to pyrat subjects
- …oint_pipeline_pyrat
- update(tracking): raise error if no SLEAP data found
Files selected (1)
- aeon/dj_pipeline/tracking.py (6)
Files not summarized due to errors (1)
- aeon/dj_pipeline/tracking.py (diff tokens exceeds limit)
Review comments generated (6)
- Review: 1
- LGTM: 5
Tips

Chat with CodeRabbit Bot (`@coderabbitai`)
- Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
- Invite the bot into a review comment chain by tagging `@coderabbitai` in a reply.

Code suggestions
- The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
- You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.

Pausing incremental reviews
- Add `@coderabbitai: ignore` anywhere in the PR description to pause further reviews from the bot.
```diff
                     }
                 )
                 if part == anchor_part:
                     class_likelihood = part_position.class_likelihood.values
                 pose_identity_entries.append(
                     {
                         **key,
                         "object_id": obj_id,
                         "timestamps": obj_position.index.to_pydatetime(),
                         "position_x": obj_position.x.values,
                         "position_y": obj_position.y.values,
                         "area": obj_position.area.values,
                         "identity_idx": class_idx,
                         "identity_name": class_names[class_idx],
                         "anchor_part": anchor_part,
                         "identity_likelihood": class_likelihood,
                     }
                 )

         self.insert1(key)
         self.Object.insert(object_positions)

     @classmethod
     def get_object_position(
         cls,
         experiment_name,
         object_id,
         start,
         end,
         camera_name="FrameTop",
         tracking_paramset_id=0,
         in_meter=False,
     ):
         table = (
             cls.Object * acquisition.Chunk.proj("chunk_end")
             & {"experiment_name": experiment_name}
             & {"tracking_paramset_id": tracking_paramset_id}
             & (acquisition.ExperimentCamera & {"camera_description": camera_name})
         )

         return _get_position(
             table,
             object_attr="object_id",
             object_name=object_id,
             start_attr="chunk_start",
             end_attr="chunk_end",
             start=start,
             end=end,
             fetch_attrs=["timestamps", "position_x", "position_y", "area"],
             attrs_to_scale=["position_x", "position_y"],
             scale_factor=pixel_scale if in_meter else 1,
         )

         self.PoseIdentity.insert(pose_identity_entries)
         self.Part.insert(part_entries)


 # ---------- HELPER ------------------


-def compute_distance(position_df, target):
+def compute_distance(position_df, target, xcol="x", ycol="y"):
     assert len(target) == 2
-    return np.sqrt(np.square(position_df[["x", "y"]] - target).sum(axis=1))
+    return np.sqrt(np.square(position_df[[xcol, ycol]] - target).sum(axis=1))


-def is_in_patch(
+def is_position_in_patch(
     position_df, patch_position, wheel_distance_travelled, patch_radius=0.2
-):
+) -> pd.Series:
     distance_from_patch = compute_distance(position_df, patch_position)
     in_patch = distance_from_patch < patch_radius
     exit_patch = in_patch.astype(np.int8).diff() < 0
     in_wheel = (wheel_distance_travelled.diff().rolling("1s").sum() > 1).reindex(
         position_df.index, method="pad"
     )
     time_slice = exit_patch.cumsum()
-    return in_wheel.groupby(time_slice).apply(lambda x: x.cumsum()) > 0
+    return in_patch & (in_wheel.groupby(time_slice).apply(lambda x: x.cumsum()) > 0)


-def is_position_in_nest(position_df, nest_key):
-    """
-    Given the session key and the position data - arrays of x and y
-    return an array of boolean indicating whether or not a position is inside the nest
-    """
-    nest_vertices = list(
-        zip(*(lab.ArenaNest.Vertex & nest_key).fetch("vertex_x", "vertex_y"))
-    )
-    nest_path = path.Path(nest_vertices)
-
-    return nest_path.contains_points(position_df[["x", "y"]])
+def is_position_in_nest(position_df, nest_key, xcol="x", ycol="y") -> pd.Series:
+    """Given the session key and the position data - arrays of x and y
+    return an array of boolean indicating whether or not a position is inside the nest.
+    """
+    nest_vertices = list(zip(*(lab.ArenaNest.Vertex & nest_key).fetch("vertex_x", "vertex_y")))
+    nest_path = matplotlib.path.Path(nest_vertices)
+    position_df["in_nest"] = nest_path.contains_points(position_df[[xcol, ycol]])
+    return position_df["in_nest"]
```
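A tiny standalone usage sketch of the `compute_distance` helper from the diff above (toy data, not part of the PR; it just restates the helper so the snippet runs on its own):

```python
import numpy as np
import pandas as pd


def compute_distance(position_df, target, xcol="x", ycol="y"):
    """Euclidean distance from each (x, y) row to a 2-element target point."""
    assert len(target) == 2
    return np.sqrt(np.square(position_df[[xcol, ycol]] - target).sum(axis=1))


position_df = pd.DataFrame({"x": [0.0, 3.0], "y": [0.0, 4.0]})
print(compute_distance(position_df, (0.0, 0.0)).tolist())  # [0.0, 5.0]
```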
There are a few issues in this code block.

First, the `PoseIdentity` class has an attribute `identity_likelihood` which is of type `longblob`. This might not be the best choice for storing likelihood values as it can lead to performance issues when querying or processing the data. It would be better to use a more suitable datatype like `float`.

Second, the `make` method in the `SLEAPTracking` class is quite long and complex. It's doing a lot of different things, from loading data, finding config files, getting bodyparts and classes, to ingesting parts and classes. This violates the Single Responsibility Principle (SRP) and makes the method hard to understand, test, and maintain. It would be better to break this method down into smaller, more manageable methods, each with a single responsibility.

Third, the `is_position_in_patch` and `is_position_in_nest` functions return a new column added to the input dataframe. This can lead to unexpected side effects if the caller doesn't realize that the input dataframe is being modified. It would be better to return a new dataframe instead of modifying the input dataframe.

Here are the suggested changes:

```python
class PoseIdentity(dj.Part):
    definition = """
    -> master
    identity_idx: smallint
```
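To illustrate the third point, here is a minimal sketch of a non-mutating variant of `is_position_in_nest` (an editor illustration, not the bot's truncated suggestion; it assumes the `lab` module and column names used in the diff above):

```python
import matplotlib.path
import pandas as pd

from aeon.dj_pipeline import lab  # assumed: the pipeline module referenced in the diff above


def is_position_in_nest(position_df, nest_key, xcol="x", ycol="y") -> pd.Series:
    """Return a boolean Series aligned to position_df.index, without modifying the input."""
    nest_vertices = list(zip(*(lab.ArenaNest.Vertex & nest_key).fetch("vertex_x", "vertex_y")))
    nest_path = matplotlib.path.Path(nest_vertices)
    in_nest = nest_path.contains_points(position_df[[xcol, ycol]].to_numpy())
    return pd.Series(in_nest, index=position_df.index, name="in_nest")
```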
fix first chunk starting before epoch start & disable automatic BlockDetection
Files selected (5)
- aeon/dj_pipeline/__init__.py (2)
- aeon/dj_pipeline/acquisition.py (12)
- aeon/dj_pipeline/analysis/block_analysis.py (1)
- aeon/dj_pipeline/populate/worker.py (1)
- aeon/dj_pipeline/utils/load_metadata.py (1)
Files not summarized due to errors (3)
- aeon/dj_pipeline/acquisition.py (diff tokens exceeds limit)
- aeon/dj_pipeline/analysis/block_analysis.py (diff tokens exceeds limit)
- aeon/dj_pipeline/utils/load_metadata.py (diff tokens exceeds limit)
Files skipped from review due to trivial changes (2)
- aeon/dj_pipeline/analysis/block_analysis.py (diff too large)
- aeon/dj_pipeline/utils/load_metadata.py (diff too large)
Review comments generated (12)
- Review: 6
- LGTM: 6
```diff
 repository_config = dj.config["custom"].get("repository_config", _default_repository_config)


-def get_schema_name(name):
+def get_schema_name(name) -> str:
     """Return a schema name."""
     return db_prefix + name


-def dict_to_uuid(key):
-    """
-    Given a dictionary `key`, returns a hash string as UUID
-    """
+def dict_to_uuid(key) -> uuid.UUID:
+    """Given a dictionary `key`, returns a hash string as UUID."""
     hashed = hashlib.md5()
     for k, v in sorted(key.items()):
         hashed.update(str(k).encode())
         hashed.update(str(v).encode())
     return uuid.UUID(hex=hashed.hexdigest())


 def fetch_stream(query, drop_pk=True):
     """
     Provided a query containing data from a Stream table,
     fetch and aggregate the data into one DataFrame indexed by "time"
     """
     df = (query & "sample_count > 0").fetch(format="frame").reset_index()
     cols2explode = [
         c for c in query.heading.secondary_attributes if query.heading.attributes[c].type == "longblob"
     ]
     df = df.explode(column=cols2explode)
     cols2drop = ["sample_count"] + (query.primary_key if drop_pk else [])
     df.drop(columns=cols2drop, inplace=True, errors="ignore")
     df.rename(columns={"timestamps": "time"}, inplace=True)
     df.set_index("time", inplace=True)
     df.sort_index(inplace=True)
     return df


 try:
     from . import streams
 except ImportError:
     try:
         from .utils import streams_maker

         streams = dj.VirtualModule("streams", streams_maker.schema_name)
     except:
         pass
```
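As a quick illustration of the `dict_to_uuid` helper from the hunk above, restated standalone so it runs on its own (the example key values are made up):

```python
import hashlib
import uuid


def dict_to_uuid(key) -> uuid.UUID:
    """Given a dictionary `key`, returns a hash string as UUID."""
    hashed = hashlib.md5()
    for k, v in sorted(key.items()):
        hashed.update(str(k).encode())
        hashed.update(str(v).encode())
    return uuid.UUID(hex=hashed.hexdigest())


# Deterministic: the same dict always maps to the same UUID.
print(dict_to_uuid({"experiment_name": "exp0.2-r0", "tracking_paramset_id": 0}))
```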
The `fetch_stream` function could be optimized for better performance. Currently, the function fetches all data from the query and then filters out rows where "sample_count" is greater than 0. This could potentially lead to unnecessary memory usage if the dataset is large. It would be more efficient to filter the data at the database level before fetching it.

```diff
- df = (query & "sample_count > 0").fetch(format="frame").reset_index()
+ df = query.fetch("sample_count > 0", format="frame").reset_index()
```

In the try-except block at the end of this hunk, there's a bare except clause which is not a good practice as it can catch unexpected errors and make debugging difficult. It's recommended to specify the exception type you're expecting to catch. If you're unsure about the exact exception type, you can use `Exception` as a catch-all, but it's still better to specify the exact exceptions whenever possible.

```diff
- except:
+ except ImportError:
      pass
```

Also, it's generally not a good idea to just `pass` on an exception without logging it or handling it in some way. This can lead to silent failures that are hard to debug. Consider adding some logging or error handling here.

```diff
- except ImportError:
+ except ImportError as e:
+     print(f"An error occurred while importing: {e}")
      pass
```

Lastly, the function `dict_to_uuid` does not handle the case when the key values are not string. The `encode()` method will raise an `AttributeError` if the value is not a string. You should add a check to ensure that the values are strings before calling `encode()`.

```diff
- hashed.update(str(v).encode())
+ hashed.update(str(v).encode() if isinstance(v, str) else str(v))
```
"exp0.2-r0": "CameraTop", | ||
} | ||
|
||
_device_schema_mapping = { | ||
"exp0.1-r0": aeon_schema.exp01, | ||
"social0-r1": aeon_schema.exp01, | ||
"exp0.2-r0": aeon_schema.exp02, | ||
} | ||
|
||
|
||
# ------------------- Type Lookup ------------------------ | ||
|
```diff
@@ -81,7 +84,7 @@
     directory_type: varchar(16)
     """

-    contents = zip(["raw", "preprocessing", "analysis", "quality-control"])
+    contents = zip(["raw", "processed", "qc"])
```
```diff
             epoch_start = datetime.datetime.strptime(epoch_dir.name, "%Y-%m-%dT%H-%M-%S")
             # --- insert to Epoch ---
             epoch_key = {"experiment_name": experiment_name, "epoch_start": epoch_start}

             # skip over those already ingested
             if cls & epoch_key or epoch_key in epoch_list:
                 continue

             epoch_config, metadata_yml_filepath = None, None
             if experiment_name != "exp0.1-r0":
                 metadata_yml_filepath = epoch_dir / "Metadata.yml"
                 if metadata_yml_filepath.exists():
                     epoch_config = extract_epoch_metadata(
                         experiment_name, metadata_yml_filepath
                     )

                     metadata_yml_filepath = epoch_config["metadata_file_path"]

-            _, directory, repo_path = _match_experiment_directory(
-                experiment_name,
-                epoch_config["metadata_file_path"],
-                raw_data_dirs,
-            )
-            epoch_config = {
-                **epoch_config,
-                **directory,
-                "metadata_file_path": epoch_config["metadata_file_path"]
-                .relative_to(repo_path)
-                .as_posix(),
-            }
+            raw_data_dir, directory, _ = _match_experiment_directory(
+                experiment_name,
+                epoch_dir,
+                raw_data_dirs,
+            )

             # find previous epoch end-time
             previous_epoch_end = None
             previous_epoch_key = None
             if i > 0:
                 previous_chunk = all_chunks.iloc[i - 1]
                 previous_chunk_path = pathlib.Path(previous_chunk.path)
-                previous_epoch_dir = pathlib.Path(
-                    previous_chunk_path.as_posix().split(device_name)[0]
-                )
+                previous_epoch_dir = pathlib.Path(previous_chunk_path.as_posix().split(device_name)[0])
                 previous_epoch_start = datetime.datetime.strptime(
                     previous_epoch_dir.name, "%Y-%m-%dT%H-%M-%S"
                 )
-                previous_chunk_end = previous_chunk.name + datetime.timedelta(
-                    hours=io_api.CHUNK_DURATION
-                )
+                previous_chunk_end = previous_chunk.name + datetime.timedelta(hours=io_api.CHUNK_DURATION)
                 previous_epoch_end = min(previous_chunk_end, epoch_start)
                 previous_epoch_key = {
                     "experiment_name": experiment_name,
                     "epoch_start": previous_epoch_start,
                 }

             with cls.connection.transaction:
-                cls.insert1(epoch_key)
-                if previous_epoch_end and not (
-                    EpochEnd
-                    & {
-                        "experiment_name": experiment_name,
-                        "epoch_start": previous_epoch_start,
-                    }
+                # insert new epoch
+                cls.insert1(
+                    {**epoch_key, **directory, "epoch_dir": epoch_dir.relative_to(raw_data_dir).as_posix()}
                 )
+                epoch_list.append(epoch_key)

+                # update previous epoch
+                if (
+                    previous_epoch_key
+                    and (cls & previous_epoch_key)
+                    and not (EpochEnd & previous_epoch_key)
                 ):
                     # insert end-time for previous epoch
                     EpochEnd.insert1(
                         {
```
There's a potential issue with the datetime parsing at line 193. If the format of the date string in `epoch_dir.name` changes, this could raise a ValueError. It would be safer to wrap this in a try-except block to handle potential errors.

```diff
- epoch_start = datetime.datetime.strptime(epoch_dir.name, "%Y-%m-%dT%H-%M-%S")
+ try:
+     epoch_start = datetime.datetime.strptime(epoch_dir.name, "%Y-%m-%dT%H-%M-%S")
+ except ValueError:
+     logger.error(f"Invalid date format in {epoch_dir.name}")
+     continue
```
```python
@schema
class EpochConfig(dj.Imported):
    definition = """
    -> Epoch
    """

    class Meta(dj.Part):
        definition = """ # Metadata for the configuration of a given epoch
        -> master
        ---
        bonsai_workflow: varchar(36)
        commit: varchar(64)   # e.g. git commit hash of aeon_experiment used to generated this particular epoch
        source='': varchar(16)  # e.g. aeon_experiment or aeon_acquisition (or others)
        metadata: longblob
        metadata_file_path: varchar(255)  # path of the file, relative to the experiment repository
        """

    class DeviceType(dj.Part):
        definition = """ # Device type(s) used in a particular acquisition epoch
        -> master
        device_type: varchar(36)
        """

    class ActiveRegion(dj.Part):
        definition = """
        -> master
        region_name: varchar(36)
        ---
        region_data: longblob
        """

    def make(self, key):
        from aeon.dj_pipeline.utils import streams_maker
        from aeon.dj_pipeline.utils.load_metadata import (
            extract_epoch_config,
            ingest_epoch_metadata,
            insert_device_types,
        )

        experiment_name = key["experiment_name"]
        devices_schema = getattr(
            aeon_schemas,
            (Experiment.DevicesSchema & {"experiment_name": experiment_name}).fetch1("devices_schema_name"),
        )

        dir_type, epoch_dir = (Epoch & key).fetch1("directory_type", "epoch_dir")
        data_dir = Experiment.get_data_directory(key, dir_type)
        metadata_yml_filepath = data_dir / epoch_dir / "Metadata.yml"

        epoch_config = extract_epoch_config(experiment_name, devices_schema, metadata_yml_filepath)
        epoch_config = {
            **epoch_config,
            "metadata_file_path": metadata_yml_filepath.relative_to(data_dir).as_posix(),
        }

        # Insert new entries for streams.DeviceType, streams.Device.
        insert_device_types(
            devices_schema,
            metadata_yml_filepath,
        )
        # Define and instantiate new devices/stream tables under `streams` schema
        streams_maker.main()
        # Insert devices' installation/removal/settings
        epoch_device_types = ingest_epoch_metadata(experiment_name, devices_schema, metadata_yml_filepath)

        self.insert1(key)
        self.Meta.insert1(epoch_config)
        self.DeviceType.insert(key | {"device_type": n} for n in epoch_device_types or {})
        with metadata_yml_filepath.open("r") as f:
            metadata = json.load(f)
        self.ActiveRegion.insert(
            {**key, "region_name": k, "region_data": v} for k, v in metadata["ActiveRegion"].items()
        )
```
In the `make` method, there's a potential issue at line 318 where you're assuming that the `Metadata.yml` file exists. This could raise a FileNotFoundError if the file is not found. It would be safer to check if the file exists before trying to open it.

```diff
- metadata_yml_filepath = data_dir / epoch_dir / "Metadata.yml"
+ metadata_yml_filepath = data_dir / epoch_dir / "Metadata.yml"
+ if not metadata_yml_filepath.exists():
+     logger.error(f"Metadata file not found at {metadata_yml_filepath}")
+     return
```
aeon/dj_pipeline/populate/worker.py (Outdated)
"streams_worker", | ||
"WorkerLog", | ||
"ErrorLog", | ||
"logger", | ||
"AutomatedExperimentIngestion", | ||
] | ||
|
||
# ---- Some constants ---- | ||
logger = dj.logger | ||
worker_schema_name = db_prefix + "worker" | ||
|
||
# ---- Manage experiments for automated ingestion ---- | ||
|
||
schema = dj.Schema(worker_schema_name) | ||
|
||
|
||
@schema | ||
class AutomatedExperimentIngestion(dj.Manual): | ||
definition = """ # experiments to undergo automated ingestion | ||
-> acquisition.Experiment | ||
""" | ||
|
||
|
||
def ingest_epochs_chunks(): | ||
"""Ingest epochs and chunks for experiments specified in AutomatedExperimentIngestion.""" | ||
experiment_names = AutomatedExperimentIngestion.fetch("experiment_name") | ||
for experiment_name in experiment_names: | ||
acquisition.Epoch.ingest_epochs(experiment_name) | ||
acquisition.Chunk.ingest_chunks(experiment_name) | ||
|
||
|
||
def ingest_environment_visits(): | ||
"""Extract and insert complete visits for experiments specified in AutomatedExperimentIngestion.""" | ||
experiment_names = AutomatedExperimentIngestion.fetch("experiment_name") | ||
# analysis.ingest_environment_visits(experiment_names) | ||
pass | ||
|
||
|
||
# ---- Define worker(s) ---- | ||
# configure a worker to process `acquisition`-related tasks | ||
acquisition_worker = DataJointWorker( | ||
"acquisition_worker", | ||
worker_schema_name=worker_schema_name, | ||
db_prefix=db_prefix, | ||
max_idled_cycle=6, | ||
sleep_duration=1200, | ||
) | ||
acquisition_worker(ingest_epochs_chunks) | ||
acquisition_worker(acquisition.EpochConfig) | ||
acquisition_worker(acquisition.Environment) | ||
# acquisition_worker(ingest_environment_visits) | ||
# acquisition_worker(block_analysis.BlockDetection) | ||
|
||
# configure a worker to handle pyrat sync | ||
pyrat_worker = DataJointWorker( | ||
"pyrat_worker", | ||
worker_schema_name=worker_schema_name, | ||
db_prefix=db_prefix, | ||
max_idled_cycle=400, | ||
sleep_duration=30, | ||
) | ||
|
||
pyrat_worker(subject.CreatePyratIngestionTask) | ||
pyrat_worker(subject.PyratIngestion) | ||
pyrat_worker(subject.SubjectDetail) | ||
pyrat_worker(subject.PyratCommentWeightProcedure) | ||
|
||
# configure a worker to ingest all data streams | ||
streams_worker = DataJointWorker( | ||
"streams_worker", | ||
worker_schema_name=worker_schema_name, | ||
db_prefix=db_prefix, | ||
max_idled_cycle=50, | ||
sleep_duration=60, | ||
autoclear_error_patterns=["%BlockAnalysis Not Ready%"], | ||
) | ||
|
||
for attr in vars(streams).values(): | ||
if is_djtable(attr, dj.user_tables.AutoPopulate): | ||
streams_worker(attr, max_calls=10) | ||
|
||
streams_worker(qc.CameraQC, max_calls=10) | ||
streams_worker(tracking.SLEAPTracking, max_calls=10) | ||
|
||
# configure a worker to run the analysis tables | ||
analysis_worker = DataJointWorker( | ||
"analysis_worker", | ||
worker_schema_name=worker_schema_name, | ||
db_prefix=db_prefix, | ||
max_idled_cycle=6, | ||
sleep_duration=1200, | ||
) | ||
|
||
analysis_worker(block_analysis.BlockAnalysis, max_calls=6) | ||
analysis_worker(block_analysis.BlockPlots, max_calls=6) | ||
|
||
|
||
def get_workflow_operation_overview(): | ||
from datajoint_utilities.dj_worker.utils import get_workflow_operation_overview | ||
return get_workflow_operation_overview(worker_schema_name=worker_schema_name, db_prefixes=[db_prefix]) |
The code looks well-structured and follows good practices. However, there are a few areas that could be improved for better maintainability and readability.

- The `ingest_epochs_chunks` and `ingest_environment_visits` functions (lines 39-52) both fetch the experiment names from `AutomatedExperimentIngestion`. This is a repeated operation and can be abstracted into a separate function to follow the DRY principle.
- The configuration of each worker (lines 56-111) is repetitive and could be abstracted into a function that takes parameters such as worker name, max idled cycle, sleep duration, etc. This would make the code more maintainable and easier to read.
- The `get_workflow_operation_overview` function (lines 113-115) imports a function within its body. It's generally recommended to keep all imports at the top of the file for better visibility and organization.

Here are the suggested changes:

```diff
+ def get_experiment_names():
+     """Fetch experiment names from AutomatedExperimentIngestion."""
+     return AutomatedExperimentIngestion.fetch("experiment_name")

- experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
+ experiment_names = get_experiment_names()
...
+ def configure_worker(name, schema_name, prefix, idled_cycle, sleep_duration, tasks):
+     """Configure a DataJointWorker with given parameters and tasks."""
+     worker = DataJointWorker(
+         name,
+         worker_schema_name=schema_name,
+         db_prefix=prefix,
+         max_idled_cycle=idled_cycle,
+         sleep_duration=sleep_duration,
+     )
+     for task in tasks:
+         worker(task)
+     return worker

- acquisition_worker = DataJointWorker(
-     "acquisition_worker",
-     worker_schema_name=worker_schema_name,
-     db_prefix=db_prefix,
-     max_idled_cycle=6,
-     sleep_duration=1200,
- )
- acquisition_worker(ingest_epochs_chunks)
- acquisition_worker(acquisition.EpochConfig)
- acquisition_worker(acquisition.Environment)
+ acquisition_tasks = [ingest_epochs_chunks, acquisition.EpochConfig, acquisition.Environment]
+ acquisition_worker = configure_worker("acquisition_worker", worker_schema_name, db_prefix, 6, 1200, acquisition_tasks)
...
- from datajoint_utilities.dj_worker.utils import get_workflow_operation_overview
+ import datajoint_utilities.dj_worker.utils as dj_utils

- return get_workflow_operation_overview(worker_schema_name=worker_schema_name, db_prefixes=[db_prefix])
+ return dj_utils.get_workflow_operation_overview(worker_schema_name=worker_schema_name, db_prefixes=[db_prefix])
```
tests/conftest.py (Outdated)
dj.config["custom"][ | ||
"database.prefix" | ||
] = f"u_{dj.config['database.user']}_testsuite_" | ||
return |
Can we drop the `return` statements in the fixtures that do not return anything?
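As a sketch of what is being asked for (based on the `dj_config` fixture quoted later in this review, with the trailing `return` simply dropped; a setup-only pytest fixture needs no explicit return):

```python
import pathlib

import datajoint as dj
import pytest


@pytest.fixture(autouse=True, scope="session")
def dj_config():
    """If dj_local_config exists, load it."""
    dj_config_fp = pathlib.Path("dj_local_conf.json")
    assert dj_config_fp.exists()
    dj.config.load(dj_config_fp)
    dj.config["safemode"] = False
    assert "custom" in dj.config
    dj.config["custom"]["database.prefix"] = f"u_{dj.config['database.user']}_testsuite_"
```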
"""Extract and insert complete visits for experiments specified in AutomatedExperimentIngestion.""" | ||
experiment_names = AutomatedExperimentIngestion.fetch("experiment_name") | ||
# analysis.ingest_environment_visits(experiment_names) | ||
pass |
```python
    pass
```
Improve block analysis - skip subjects with no position data, bugfix Block detection
Files selected (3)
- aeon/dj_pipeline/analysis/block_analysis.py (1)
- aeon/dj_pipeline/populate/worker.py (1)
- tests/conftest.py (1)
Files not summarized due to errors (1)
- aeon/dj_pipeline/analysis/block_analysis.py (diff tokens exceeds limit)
Files skipped from review due to trivial changes (1)
- aeon/dj_pipeline/analysis/block_analysis.py (diff too large)
Review comments generated (15)
- Review: 2
- LGTM: 13
```python
    from datajoint_utilities.dj_worker.utils import get_workflow_operation_overview
    return get_workflow_operation_overview(worker_schema_name=worker_schema_name, db_prefixes=[db_prefix])
```
The function `get_workflow_operation_overview()` is imported and used within another function of the same name. This could lead to confusion and potential errors in the future. Consider renaming the outer function to avoid naming conflicts.

```diff
- def get_workflow_operation_overview():
+ def retrieve_workflow_operation_overview():
      from datajoint_utilities.dj_worker.utils import get_workflow_operation_overview
      return get_workflow_operation_overview(worker_schema_name=worker_schema_name, db_prefixes=[db_prefix])
```
```python
@pytest.fixture(autouse=True, scope="session")
def dj_config():
    """If dj_local_config exists, load"""
    dj_config_fp = pathlib.Path("dj_local_conf.json")
    assert dj_config_fp.exists()
    dj.config.load(dj_config_fp)
    dj.config["safemode"] = False
    assert "custom" in dj.config
    dj.config["custom"][
        "database.prefix"
    ] = f"u_{dj.config['database.user']}_testsuite_"
```
The `dj_config` fixture is loading a local configuration file without any error handling. If the file does not exist or is corrupted, the test suite will fail with an unhandled exception. It would be better to add some error handling around this operation.

```diff
- dj_config_fp = pathlib.Path("dj_local_conf.json")
- assert dj_config_fp.exists()
- dj.config.load(dj_config_fp)
+ try:
+     dj_config_fp = pathlib.Path("dj_local_conf.json")
+     assert dj_config_fp.exists(), "Local config file does not exist."
+     dj.config.load(dj_config_fp)
+ except Exception as e:
+     raise Exception(f"Failed to load local config: {e}")
```
BlockSubjectPatch analysis, patch preference plot and SciViz update
Files selected (3)
- aeon/analysis/block_plotting.py (1)
- aeon/dj_pipeline/analysis/block_analysis.py (1)
- aeon/dj_pipeline/populate/worker.py (1)
Files ignored due to filter (1)
- aeon/dj_pipeline/webapps/sciviz/specsheet.yaml
Files not summarized due to errors (1)
- aeon/dj_pipeline/analysis/block_analysis.py (diff tokens exceeds limit)
Files skipped from review due to trivial changes (1)
- aeon/dj_pipeline/analysis/block_analysis.py (diff too large)
Review comments generated (2)
- Review: 1
- LGTM: 1
"WorkerLog", | ||
"ErrorLog", | ||
"logger", | ||
"AutomatedExperimentIngestion", | ||
] | ||
|
||
# ---- Some constants ---- | ||
logger = dj.logger | ||
worker_schema_name = db_prefix + "worker" | ||
|
||
# ---- Manage experiments for automated ingestion ---- | ||
|
||
schema = dj.Schema(worker_schema_name) | ||
|
||
|
||
@schema | ||
class AutomatedExperimentIngestion(dj.Manual): | ||
definition = """ # experiments to undergo automated ingestion | ||
-> acquisition.Experiment | ||
""" | ||
|
||
|
||
def ingest_epochs_chunks(): | ||
"""Ingest epochs and chunks for experiments specified in AutomatedExperimentIngestion.""" | ||
experiment_names = AutomatedExperimentIngestion.fetch("experiment_name") | ||
for experiment_name in experiment_names: | ||
acquisition.Epoch.ingest_epochs(experiment_name) | ||
acquisition.Chunk.ingest_chunks(experiment_name) | ||
|
||
|
||
def ingest_environment_visits(): | ||
"""Extract and insert complete visits for experiments specified in AutomatedExperimentIngestion.""" | ||
experiment_names = AutomatedExperimentIngestion.fetch("experiment_name") | ||
# analysis.ingest_environment_visits(experiment_names) | ||
pass | ||
|
||
|
||
# ---- Define worker(s) ---- | ||
# configure a worker to process `acquisition`-related tasks | ||
acquisition_worker = DataJointWorker( | ||
"acquisition_worker", | ||
worker_schema_name=worker_schema_name, | ||
db_prefix=db_prefix, | ||
max_idled_cycle=6, | ||
sleep_duration=1200, | ||
) | ||
acquisition_worker(ingest_epochs_chunks) | ||
acquisition_worker(acquisition.EpochConfig) | ||
acquisition_worker(acquisition.Environment) | ||
# acquisition_worker(ingest_environment_visits) | ||
acquisition_worker(block_analysis.BlockDetection) | ||
|
||
# configure a worker to handle pyrat sync | ||
pyrat_worker = DataJointWorker( | ||
"pyrat_worker", | ||
worker_schema_name=worker_schema_name, | ||
db_prefix=db_prefix, | ||
max_idled_cycle=400, | ||
sleep_duration=30, | ||
) | ||
|
||
pyrat_worker(subject.CreatePyratIngestionTask) | ||
pyrat_worker(subject.PyratIngestion) | ||
pyrat_worker(subject.SubjectDetail) | ||
pyrat_worker(subject.PyratCommentWeightProcedure) | ||
|
||
# configure a worker to ingest all data streams | ||
streams_worker = DataJointWorker( | ||
"streams_worker", | ||
worker_schema_name=worker_schema_name, | ||
db_prefix=db_prefix, | ||
max_idled_cycle=50, | ||
sleep_duration=60, | ||
autoclear_error_patterns=["%BlockAnalysis Not Ready%"], | ||
) | ||
|
||
for attr in vars(streams).values(): | ||
if is_djtable(attr, dj.user_tables.AutoPopulate): | ||
streams_worker(attr, max_calls=10) | ||
|
||
streams_worker(qc.CameraQC, max_calls=10) | ||
streams_worker(tracking.SLEAPTracking, max_calls=10) | ||
|
||
# configure a worker to run the analysis tables | ||
analysis_worker = DataJointWorker( | ||
"analysis_worker", | ||
worker_schema_name=worker_schema_name, | ||
db_prefix=db_prefix, | ||
max_idled_cycle=6, | ||
sleep_duration=1200, | ||
) | ||
|
||
analysis_worker(block_analysis.BlockAnalysis, max_calls=6) | ||
analysis_worker(block_analysis.BlockPlots, max_calls=6) | ||
analysis_worker(block_analysis.BlockSubjectAnalysis, max_calls=6) | ||
analysis_worker(block_analysis.BlockSubjectPlots, max_calls=6) | ||
|
||
def get_workflow_operation_overview(): | ||
from datajoint_utilities.dj_worker.utils import get_workflow_operation_overview | ||
return get_workflow_operation_overview(worker_schema_name=worker_schema_name, db_prefixes=[db_prefix]) |
The code looks well-structured and follows good practices. However, there are a few areas that could be improved for better maintainability and error handling.

- Error Handling: There is no exception handling in the `ingest_epochs_chunks` and `ingest_environment_visits` functions. If an error occurs during the ingestion process, it will stop the entire process. It would be better to handle exceptions so that the process can continue with the next experiment if one fails.

```python
def ingest_epochs_chunks():
    """Ingest epochs and chunks for experiments specified in AutomatedExperimentIngestion."""
    experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
    for experiment_name in experiment_names:
        try:
            acquisition.Epoch.ingest_epochs(experiment_name)
            acquisition.Chunk.ingest_chunks(experiment_name)
        except Exception as e:
            logger.error(f"Error ingesting epochs and chunks for {experiment_name}: {e}")


def ingest_environment_visits():
    """Extract and insert complete visits for experiments specified in AutomatedExperimentIngestion."""
    experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
    for experiment_name in experiment_names:
        try:
            # analysis.ingest_environment_visits(experiment_names)
            pass
        except Exception as e:
            logger.error(f"Error ingesting environment visits for {experiment_name}: {e}")
```

- Maintainability: The worker configuration is repeated multiple times with different parameters. This could be refactored into a function to reduce redundancy and improve maintainability.

```python
def configure_worker(name, max_idled_cycle, sleep_duration):
    return DataJointWorker(
        name,
        worker_schema_name=worker_schema_name,
        db_prefix=db_prefix,
        max_idled_cycle=max_idled_cycle,
        sleep_duration=sleep_duration,
    )


acquisition_worker = configure_worker("acquisition_worker", 6, 1200)
pyrat_worker = configure_worker("pyrat_worker", 400, 30)
streams_worker = configure_worker("streams_worker", 50, 60)
analysis_worker = configure_worker("analysis_worker", 6, 1200)
```

- Modularity: The `ingest_epochs_chunks` and `ingest_environment_visits` functions are defined in the global scope. It would be better to encapsulate them within a class or module for better modularity.

- Performance: In the `for attr in vars(streams).values():` loop, the `is_djtable` function is called for every attribute of `streams`. If `streams` has many attributes, this could be inefficient. Consider filtering the attributes first before calling `is_djtable`.

```python
auto_populate_attrs = [attr for attr in vars(streams).values() if isinstance(attr, dj.user_tables.AutoPopulate)]
for attr in auto_populate_attrs:
    streams_worker(attr, max_calls=10)
```

- Best Practices: The `pass` statement in the `ingest_environment_visits` function indicates that this function is not yet implemented. It's best to raise a `NotImplementedError` to make it clear that this function needs to be implemented.

```python
def ingest_environment_visits():
    """Extract and insert complete visits for experiments specified in AutomatedExperimentIngestion."""
    experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
    for experiment_name in experiment_names:
        raise NotImplementedError("This function needs to be implemented.")
```
Major updates

- `social0.1` and `social0.2` based on the new Device/Stream object model

Summary by CodeRabbit