Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConvertJob.copy_sequences() method added. #145

Merged
merged 6 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 167 additions & 1 deletion sequence_processing_pipeline/ConvertJob.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from os.path import join, exists
from os.path import join, exists, split
from sequence_processing_pipeline.Job import Job
from sequence_processing_pipeline.PipelineError import (PipelineError,
JobFailedError)
import logging
import re
from json import loads as json_loads
from metapool import load_sample_sheet
from shutil import copyfile


class ConvertJob(Job):
Expand Down Expand Up @@ -44,6 +47,8 @@ def __init__(self, run_dir, output_path, sample_sheet_path, queue_name,
self.qiita_job_id = qiita_job_id
self.job_script_path = join(self.output_path, f"{self.job_name}.sh")
self.suffix = 'fastq.gz'
self.fastq_paths = None
self.info = None

tmp = False
for executable_name in ['bcl2fastq', 'bcl-convert']:
Expand Down Expand Up @@ -146,6 +151,9 @@ def run(self, callback=None):
job_info = self.submit_job(self.job_script_path,
exec_from=self.log_path,
callback=callback)

self._get_sample_sheet_info()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, can this be called before the previous line or not (before submitting jobs)? Also, is the plan to unify this functionality in a single place? Maybe _get_sample_sheet_info()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function should only be called after the bcl-convert job returns completed. The output from this method is specific to supporting ConvertJob, so there currently isn't any value in pushing it down into the base class. Its name begins with '_', so by convention it's an internal method and nobody should be calling it except ConvertJob.run().

except JobFailedError as e:
# When a job has failed, parse the logs generated by this specific
# job to return a more descriptive message to the user.
Expand Down Expand Up @@ -210,3 +218,161 @@ def parse_job_script(job_script_path):
f"'{job_script_path}'")

return result

def _get_sample_sheet_info(self):
    """
    Reads the sample-sheet and associates each sample with its fastq files.

    Sets self.contains_replicates from the first value of the
    'contains_replicates' column of the Bioinformatics section (False if
    the column is absent) and sets self.info to a dictionary keyed by the
    full project name ('Sample_Project'). Each value holds 'qiita_id',
    'project_name' (the full name with the trailing '_<qiita_id>'
    removed), 'full_project_name' and 'samples'; 'samples' maps each
    Sample_Name to a dictionary with 'Sample_Name', 'Sample_ID',
    'matching_files' and, when present in the sheet, 'orig_name'.
    :return: None
    """
    # assume path to sample-sheet exists, and sheet is valid.
    # otherwise, we would not be here.
    sheet = load_sample_sheet(self.sample_sheet_path)

    # parse bioinformatics section to generate a durable list of
    # project_names and qiita_ids.
    bioinformatics = sheet.Bioinformatics
    projects = bioinformatics['Sample_Project'].tolist()
    qiita_ids = bioinformatics['QiitaID'].tolist()

    if 'contains_replicates' in bioinformatics:
        has_reps = bioinformatics['contains_replicates'].tolist()
        # assume a validated sample-sheet ensures has_reps has only one
        # value, either True or False.
        self.contains_replicates = has_reps[0]
    else:
        self.contains_replicates = False

    results = {}

    for project, qiita_id in zip(projects, qiita_ids):
        # derive project_name by removing the known qiita_id associated
        # with this project from the string. The id is escaped so that it
        # is always treated as literal text inside the pattern.
        suffix_rgx = '_' + re.escape(str(qiita_id)) + '$'
        results[project] = {'qiita_id': qiita_id,
                            'project_name': re.sub(suffix_rgx, '', project),
                            'full_project_name': project,
                            'samples': {}}

    # Since the project-name is stored in an internal variable
    # in a third-party library, convert samples to JSON using the exposed
    # method first.
    samples = json_loads(sheet.to_json())['Data']

    for sample in samples:
        d = {'Sample_Name': sample['Sample_Name'],
             'Sample_ID': sample['Sample_ID'],
             # matching files will store the paths to all fastq files
             # associated w/this sample-name.
             'matching_files': []}

        if 'orig_name' in sample:
            d['orig_name'] = sample['orig_name']

        results[sample['Sample_Project']]['samples'][d['Sample_Name']] = d

    # associate with each dictionary a list of matching fastq files.
    # this way, we can easily determine which files to copy to another
    # project based on just a sample-name/sample-id and a project-name.
    for project, project_info in results.items():
        # find just the fastq files for this project and split off each
        # filename once, rather than once per sample.
        found = self._find_files(join(self.output_path, project))
        named_paths = [(split(fp)[1], fp) for fp in found
                       if fp.endswith('.fastq.gz')]

        for sample in project_info['samples'].values():
            # regex based on studying all filenames of all fastq files in
            # $WKDIR. Works with _R1_, _R2_, _I1_, _I2_, etc. The
            # character class is [RI] (a comma inside a class is a literal
            # comma, not a separator) and the dots of the extension are
            # escaped so that they only match a literal '.'.
            rgx = re.compile(r"^" + re.escape(sample['Sample_ID']) +
                             r"_S\d+_L\d+_[RI]\d+_\d+\.fastq\.gz$")

            for file_name, full_path in named_paths:
                if rgx.match(file_name):
                    sample['matching_files'].append(full_path)

    self.info = results

