Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConvertJob.copy_sequences() method added. #145

Merged
merged 6 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 164 additions & 1 deletion sequence_processing_pipeline/ConvertJob.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from os.path import join, exists
from os.path import join, exists, split
from sequence_processing_pipeline.Job import Job
from sequence_processing_pipeline.PipelineError import (PipelineError,
JobFailedError)
import logging
import re
from json import loads as json_loads
from metapool import load_sample_sheet
from shutil import copyfile


class ConvertJob(Job):
Expand Down Expand Up @@ -44,6 +47,8 @@ def __init__(self, run_dir, output_path, sample_sheet_path, queue_name,
self.qiita_job_id = qiita_job_id
self.job_script_path = join(self.output_path, f"{self.job_name}.sh")
self.suffix = 'fastq.gz'
self.fastq_paths = None
self.info = None

tmp = False
for executable_name in ['bcl2fastq', 'bcl-convert']:
Expand Down Expand Up @@ -146,6 +151,9 @@ def run(self, callback=None):
job_info = self.submit_job(self.job_script_path,
exec_from=self.log_path,
callback=callback)

self._get_sample_sheet_info()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, can this be called before the previous line or not (before submitting jobs)? Also, is the plan to unify this functionality in a single place? Maybe _get_sample_sheet_info()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function should only be called after the bcl-convert job returns completed. The output from this method is specific to supporting ConvertJob, so there currently isn't any value in pushing it down into the base class. Its name begins with '_', so by convention it's an internal method and nobody should be calling it except ConvertJob.run().

except JobFailedError as e:
# When a job has failed, parse the logs generated by this specific
# job to return a more descriptive message to the user.
Expand Down Expand Up @@ -210,3 +218,158 @@ def parse_job_script(job_script_path):
f"'{job_script_path}'")

return result

def _get_sample_sheet_info(self):
    """
    Loads the sample-sheet and builds a per-project index of its samples
    and the fastq files found for each sample.

    On completion the following attributes are set:
     self.contains_replicates: taken from the first value of the
      'contains_replicates' column of the Bioinformatics section, or
      False when that column is absent.
     self.info: a dict keyed by Sample_Project. Each value holds
      'qiita_id', 'project_name', 'full_project_name' and 'samples'.
      'samples' is a dict keyed by Sample_Name whose values hold
      'Sample_Name', 'Sample_ID', 'matching_files' and, when present in
      the sample-sheet, 'orig_name'.
    :return: None
    """
    # assume path to sample-sheet exists, and sheet is valid.
    # otherwise, we would not be here.
    sheet = load_sample_sheet(self.sample_sheet_path)

    # parse bioinformatics section to generate a durable list of
    # project_names and qiita_ids.
    bioinformatics = sheet.Bioinformatics
    projects = bioinformatics['Sample_Project'].tolist()
    qiita_ids = bioinformatics['QiitaID'].tolist()

    if 'contains_replicates' in bioinformatics:
        has_reps = bioinformatics['contains_replicates'].tolist()
        # assume a validated sample-sheet ensures has_reps has only one
        # value, either True or False.
        # NOTE(review): bool() of a non-empty string such as 'False' is
        # True - confirm the loader yields real booleans for this column.
        self.contains_replicates = bool(has_reps[0])
    else:
        self.contains_replicates = False

    results = {}

    for project, qiita_id in zip(projects, qiita_ids):
        # derive project_name by removing the known qiita_id associated
        # with this project from the string. the id is escaped so that it
        # is always matched literally.
        project_name = re.sub(f'_{re.escape(str(qiita_id))}$', '', project)
        results[project] = {'qiita_id': qiita_id,
                            'project_name': project_name,
                            'full_project_name': project,
                            'samples': {}}

    # Since the project-name is stored in an internal variable
    # in a third-party library, convert samples to JSON using the exposed
    # method first.
    samples = json_loads(sheet.to_json())['Data']

    for sample in samples:
        d = {'Sample_Name': sample['Sample_Name'],
             'Sample_ID': sample['Sample_ID'],
             # matching files will store the paths to all fastq files
             # associated w/this sample-name.
             'matching_files': []}

        if 'orig_name' in sample:
            d['orig_name'] = sample['orig_name']

        results[sample['Sample_Project']]['samples'][d['Sample_Name']] = d

    # associate with each dictionary a list of matching fastq files.
    # this way, we can easily determine which files to copy to another
    # project based on just a sample-name/sample-id and a project-name.

    # regex based on studying all filenames of all fastq files in
    # $WKDIR. Works with _R1_, _R2_, _I1_, _I2_, etc. The character class
    # is [RI] (no stray comma) and the dots are escaped so that they only
    # match a literal '.'.
    suffix_rgx = r"_S\d+_L\d+_[RI]\d+_\d+\.fastq\.gz$"

    for project, details in results.items():
        # find just the fastq files for this project.
        fastq_paths = self._find_files(join(self.output_path, project))
        fastq_paths = [f for f in fastq_paths if f.endswith('.fastq.gz')]

        # split each path once per project rather than once per sample.
        named_paths = [(split(full_path)[1], full_path)
                       for full_path in fastq_paths]

        for sample in details['samples'].values():
            # compile once per sample instead of once per file.
            rgx = re.compile(r"^" + re.escape(sample['Sample_ID']) +
                             suffix_rgx)

            sample['matching_files'].extend(
                full_path for file_name, full_path in named_paths
                if rgx.match(file_name))

    self.info = results

