Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datajoint pipeline PR into main branch #351

Merged
merged 689 commits into from
Jun 12, 2024
Merged

Datajoint pipeline PR into main branch #351

merged 689 commits into from
Jun 12, 2024

Conversation

ttngu207
Copy link
Contributor

@ttngu207 ttngu207 commented Apr 10, 2024

Major updates

  • New devices schemas for social0.1 and social0.2 based on the new Device/Stream object model
    • new Pose stream class
  • Programmatic creation of new DJ tables for new data streams
  • Automated ingestion of all stream tables
    • worker configurations & CICD
  • Automated pulling and syncing of colony information from Pyrat
  • Block and Block analyses
  • Updated SciViz for experiments, blocks visualizations

Summary by CodeRabbit

  • New Feature: Introduced new schemas for devices, automated table creation, ingestion of stream tables, syncing colony information, block analyses, and SciViz updates.
  • Refactor: Updated data pipeline architecture, core tables, and operating procedures. Enhanced code readability through refactoring and improved error messages.
  • Bug Fix: Fixed issues with reindexing process in aeon/io/api.py by handling missing values.
  • Documentation: Added detailed descriptions of core tables and their relationships, instructions on auto ingestion and processing, and diagrams for pipeline overview and analysis.
  • Chore: Implemented a bash script for automated ingestion and management of workers using Docker-compose.
  • Style: Improved code style by adding type hints, docstrings, and following best practices in aeon/dj_pipeline/utils/paths.py.
  • Test: Validated schema migration process in aeon/dj_pipeline/scripts/clone_and_freeze_exp02.py.

JaerongA and others added 30 commits October 27, 2023 10:17
modify `subject` schema to align with pyrat - added pyrat sync routine
Minor sciviz fix & add pyrat worker to docker-compose
Co-authored-by: JaerongA <jaerong.ahn@datajoint.com>
Added `lab_id` and `available` to pyrat subjects
update(tracking): raise error if no SLEAP data found
Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

Commits Files that changed from the base of the PR and between 91f4490 and df1cc61 commits.
Files selected (1)
  • aeon/dj_pipeline/tracking.py (6)
Files not summarized due to errors (1)
  • aeon/dj_pipeline/tracking.py (diff tokens exceeds limit)
Review comments generated (6)
  • Review: 1
  • LGTM: 5

Tips

Chat with Image description CodeRabbit Bot (@coderabbitai)

  • Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
  • Invite the bot into a review comment chain by tagging @coderabbitai in a reply.

Code suggestions

  • The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
  • You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.

Pausing incremental reviews

  • Add @coderabbitai: ignore anywhere in the PR description to pause further reviews from the bot.

Comment on lines +106 to +263
}
)
if part == anchor_part:
class_likelihood = part_position.class_likelihood.values
pose_identity_entries.append(
{
**key,
"object_id": obj_id,
"timestamps": obj_position.index.to_pydatetime(),
"position_x": obj_position.x.values,
"position_y": obj_position.y.values,
"area": obj_position.area.values,
"identity_idx": class_idx,
"identity_name": class_names[class_idx],
"anchor_part": anchor_part,
"identity_likelihood": class_likelihood,
}
)

self.insert1(key)
self.Object.insert(object_positions)

@classmethod
def get_object_position(
cls,
experiment_name,
object_id,
start,
end,
camera_name="FrameTop",
tracking_paramset_id=0,
in_meter=False,
):
table = (
cls.Object * acquisition.Chunk.proj("chunk_end")
& {"experiment_name": experiment_name}
& {"tracking_paramset_id": tracking_paramset_id}
& (acquisition.ExperimentCamera & {"camera_description": camera_name})
)

return _get_position(
table,
object_attr="object_id",
object_name=object_id,
start_attr="chunk_start",
end_attr="chunk_end",
start=start,
end=end,
fetch_attrs=["timestamps", "position_x", "position_y", "area"],
attrs_to_scale=["position_x", "position_y"],
scale_factor=pixel_scale if in_meter else 1,
)
self.PoseIdentity.insert(pose_identity_entries)
self.Part.insert(part_entries)


# ---------- HELPER ------------------