def copy_sequences(self, sample_name, source_project, dest_project):
    """
    Copies all fastq files related to a sample into another project.
    :param sample_name: If the sample-sheet contains replicates
     (self.contains_replicates is True), a value from the orig_name column;
     the files of every sample having that orig_name are copied. Otherwise,
     a value from the sample_name column.
    :param source_project: The source project name including qiita_id.
    :param dest_project: The destination project name including qiita_id.
    :return: None
    :raises ValueError: If sample-sheet information has not been gathered
     yet, if either project is not defined in the sample-sheet, if source
     and destination are the same project, or if sample_name matches no
     sample in the source project.
    """
    if self.info is None:
        raise ValueError("This method cannot be called until processing "
                         "has completed.")

    # confirm source and destination projects are both valid.
    for proj in (source_project, dest_project):
        if proj not in self.info:
            raise ValueError(f"'{proj}' is not defined in sample-sheet")

    if source_project == dest_project:
        raise ValueError(f"source '{source_project}' and destination "
                         f"'{dest_project}' projects are the same")

    # note that the user can supply a sample-name that didn't make it
    # through the conversion step and may have no files matched to it.
    # this is considered okay and a possible outcome of conversion. In
    # this case zero files are copied and no information is passed back
    # to the user.

    samples = self.info[source_project]['samples']

    if self.contains_replicates is True:
        # match the input against the orig_name column to discover every
        # individual replicate (sample_name w/well-id). A sample without
        # an orig_name entry cannot be a replicate of anything, so it is
        # skipped rather than allowed to raise a KeyError.
        results = [sample for sample in samples.values()
                   if 'orig_name' in sample and
                   sample['orig_name'] == sample_name]
    elif sample_name in samples:
        # sample_name is a value from the sample_name column; a single
        # sample either exists under that name or it doesn't.
        results = [samples[sample_name]]
    else:
        results = []

    if not results:
        if self.contains_replicates is True:
            # the value of sample_name did not match any value in the
            # orig_name column. It may match a value in the sample_name
            # column.
            msg = (f"'{sample_name}' did not match any values in the 'orig"
                   f"_name' column for project '{source_project}'. Your "
                   f"value '{sample_name}' may have a well-id appended to"
                   " it")
        else:
            # when the sheet has no replicates only the sample_name column
            # is consulted; a value from the orig_name column would be
            # ambiguous since it could be matched to multiple samples.
            msg = (f"'{sample_name}' did not match any values in the 'samp"
                   f"le_name' column for project '{source_project}'")

        raise ValueError(msg)

    dest_dir = join(self.output_path, dest_project)

    for sample in results:
        for src_fp in sample['matching_files']:
            # split(src_fp)[1] is simply the original filename, which must
            # be provided in the destination path.
            copyfile(src_fp, join(dest_dir, split(src_fp)[1]))