def copy_sequences(self, sample_name, source_project, dest_project,
                   copy_all_replicates=False):
    """
    Copies all fastq files related to a sample into another project.
    :param sample_name: A sample-name. When copy_all_replicates is True,
     it is compared against the orig_name of each sample instead of the
     sample_name column.
    :param source_project: The source project w/qiita_id.
    :param dest_project: The destination project w/qiita_id.
    :param copy_all_replicates: If True, search for sample_name in the
     orig_name column of the sample-sheet. Copy all replicates.
    :return: None
    :raises ValueError: if processing has not completed, if either
     project is not defined in the sample-sheet, if both projects are the
     same, if copy_all_replicates is set for a sample-sheet without
     replicates, or if sample_name is not found in source_project.
    """
    if self.info is None:
        raise ValueError("This method cannot be called until processing "
                         "has completed.")

    # confirm source and destination projects are valid ones. source is
    # checked first so that it is the one reported if both are invalid.
    for project in (source_project, dest_project):
        if project not in self.info:
            raise ValueError(f"'{project}' is not defined in the "
                             "sample-sheet")

    if source_project == dest_project:
        raise ValueError(f"source '{source_project}' and destination "
                         f"'{dest_project}' projects are the same")

    # note that the user can supply a sample-name that didn't make it
    # through the conversion step and may have no files matched to it.
    # this is considered okay and a possible outcome of conversion. In
    # this case zero files are copied and no information is passed back
    # to the user.

    # projects that contain replicates must also be considered. if the
    # value for sample_name contains a well-id, then only the files
    # associated with that particular replicate should be copied. If the
    # value instead references a sample_name in the 'orig_name' column,
    # then all files associated with each replicate need to be copied.

    # in this situation, the sample_name needs to be compared against all
    # orig_names in the project and the individual sample_names (w/well-
    # ids) must be discovered. Then those individual sample_names can
    # be processed.

    # truthiness is used here, as in the branch below, so that a truthy
    # non-bool flag cannot skip this guard and still take the
    # replicate path.
    if copy_all_replicates and not self.contains_replicates:
        raise ValueError("'copy_all_replicates' is set to 'True' but this "
                         "sample-sheet doesn't contain replicates")

    samples = self.info[source_project]['samples']

    if copy_all_replicates:
        # samples lacking an orig_name are skipped rather than raising a
        # KeyError; they cannot be a replicate of sample_name.
        sample_list = [sample for sample in samples.values()
                       if 'orig_name' in sample
                       and sample['orig_name'] == sample_name]
    elif sample_name in samples:
        # sample_name is a value from the sample_name column. it may or
        # may not have a well-id appended and this sample-sheet may or
        # may not contain replicates, but in either case a single sample
        # either exists or it doesn't.
        sample_list = [samples[sample_name]]
    else:
        sample_list = []

    if not sample_list:
        # if the sample_list is empty, then sample-name wasn't present in
        # either the sample_name or orig_name columns.
        raise ValueError(f"'{sample_name}' is not defined in the project"
                         f" '{source_project}'")

    # NOTE(review): the destination directory is assumed to already exist
    # under output_path; copyfile() does not create it - confirm.
    dest_dir = join(self.output_path, dest_project)

    for sample in sample_list:
        for src_fp in sample['matching_files']:
            # split(fp)[1] is simply the original filename, which must
            # be provided in the destination path.
            copyfile(src_fp, join(dest_dir, split(src_fp)[1]))
Loading
Loading