def compute_distance(position_df, target):
def compute_distance(position_df, target, xcol="x", ycol="y"):
assert len(target) == 2
return np.sqrt(np.square(position_df[["x", "y"]] - target).sum(axis=1))
return np.sqrt(np.square(position_df[[xcol, ycol]] - target).sum(axis=1))


def is_in_patch(
def is_position_in_patch(
position_df, patch_position, wheel_distance_travelled, patch_radius=0.2
):
) -> pd.Series:
distance_from_patch = compute_distance(position_df, patch_position)
in_patch = distance_from_patch < patch_radius
exit_patch = in_patch.astype(np.int8).diff() < 0
in_wheel = (wheel_distance_travelled.diff().rolling("1s").sum() > 1).reindex(
position_df.index, method="pad"
)
time_slice = exit_patch.cumsum()
return in_wheel.groupby(time_slice).apply(lambda x: x.cumsum()) > 0
return in_patch & (in_wheel.groupby(time_slice).apply(lambda x: x.cumsum()) > 0)


def is_position_in_nest(position_df, nest_key):
"""
Given the session key and the position data - arrays of x and y
return an array of boolean indicating whether or not a position is inside the nest
def is_position_in_nest(position_df, nest_key, xcol="x", ycol="y") -> pd.Series:
"""Given the session key and the position data - arrays of x and y
return an array of boolean indicating whether or not a position is inside the nest.
"""
nest_vertices = list(
zip(*(lab.ArenaNest.Vertex & nest_key).fetch("vertex_x", "vertex_y"))
)
nest_path = path.Path(nest_vertices)

return nest_path.contains_points(position_df[["x", "y"]])
nest_vertices = list(zip(*(lab.ArenaNest.Vertex & nest_key).fetch("vertex_x", "vertex_y")))
nest_path = matplotlib.path.Path(nest_vertices)
position_df["in_nest"] = nest_path.contains_points(position_df[[xcol, ycol]])
return position_df["in_nest"]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

There are a few issues in this code block.

First, the PoseIdentity class has an attribute identity_likelihood which is of type longblob. This might not be the best choice for storing likelihood values as it can lead to performance issues when querying or processing the data. It would be better to use a more suitable datatype like float.

Second, the make method in the SLEAPTracking class is quite long and complex. It's doing a lot of different things, from loading data, finding config files, getting bodyparts and classes, to ingesting parts and classes. This violates the Single Responsibility Principle (SRP) and makes the method hard to understand, test, and maintain. It would be better to break this method down into smaller, more manageable methods each with a single responsibility.

Third, the is_position_in_patch and is_position_in_nest functions return a new column added to the input dataframe. This can lead to unexpected side effects if the caller doesn't realize that the input dataframe is being modified. It would be better to return a new dataframe instead of modifying the input dataframe.

Here are the suggested changes:

    class PoseIdentity(dj.Part):
        definition = """
         -> master
        identity_idx:           smallint


<!-- This is an auto-generated comment by OSS CodeRabbit -->

Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

Commits Files that changed from the base of the PR and between df1cc61 and 5276cc1 commits.
Files selected (5)
  • aeon/dj_pipeline/init.py (2)
  • aeon/dj_pipeline/acquisition.py (12)
  • aeon/dj_pipeline/analysis/block_analysis.py (1)
  • aeon/dj_pipeline/populate/worker.py (1)
  • aeon/dj_pipeline/utils/load_metadata.py (1)
Files not summarized due to errors (3)
  • aeon/dj_pipeline/acquisition.py (diff tokens exceeds limit)
  • aeon/dj_pipeline/analysis/block_analysis.py (diff tokens exceeds limit)
  • aeon/dj_pipeline/utils/load_metadata.py (diff tokens exceeds limit)
Files skipped from review due to trivial changes (2)
  • aeon/dj_pipeline/analysis/block_analysis.py (diff too large)
  • aeon/dj_pipeline/utils/load_metadata.py (diff too large)
Review comments generated (12)
  • Review: 6
  • LGTM: 6

Tips

Chat with Image description CodeRabbit Bot (@coderabbitai)

  • Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
  • Invite the bot into a review comment chain by tagging @coderabbitai in a reply.

Code suggestions

  • The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
  • You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.

Pausing incremental reviews

  • Add @coderabbitai: ignore anywhere in the PR description to pause further reviews from the bot.

Comment on lines +16 to +59
repository_config = dj.config["custom"].get("repository_config", _default_repository_config)