178 changes: 177 additions & 1 deletion sequence_processing_pipeline/tests/test_ConvertJob.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from os.path import join, abspath
from os.path import join, abspath, exists
from os import makedirs
from sequence_processing_pipeline.ConvertJob import ConvertJob
from sequence_processing_pipeline.PipelineError import (PipelineError,
Expand Down Expand Up @@ -841,6 +841,7 @@ def setUp(self):
self.base_path = partial(join, package_root, 'tests', 'data')
self.good_output_path = self.base_path('output_dir')
self.sample_sheet_path = self.base_path('good-sample-sheet.csv')
self.sheet_w_repl_path = self.base_path('good_sheet_w_replicates.csv')
self.good_input_path = self.base_path('input_dir')

# self.good_input_path doesn't need to have anything in it for the
Expand Down Expand Up @@ -995,6 +996,181 @@ def test_parse_sample_sheet(self):

self.assertDictEqual(obs, exp)

def test_copy_sequences_bad_parameters(self):
    """Each invalid argument combination raises a descriptive ValueError."""
    job = ConvertJob(self.base_path('211021_A00000_0000_SAMPLE'),
                     self.good_output_path, self.sample_sheet_path,
                     'qiita', 1, 16, 1440, '10gb',
                     'tests/bin/bcl-convert', [],
                     'abcdabcdabcdabcdabcdabcdabcdabcd')

    # rather than faking a complete ConvertJob run through run(), invoke
    # _get_sample_sheet_info() directly; it is what run() calls to gather
    # the metadata that copy_sequences() depends on.
    job._get_sample_sheet_info()

    good_sample = 'CDPH-SAL.Salmonella.Typhi.MDL-154'
    bad_sample = 'NOT_A_SAMPLE_NAME'
    src = 'Feist_11661'
    dst = 'NYU_BMS_Melanoma_13059'
    unrelated = 'Gerwick_6123'
    bad_project = 'NOT_A_PROJECT'

    undefined_msg = "'NOT_A_PROJECT' is not defined in sample-sheet"

    # (arguments to copy_sequences(), expected error pattern), checked in
    # order.
    cases = [
        # unknown sample-name.
        ((bad_sample, src, dst),
         "'NOT_A_SAMPLE_NAME' did not match any values in the "
         "'sample_name' column for project 'Feist_11661'"),
        # valid sample-name, but not part of the given source project.
        ((good_sample, unrelated, dst),
         "'CDPH-SAL.Salmonella.Typhi.MDL-154' did not match any values "
         "in the 'sample_name' column for project 'Gerwick_6123'"),
        # unknown source project.
        ((good_sample, bad_project, dst), undefined_msg),
        # unknown destination project.
        ((good_sample, src, bad_project), undefined_msg),
        # source and destination are identical.
        ((good_sample, src, src),
         "source 'Feist_11661' and destination 'Feist_11661' projects "
         "are the same"),
    ]

    for args, expected in cases:
        with self.assertRaisesRegex(ValueError, expected):
            job.copy_sequences(*args)

def test_copy_sequences_success(self):
    """A sample's fastq file is copied into the destination project only."""
    job = ConvertJob(self.base_path('211021_A00000_0000_SAMPLE'),
                     self.good_output_path, self.sample_sheet_path,
                     'qiita', 1, 16, 1440, '10gb',
                     'tests/bin/bcl-convert', [],
                     'abcdabcdabcdabcdabcdabcdabcdabcd')

    sample = 'CDPH-SAL.Salmonella.Typhi.MDL-154'
    src = 'Feist_11661'
    dst = 'NYU_BMS_Melanoma_13059'
    untouched = 'Gerwick_6123'

    # a real run isn't possible here, so lay out what one would leave
    # behind: a directory under 'ConvertJob' for every project defined in
    # the sample-sheet ...
    convert_dir = join(self.good_output_path, 'ConvertJob')
    for project_dir in (dst, untouched, src):
        makedirs(join(convert_dir, project_dir))

    # ... and a single fake fastq file in the source project, to be copied
    # into the destination project.
    fake_fastq = join(convert_dir, src,
                      'CDPH-SAL_Salmonella_Typhi_MDL-154_S1_L001_R1_001.'
                      'fastq.gz')
    with open(fake_fastq, 'w') as f:
        f.write("Hello World!\n")

    # read the sample-sheet and associate samples with the fastq files
    # found on disk, then perform the copy.
    job._get_sample_sheet_info()
    job.copy_sequences(sample, src, dst)

    # take the path recorded for the source file and swap the project-level
    # directory name to derive where a copy should and should not exist.
    recorded = job.info[src]['samples'][sample]['matching_files'][0]

    # file should have been copied here.
    self.assertTrue(exists(recorded.replace('Feist_11661',
                                            'NYU_BMS_Melanoma_13059')))

    # file should not have been copied here.
    self.assertFalse(exists(recorded.replace('Feist_11661',
                                             'Gerwick_6123')))

def test_copy_sequences_success_w_replicates(self):
    """All fastq files of every replicate of a sample are copied."""
    # perform a similar test to test_copy_sequences_success, but
    # w/replicate samples.

    run_dir = self.base_path('211021_A00000_0000_SAMPLE')
    qiita_id = 'abcdabcdabcdabcdabcdabcdabcdabcd'

    job = ConvertJob(run_dir, self.good_output_path,
                     self.sheet_w_repl_path, 'qiita', 1, 16, 1440, '10gb',
                     'tests/bin/bcl-convert', [], qiita_id)

    sample_name = 'RMA.KHP.rpoS.Mage.Q97D'
    source_project = 'Feist_11661'
    dest_project = 'NYU_BMS_Melanoma_13059'
    projects = ['NYU_BMS_Melanoma_13059', 'Feist_11661']

    for some_project in projects:
        makedirs(
            join(self.good_output_path, 'ConvertJob', some_project))

    # instead of faking just a single fastq file, fake an R1, I1 and R2
    # fastq file for three replicates (wells A5, A6 and B6) of
    # 'RMA.KHP.rpoS.Mage.Q97D' in the 'Feist_11661' directory for the
    # purposes of copying them into the 'NYU_BMS_Melanoma_13059' project.
    fastqs = [f'RMA_KHP_rpoS_Mage_Q97D_{well}_S1_L001_{read}_001.fastq.gz'
              for well in ('A5', 'A6', 'B6')
              for read in ('R1', 'I1', 'R2')]

    for fastq in fastqs:
        with open(join(self.good_output_path, 'ConvertJob', source_project,
                       fastq), 'w') as f:
            f.write("Hello World!\n")

    job._get_sample_sheet_info()

    job.copy_sequences(sample_name, source_project, dest_project)

    # gather the source files recorded for every replicate of the sample.
    files_to_match = []

    for smpl_info in job.info[source_project]['samples'].values():
        if smpl_info['orig_name'] == 'RMA.KHP.rpoS.Mage.Q97D':
            files_to_match += smpl_info['matching_files']

    # guard against a vacuous pass: if nothing was matched, the loop below
    # would assert nothing at all.
    self.assertTrue(files_to_match,
                    "no fastq files were matched to the replicates")

    files_to_match = [fp.replace('Feist_11661',
                                 'NYU_BMS_Melanoma_13059')
                      for fp in files_to_match]

    for dst_file in files_to_match:
        self.assertTrue(exists(dst_file))


SCRIPT_EXP = ''.join([
'#!/bin/bash\n',
Expand Down
Loading