def get_schema_name(name):
def get_schema_name(name) -> str:
"""Return a schema name."""
return db_prefix + name


def dict_to_uuid(key):
"""
Given a dictionary `key`, returns a hash string as UUID
"""
def dict_to_uuid(key) -> uuid.UUID:
"""Given a dictionary `key`, returns a hash string as UUID."""
hashed = hashlib.md5()
for k, v in sorted(key.items()):
hashed.update(str(k).encode())
hashed.update(str(v).encode())
return uuid.UUID(hex=hashed.hexdigest())


def fetch_stream(query, drop_pk=True):
"""
Provided a query containing data from a Stream table,
fetch and aggregate the data into one DataFrame indexed by "time"
"""
df = (query & "sample_count > 0").fetch(format="frame").reset_index()
cols2explode = [
c for c in query.heading.secondary_attributes if query.heading.attributes[c].type == "longblob"
]
df = df.explode(column=cols2explode)
cols2drop = ["sample_count"] + (query.primary_key if drop_pk else [])
df.drop(columns=cols2drop, inplace=True, errors="ignore")
df.rename(columns={"timestamps": "time"}, inplace=True)
df.set_index("time", inplace=True)
df.sort_index(inplace=True)
return df


try:
from . import streams
except ImportError:
try:
from .utils import streams_maker

streams = dj.VirtualModule("streams", streams_maker.schema_name)
except:
pass

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

The fetch_stream function could be optimized for better performance. Currently, the function fetches all data from the query and then filters out rows where "sample_count" is greater than 0. This could potentially lead to unnecessary memory usage if the dataset is large. It would be more efficient to filter the data at the database level before fetching it.

- df = (query & "sample_count > 0").fetch(format="frame").reset_index()
+ df = query.fetch("sample_count > 0", format="frame").reset_index()

In the try-except block at the end of this hunk, there's a bare except clause which is not a good practice as it can catch unexpected errors and make debugging difficult. It's recommended to specify the exception type you're expecting to catch. If you're unsure about the exact exception type, you can use Exception as a catch-all, but it's still better to specify the exact exceptions whenever possible.

-     except:
+     except ImportError:
          pass

Also, it's generally not a good idea to just pass on an exception without logging it or handling it in some way. This can lead to silent failures that are hard to debug. Consider adding some logging or error handling here.

-     except ImportError:
+     except ImportError as e:
+         print(f"An error occurred while importing: {e}")
          pass

Lastly, the function dict_to_uuid does not handle the case when the key values are not string. The encode() method will raise an AttributeError if the value is not a string. You should add a check to ensure that the values are strings before calling encode().

-         hashed.update(str(v).encode())
+         hashed.update(str(v).encode() if isinstance(v, str) else str(v))

Comment on lines 25 to 30
"exp0.2-r0": "CameraTop",
}

_device_schema_mapping = {
"exp0.1-r0": aeon_schema.exp01,
"social0-r1": aeon_schema.exp01,
"exp0.2-r0": aeon_schema.exp02,
}


# ------------------- Type Lookup ------------------------

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

The removal of _device_schema_mapping might cause issues if it was used elsewhere in the code. Please ensure that this change doesn't break any dependencies.

@@ -81,7 +84,7 @@
directory_type: varchar(16)
"""

contents = zip(["raw", "preprocessing", "analysis", "quality-control"])
contents = zip(["raw", "processed", "qc"])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

The change in contents from ["raw", "preprocessing", "analysis", "quality-control"] to ["raw", "processed", "qc"] may affect other parts of the code that rely on these specific directory types. Ensure that all references to these directory types are updated accordingly.

Comment on lines +193 to 238
epoch_start = datetime.datetime.strptime(epoch_dir.name, "%Y-%m-%dT%H-%M-%S")
# --- insert to Epoch ---
epoch_key = {"experiment_name": experiment_name, "epoch_start": epoch_start}

# skip over those already ingested
if cls & epoch_key or epoch_key in epoch_list:
# skip over those already ingested
continue

epoch_config, metadata_yml_filepath = None, None
if experiment_name != "exp0.1-r0":
metadata_yml_filepath = epoch_dir / "Metadata.yml"
if metadata_yml_filepath.exists():
epoch_config = extract_epoch_metadata(
experiment_name, metadata_yml_filepath
)

metadata_yml_filepath = epoch_config["metadata_file_path"]

_, directory, repo_path = _match_experiment_directory(
experiment_name,
epoch_config["metadata_file_path"],
raw_data_dirs,
)
epoch_config = {
**epoch_config,
**directory,
"metadata_file_path": epoch_config["metadata_file_path"]
.relative_to(repo_path)
.as_posix(),
}
raw_data_dir, directory, _ = _match_experiment_directory(
experiment_name,
epoch_dir,
raw_data_dirs,
)

# find previous epoch end-time
previous_epoch_end = None
previous_epoch_key = None
if i > 0:
previous_chunk = all_chunks.iloc[i - 1]
previous_chunk_path = pathlib.Path(previous_chunk.path)
previous_epoch_dir = pathlib.Path(
previous_chunk_path.as_posix().split(device_name)[0]
)
previous_epoch_dir = pathlib.Path(previous_chunk_path.as_posix().split(device_name)[0])
previous_epoch_start = datetime.datetime.strptime(
previous_epoch_dir.name, "%Y-%m-%dT%H-%M-%S"
)
previous_chunk_end = previous_chunk.name + datetime.timedelta(
hours=io_api.CHUNK_DURATION
)
previous_chunk_end = previous_chunk.name + datetime.timedelta(hours=io_api.CHUNK_DURATION)
previous_epoch_end = min(previous_chunk_end, epoch_start)
previous_epoch_key = {
"experiment_name": experiment_name,
"epoch_start": previous_epoch_start,
}

with cls.connection.transaction:
cls.insert1(epoch_key)
if previous_epoch_end and not (
EpochEnd
& {
"experiment_name": experiment_name,
"epoch_start": previous_epoch_start,
}
# insert new epoch
cls.insert1(
{**epoch_key, **directory, "epoch_dir": epoch_dir.relative_to(raw_data_dir).as_posix()}
)
epoch_list.append(epoch_key)

# update previous epoch
if (
previous_epoch_key
and (cls & previous_epoch_key)
and not (EpochEnd & previous_epoch_key)
):
# insert end-time for previous epoch
EpochEnd.insert1(
{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

There's a potential issue with the datetime parsing at line 193. If the format of the date string in epoch_dir.name changes, this could raise a ValueError. It would be safer to wrap this in a try-except block to handle potential errors.

- epoch_start = datetime.datetime.strptime(epoch_dir.name, "%Y-%m-%dT%H-%M-%S")
+ try:
+     epoch_start = datetime.datetime.strptime(epoch_dir.name, "%Y-%m-%dT%H-%M-%S")
+ except ValueError:
+     logger.error(f"Invalid date format in {epoch_dir.name}")
+     continue

Comment on lines +271 to +345
@schema
class EpochConfig(dj.Imported):
definition = """
-> Epoch
"""

class Meta(dj.Part):
definition = """ # Metadata for the configuration of a given epoch
-> master
---
bonsai_workflow: varchar(36)
commit: varchar(64) # e.g. git commit hash of aeon_experiment used to generated this particular epoch
source='': varchar(16) # e.g. aeon_experiment or aeon_acquisition (or others)
metadata: longblob
metadata_file_path: varchar(255) # path of the file, relative to the experiment repository
"""

class DeviceType(dj.Part):
definition = """ # Device type(s) used in a particular acquisition epoch
-> master
device_type: varchar(36)
"""

class ActiveRegion(dj.Part):
definition = """
-> master
region_name: varchar(36)
---
region_data: longblob
"""

def make(self, key):
from aeon.dj_pipeline.utils import streams_maker
from aeon.dj_pipeline.utils.load_metadata import (
extract_epoch_config,
ingest_epoch_metadata,
insert_device_types,
)

experiment_name = key["experiment_name"]
devices_schema = getattr(
aeon_schemas,
(Experiment.DevicesSchema & {"experiment_name": experiment_name}).fetch1("devices_schema_name"),
)

dir_type, epoch_dir = (Epoch & key).fetch1("directory_type", "epoch_dir")
data_dir = Experiment.get_data_directory(key, dir_type)
metadata_yml_filepath = data_dir / epoch_dir / "Metadata.yml"

epoch_config = extract_epoch_config(experiment_name, devices_schema, metadata_yml_filepath)
epoch_config = {
**epoch_config,
"metadata_file_path": metadata_yml_filepath.relative_to(data_dir).as_posix(),
}

# Insert new entries for streams.DeviceType, streams.Device.
insert_device_types(
devices_schema,
metadata_yml_filepath,
)
# Define and instantiate new devices/stream tables under `streams` schema
streams_maker.main()
# Insert devices' installation/removal/settings
epoch_device_types = ingest_epoch_metadata(experiment_name, devices_schema, metadata_yml_filepath)

self.insert1(key)
self.Meta.insert1(epoch_config)
self.DeviceType.insert(key | {"device_type": n} for n in epoch_device_types or {})
with metadata_yml_filepath.open("r") as f:
metadata = json.load(f)
self.ActiveRegion.insert(
{**key, "region_name": k, "region_data": v} for k, v in metadata["ActiveRegion"].items()
)


Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

In the make method, there's a potential issue at line 318 where you're assuming that the Metadata.yml file exists. This could raise a FileNotFoundError if the file is not found. It would be safer to check if the file exists before trying to open it.

- metadata_yml_filepath = data_dir / epoch_dir / "Metadata.yml"
+ metadata_yml_filepath = data_dir / epoch_dir / "Metadata.yml"
+ if not metadata_yml_filepath.exists():
+     logger.error(f"Metadata file not found at {metadata_yml_filepath}")
+     return

Comment on lines 1 to 115
"streams_worker",
"WorkerLog",
"ErrorLog",
"logger",
"AutomatedExperimentIngestion",
]

# ---- Some constants ----
logger = dj.logger
worker_schema_name = db_prefix + "worker"

# ---- Manage experiments for automated ingestion ----

schema = dj.Schema(worker_schema_name)


@schema
class AutomatedExperimentIngestion(dj.Manual):
definition = """ # experiments to undergo automated ingestion
-> acquisition.Experiment
"""


def ingest_epochs_chunks():
"""Ingest epochs and chunks for experiments specified in AutomatedExperimentIngestion."""
experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
for experiment_name in experiment_names:
acquisition.Epoch.ingest_epochs(experiment_name)
acquisition.Chunk.ingest_chunks(experiment_name)


def ingest_environment_visits():
"""Extract and insert complete visits for experiments specified in AutomatedExperimentIngestion."""
experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
# analysis.ingest_environment_visits(experiment_names)
pass


# ---- Define worker(s) ----
# configure a worker to process `acquisition`-related tasks
acquisition_worker = DataJointWorker(
"acquisition_worker",
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
max_idled_cycle=6,
sleep_duration=1200,
)
acquisition_worker(ingest_epochs_chunks)
acquisition_worker(acquisition.EpochConfig)
acquisition_worker(acquisition.Environment)
# acquisition_worker(ingest_environment_visits)
# acquisition_worker(block_analysis.BlockDetection)

# configure a worker to handle pyrat sync
pyrat_worker = DataJointWorker(
"pyrat_worker",
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
max_idled_cycle=400,
sleep_duration=30,
)

pyrat_worker(subject.CreatePyratIngestionTask)
pyrat_worker(subject.PyratIngestion)
pyrat_worker(subject.SubjectDetail)
pyrat_worker(subject.PyratCommentWeightProcedure)

# configure a worker to ingest all data streams
streams_worker = DataJointWorker(
"streams_worker",
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
max_idled_cycle=50,
sleep_duration=60,
autoclear_error_patterns=["%BlockAnalysis Not Ready%"],
)

for attr in vars(streams).values():
if is_djtable(attr, dj.user_tables.AutoPopulate):
streams_worker(attr, max_calls=10)

streams_worker(qc.CameraQC, max_calls=10)
streams_worker(tracking.SLEAPTracking, max_calls=10)

# configure a worker to run the analysis tables
analysis_worker = DataJointWorker(
"analysis_worker",
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
max_idled_cycle=6,
sleep_duration=1200,
)

analysis_worker(block_analysis.BlockAnalysis, max_calls=6)
analysis_worker(block_analysis.BlockPlots, max_calls=6)


def get_workflow_operation_overview():
from datajoint_utilities.dj_worker.utils import get_workflow_operation_overview
return get_workflow_operation_overview(worker_schema_name=worker_schema_name, db_prefixes=[db_prefix])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

The code looks well-structured and follows good practices. However, there are a few areas that could be improved for better maintainability and readability.

  1. The ingest_epochs_chunks and ingest_environment_visits functions (lines 39-52) both fetch the experiment names from AutomatedExperimentIngestion. This is a repeated operation and can be abstracted into a separate function to follow the DRY principle.

  2. The configuration of each worker (lines 56-111) is repetitive and could be abstracted into a function that takes parameters such as worker name, max idled cycle, sleep duration, etc. This would make the code more maintainable and easier to read.

  3. The get_workflow_operation_overview function (lines 113-115) imports a function within its body. It's generally recommended to keep all imports at the top of the file for better visibility and organization.

Here are the suggested changes:

+ def get_experiment_names():
+     """Fetch experiment names from AutomatedExperimentIngestion."""
+     return AutomatedExperimentIngestion.fetch("experiment_name")

- experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
+ experiment_names = get_experiment_names()

...

+ def configure_worker(name, schema_name, prefix, idled_cycle, sleep_duration, tasks):
+     """Configure a DataJointWorker with given parameters and tasks."""
+     worker = DataJointWorker(
+         name,
+         worker_schema_name=schema_name,
+         db_prefix=prefix,
+         max_idled_cycle=idled_cycle,
+         sleep_duration=sleep_duration,
+     )
+     for task in tasks:
+         worker(task)
+     return worker

- acquisition_worker = DataJointWorker(
-     "acquisition_worker",
-     worker_schema_name=worker_schema_name,
-     db_prefix=db_prefix,
-     max_idled_cycle=6,
-     sleep_duration=1200,
- )
- acquisition_worker(ingest_epochs_chunks)
- acquisition_worker(acquisition.EpochConfig)
- acquisition_worker(acquisition.Environment)

+ acquisition_tasks = [ingest_epochs_chunks, acquisition.EpochConfig, acquisition.Environment]
+ acquisition_worker = configure_worker("acquisition_worker", worker_schema_name, db_prefix, 6, 1200, acquisition_tasks)

...

- from datajoint_utilities.dj_worker.utils import get_workflow_operation_overview
+ import datajoint_utilities.dj_worker.utils as dj_utils

- return get_workflow_operation_overview(worker_schema_name=worker_schema_name, db_prefixes=[db_prefix])
+ return dj_utils.get_workflow_operation_overview(worker_schema_name=worker_schema_name, db_prefixes=[db_prefix])

dj.config["custom"][
"database.prefix"
] = f"u_{dj.config['database.user']}_testsuite_"
return
Copy link
Contributor

@lochhh lochhh Apr 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we drop the return statements in the fixtures that do not return anything?

"""Extract and insert complete visits for experiments specified in AutomatedExperimentIngestion."""
experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
# analysis.ingest_environment_visits(experiment_names)
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pass

Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

Commits Files that changed from the base of the PR and between 5276cc1 and ab51258 commits.
Files selected (3)
  • aeon/dj_pipeline/analysis/block_analysis.py (1)
  • aeon/dj_pipeline/populate/worker.py (1)
  • tests/conftest.py (1)
Files not summarized due to errors (1)
  • aeon/dj_pipeline/analysis/block_analysis.py (diff tokens exceeds limit)
Files skipped from review due to trivial changes (1)
  • aeon/dj_pipeline/analysis/block_analysis.py (diff too large)
Review comments generated (15)
  • Review: 2
  • LGTM: 13

Tips

Chat with Image description CodeRabbit Bot (@coderabbitai)

  • Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
  • Invite the bot into a review comment chain by tagging @coderabbitai in a reply.

Code suggestions

  • The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
  • You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.

Pausing incremental reviews

  • Add @coderabbitai: ignore anywhere in the PR description to pause further reviews from the bot.

Comment on lines +114 to +115
from datajoint_utilities.dj_worker.utils import get_workflow_operation_overview
return get_workflow_operation_overview(worker_schema_name=worker_schema_name, db_prefixes=[db_prefix])
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

The function get_workflow_operation_overview() is imported and used within another function of the same name. This could lead to confusion and potential errors in the future. Consider renaming the outer function to avoid naming conflicts.

- def get_workflow_operation_overview():
+ def retrieve_workflow_operation_overview():
      from datajoint_utilities.dj_worker.utils import get_workflow_operation_overview
      return get_workflow_operation_overview(worker_schema_name=worker_schema_name, db_prefixes=[db_prefix])

Comment on lines +50 to +61
@pytest.fixture(autouse=True, scope="session")
def dj_config():
"""If dj_local_config exists, load"""
dj_config_fp = pathlib.Path("dj_local_conf.json")
assert dj_config_fp.exists()
dj.config.load(dj_config_fp)
dj.config["safemode"] = False
assert "custom" in dj.config
dj.config["custom"][
"database.prefix"
] = f"u_{dj.config['database.user']}_testsuite_"

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

The dj_config fixture is loading a local configuration file without any error handling. If the file does not exist or is corrupted, the test suite will fail with an unhandled exception. It would be better to add some error handling around this operation.

-    dj_config_fp = pathlib.Path("dj_local_conf.json")
-    assert dj_config_fp.exists()
-    dj.config.load(dj_config_fp)
+    try:
+        dj_config_fp = pathlib.Path("dj_local_conf.json")
+        assert dj_config_fp.exists(), "Local config file does not exist."
+        dj.config.load(dj_config_fp)
+    except Exception as e:
+        raise Exception(f"Failed to load local config: {e}")

@jkbhagatio jkbhagatio self-requested a review June 12, 2024 14:23
@jkbhagatio jkbhagatio merged commit 25cc4b7 into main Jun 12, 2024
6 of 8 checks passed
Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

Commits Files that changed from the base of the PR and between ab51258 and f9ae80a commits.
Files selected (3)
  • aeon/analysis/block_plotting.py (1)
  • aeon/dj_pipeline/analysis/block_analysis.py (1)
  • aeon/dj_pipeline/populate/worker.py (1)
Files ignored due to filter (1)
  • aeon/dj_pipeline/webapps/sciviz/specsheet.yaml
Files not summarized due to errors (1)
  • aeon/dj_pipeline/analysis/block_analysis.py (diff tokens exceeds limit)
Files skipped from review due to trivial changes (1)
  • aeon/dj_pipeline/analysis/block_analysis.py (diff too large)
Review comments generated (2)
  • Review: 1
  • LGTM: 1

Tips

Chat with Image description CodeRabbit Bot (@coderabbitai)

  • Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
  • Invite the bot into a review comment chain by tagging @coderabbitai in a reply.

Code suggestions

  • The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
  • You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.

Pausing incremental reviews

  • Add @coderabbitai: ignore anywhere in the PR description to pause further reviews from the bot.

Comment on lines +1 to +116
"WorkerLog",
"ErrorLog",
"logger",
"AutomatedExperimentIngestion",
]

# ---- Some constants ----
logger = dj.logger
worker_schema_name = db_prefix + "worker"

# ---- Manage experiments for automated ingestion ----

schema = dj.Schema(worker_schema_name)


@schema
class AutomatedExperimentIngestion(dj.Manual):
definition = """ # experiments to undergo automated ingestion
-> acquisition.Experiment
"""


def ingest_epochs_chunks():
"""Ingest epochs and chunks for experiments specified in AutomatedExperimentIngestion."""
experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
for experiment_name in experiment_names:
acquisition.Epoch.ingest_epochs(experiment_name)
acquisition.Chunk.ingest_chunks(experiment_name)


def ingest_environment_visits():
"""Extract and insert complete visits for experiments specified in AutomatedExperimentIngestion."""
experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
# analysis.ingest_environment_visits(experiment_names)
pass


# ---- Define worker(s) ----
# configure a worker to process `acquisition`-related tasks
acquisition_worker = DataJointWorker(
"acquisition_worker",
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
max_idled_cycle=6,
sleep_duration=1200,
)
acquisition_worker(ingest_epochs_chunks)
acquisition_worker(acquisition.EpochConfig)
acquisition_worker(acquisition.Environment)
# acquisition_worker(ingest_environment_visits)
acquisition_worker(block_analysis.BlockDetection)

# configure a worker to handle pyrat sync
pyrat_worker = DataJointWorker(
"pyrat_worker",
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
max_idled_cycle=400,
sleep_duration=30,
)

pyrat_worker(subject.CreatePyratIngestionTask)
pyrat_worker(subject.PyratIngestion)
pyrat_worker(subject.SubjectDetail)
pyrat_worker(subject.PyratCommentWeightProcedure)

# configure a worker to ingest all data streams
streams_worker = DataJointWorker(
"streams_worker",
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
max_idled_cycle=50,
sleep_duration=60,
autoclear_error_patterns=["%BlockAnalysis Not Ready%"],
)

for attr in vars(streams).values():
if is_djtable(attr, dj.user_tables.AutoPopulate):
streams_worker(attr, max_calls=10)

streams_worker(qc.CameraQC, max_calls=10)
streams_worker(tracking.SLEAPTracking, max_calls=10)

# configure a worker to run the analysis tables
analysis_worker = DataJointWorker(
"analysis_worker",
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
max_idled_cycle=6,
sleep_duration=1200,
)

analysis_worker(block_analysis.BlockAnalysis, max_calls=6)
analysis_worker(block_analysis.BlockPlots, max_calls=6)
analysis_worker(block_analysis.BlockSubjectAnalysis, max_calls=6)
analysis_worker(block_analysis.BlockSubjectPlots, max_calls=6)

def get_workflow_operation_overview():
from datajoint_utilities.dj_worker.utils import get_workflow_operation_overview
return get_workflow_operation_overview(worker_schema_name=worker_schema_name, db_prefixes=[db_prefix])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image description CodeRabbit

The code looks well-structured and follows good practices. However, there are a few areas that could be improved for better maintainability and error handling.

  1. Error Handling: There is no exception handling in the ingest_epochs_chunks and ingest_environment_visits functions. If an error occurs during the ingestion process, it will stop the entire process. It would be better to handle exceptions so that the process can continue with the next experiment if one fails.
def ingest_epochs_chunks():
    """Ingest epochs and chunks for experiments specified in AutomatedExperimentIngestion."""
    experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
    for experiment_name in experiment_names:
        try:
            acquisition.Epoch.ingest_epochs(experiment_name)
            acquisition.Chunk.ingest_chunks(experiment_name)
        except Exception as e:
            logger.error(f"Error ingesting epochs and chunks for {experiment_name}: {e}")

def ingest_environment_visits():
    """Extract and insert complete visits for experiments specified in AutomatedExperimentIngestion."""
    experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
    for experiment_name in experiment_names:
        try:
            # analysis.ingest_environment_visits(experiment_names)
            pass
        except Exception as e:
            logger.error(f"Error ingesting environment visits for {experiment_name}: {e}")
  1. Maintainability: The worker configuration is repeated multiple times with different parameters. This could be refactored into a function to reduce redundancy and improve maintainability.
def configure_worker(name, max_idled_cycle, sleep_duration):
    return DataJointWorker(
        name,
        worker_schema_name=worker_schema_name,
        db_prefix=db_prefix,
        max_idled_cycle=max_idled_cycle,
        sleep_duration=sleep_duration,
    )

acquisition_worker = configure_worker("acquisition_worker", 6, 1200)
pyrat_worker = configure_worker("pyrat_worker", 400, 30)
streams_worker = configure_worker("streams_worker", 50, 60)
analysis_worker = configure_worker("analysis_worker", 6, 1200)
  1. Modularity: The ingest_epochs_chunks and ingest_environment_visits functions are defined in the global scope. It would be better to encapsulate them within a class or module for better modularity.

  2. Performance: In the for attr in vars(streams).values(): loop, the is_djtable function is called for every attribute of streams. If streams has many attributes, this could be inefficient. Consider filtering the attributes first before calling is_djtable.

auto_populate_attrs = [attr for attr in vars(streams).values() if isinstance(attr, dj.user_tables.AutoPopulate)]
for attr in auto_populate_attrs:
    streams_worker(attr, max_calls=10)
  1. Best Practices: The pass statement in the ingest_environment_visits function indicates that this function is not yet implemented. It's best to raise a NotImplementedError to make it clear that this function needs to be implemented.
def ingest_environment_visits():
    """Extract and insert complete visits for experiments specified in AutomatedExperimentIngestion."""
    experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
    for experiment_name in experiment_names:
        raise NotImplementedError("This function needs to be implemented.")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants