Adapting code & tests to use NuQCJob #76

Merged: 18 commits, Dec 12, 2023 (diff below shows changes from 1 commit)
10 changes: 5 additions & 5 deletions qp_klp/Amplicon.py
@@ -32,15 +32,15 @@ def quality_control(self):
         # Quality control for Amplicon runs occurs downstream.
         # Do not perform QC at this time.
 
-        # Simulate QCJob's output directory for use as input into FastQCJob.
+        # Simulate NuQCJob's output directory for use as input into FastQCJob.
         projects = self.pipeline.get_project_info()
         projects = [x['project_name'] for x in projects]
 
         for project_name in projects:
-            # copy the files from ConvertJob output to faked QCJob output
-            # folder: $WKDIR/$RUN_ID/QCJob/$PROJ_NAME/amplicon
+            # copy the files from ConvertJob output to faked NuQCJob output
+            # folder: $WKDIR/$RUN_ID/NuQCJob/$PROJ_NAME/amplicon
             output_folder = join(self.pipeline.output_path,
-                                 'QCJob',
+                                 'NuQCJob',
                                  project_name,
                                  # for legacy purposes, output folders are
                                  # either 'trimmed_sequences', 'amplicon', or
@@ -91,7 +91,7 @@ def generate_reports(self):
         super()._generate_reports()
         return None  # amplicon doesn't need project names
 
-    def _get_data_type(self, prep_file_path):
+    def get_data_type(self, prep_file_path):
         metadata = Step.parse_prep_file(prep_file_path, convert_to_dict=False)
 
         if 'target_gene' in metadata.columns:
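Conceptually, the change above keeps amplicon runs QC-free while still satisfying FastQCJob's expected input layout. A self-contained sketch of the idea (hypothetical helper, not this PR's code; only the 'NuQCJob/<project>/amplicon' layout comes from the diff):

    from os import makedirs
    from os.path import basename, join
    from shutil import copyfile

    def fake_nuqc_output(wkdir, run_id, project_name, fastq_paths):
        # amplicon runs skip QC, so ConvertJob's fastqs are copied,
        # unmodified, into the folder FastQCJob expects NuQCJob to write
        out = join(wkdir, run_id, 'NuQCJob', project_name, 'amplicon')
        makedirs(out, exist_ok=True)
        for src in fastq_paths:
            copyfile(src, join(out, basename(src)))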
8 changes: 6 additions & 2 deletions qp_klp/Metagenomic.py
@@ -31,16 +31,20 @@ def convert_bcl_to_fastq(self):
         self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'ConvertJob')
 
     def quality_control(self):
-        config = self.pipeline.configuration['qc']
+        config = self.pipeline.configuration['nu-qc']
         job = super()._quality_control(config, self.pipeline.sample_sheet.path)
-        self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'QCJob')
+        self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'NuQCJob')
 
     def generate_reports(self):
         job = super()._generate_reports()
         self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'FastQCJob')
 
         self.project_names = job.project_names
 
+    def get_data_type(self, prep_file_path):
+        # prep_file_path is unused. It's kept for compatibility with Amplicon and Step.
+        return self.pipeline.pipeline_type
+
     def generate_prep_file(self):
         config = self.pipeline.configuration['seqpro']
 
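The new get_data_type() pairs with the base-class stub added to Step.py below: Step raises, and each subclass answers in its own way. A condensed, runnable sketch of that contract (stand-in classes, not the real ones):

    class Step:
        def get_data_type(self, prep_file_path):
            raise ValueError("get_data_type() not implemented for base-class.")

    class Metagenomic(Step):
        pipeline_type = 'Metagenomic'  # stands in for self.pipeline.pipeline_type

        def get_data_type(self, prep_file_path):
            # prep_file_path is unused; kept for signature compatibility
            return self.pipeline_type

    print(Metagenomic().get_data_type(None))  # -> 'Metagenomic'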
122 changes: 90 additions & 32 deletions qp_klp/Step.py
@@ -9,7 +9,7 @@
 from sequence_processing_pipeline.GenPrepFileJob import GenPrepFileJob
 from sequence_processing_pipeline.PipelineError import PipelineError
 from sequence_processing_pipeline.Pipeline import Pipeline
-from sequence_processing_pipeline.QCJob import QCJob
+from sequence_processing_pipeline.NuQCJob import NuQCJob
 from subprocess import Popen, PIPE
 import pandas as pd
 from glob import glob
@@ -85,6 +85,8 @@ def __init__(self, pipeline, master_qiita_job_id,
 
         self.pipeline = pipeline
         self.lane_number = lane_number
+        self.generated_artifact_name = \
+            f'{self.pipeline.run_id}_{self.lane_number}'
         self.master_qiita_job_id = master_qiita_job_id
 
         if status_update_callback is not None:
@@ -192,6 +194,9 @@ def generate_special_map(self, qclient):
 
         self.special_map = special_map
 
+    def get_data_type(self, prep_file_path):
+        raise ValueError("get_data_type() not implemented for base-class.")
+
     def update_prep_templates(self, qclient):
         '''
         Update prep-template info in Qiita. Get dict of prep-ids by study-id.
@@ -281,35 +286,35 @@ def _convert_bcl_to_fastq(self, config, input_file_path):
         return convert_job
 
     def _quality_control(self, config, input_file_path):
-        qc_job = QCJob(join(self.pipeline.output_path, 'ConvertJob'),
-                       self.pipeline.output_path,
-                       input_file_path,
-                       config['minimap_databases'],
-                       config['kraken2_database'],
-                       config['queue'],
-                       config['nodes'],
-                       config['nprocs'],
-                       config['wallclock_time_in_minutes'],
-                       config['job_total_memory_limit'],
-                       config['fastp_executable_path'],
-                       config['minimap2_executable_path'],
-                       config['samtools_executable_path'],
-                       config['modules_to_load'],
-                       self.master_qiita_job_id,
-                       self.job_pool_size,
-                       config['job_max_array_length'])
-
-        qc_job.run(callback=self.update_callback)
-
-        return qc_job
+        nuqc_job = NuQCJob(join(self.pipeline.output_path, 'ConvertJob'),
+                           self.pipeline.output_path,
+                           input_file_path,
+                           config['minimap_databases'],
+                           config['queue'],
+                           config['nodes'],
+                           config['nprocs'],
+                           config['wallclock_time_in_minutes'],
+                           config['job_total_memory_limit'],
+                           config['fastp_executable_path'],
+                           config['minimap2_executable_path'],
+                           config['samtools_executable_path'],
+                           config['modules_to_load'],
+                           self.master_qiita_job_id,
+                           self.job_pool_size,
+                           config['job_max_array_length'],
+                           config['known_adapters_path'])
+
+        nuqc_job.run(callback=self.update_callback)
+
+        return nuqc_job
 
     def _generate_reports(self):
         config = self.pipeline.configuration['fastqc']
         is_amplicon = self.pipeline.pipeline_type == Step.AMPLICON_TYPE
         fastqc_job = FastQCJob(self.pipeline.run_dir,
                                self.pipeline.output_path,
                                join(self.pipeline.output_path, 'ConvertJob'),
-                               join(self.pipeline.output_path, 'QCJob'),
+                               join(self.pipeline.output_path, 'NuQCJob'),
                                config['nprocs'],
                                config['nthreads'],
                                config['fastqc_executable_path'],
@@ -335,7 +340,7 @@ def _generate_prep_file(self, config, input_file_path, seqpro_path,
         gpf_job = GenPrepFileJob(
             self.pipeline.run_dir,
             join(self.pipeline.output_path, 'ConvertJob'),
-            join(self.pipeline.output_path, 'QCJob'),
+            join(self.pipeline.output_path, 'NuQCJob'),
             self.pipeline.output_path,
             input_file_path,
             seqpro_path,
@@ -360,13 +365,13 @@ def _helper_process_fastp_report_dirs(self):
             for dir_name in dirs:
                 if dir_name == 'fastp_reports_dir':
                     # generate the full path for this directory before
-                    # truncating everything up to the QCJob directory.
-                    full_path = join(root, dir_name).split('QCJob/')
-                    report_dirs.append(join('QCJob', full_path[1]))
+                    # truncating everything up to the NuQCJob directory.
+                    full_path = join(root, dir_name).split('NuQCJob/')
+                    report_dirs.append(join('NuQCJob', full_path[1]))
 
         if report_dirs:
             report_dirs.sort()
-            return 'tar zcvf reports-QCJob.tgz ' + ' '.join(report_dirs)
+            return 'tar zcvf reports-NuQCJob.tgz ' + ' '.join(report_dirs)
         else:
             # It is okay to return an empty list of commands if reports_dirs
             # is empty. Some pipelines do not generate fastp reports.
@@ -388,7 +393,7 @@ def _helper_process_operations(self):
         REPORT_PREFIX = 'reports'
         PREP_PREFIX = 'prep-files'
         CONVERT_JOB = 'ConvertJob'
-        QC_JOB = 'QCJob'
+        QC_JOB = 'NuQCJob'
         FASTQC_JOB = 'FastQCJob'
         PREPFILE_JOB = 'GenPrepFileJob'
         TAR_EXT = 'tgz'
@@ -400,7 +405,7 @@ def _helper_process_operations(self):
                        f'{REPORT_PREFIX}-{CONVERT_JOB}.{TAR_EXT}',
                        'OUTPUT_FIRST'),
 
-                      (['QCJob/logs'], TAR_CMD,
+                      (['NuQCJob/logs'], TAR_CMD,
                        f'{LOG_PREFIX}-{QC_JOB}.{TAR_EXT}', 'OUTPUT_FIRST'),
 
                       (['FastQCJob/logs'], TAR_CMD,
@@ -478,12 +483,12 @@ def _get_fastq_files(self, out_dir, project):
         af = None
         sub_folders = ['amplicon', 'filtered_sequences', 'trimmed_sequences']
         for sub_folder in sub_folders:
-            sf = f'{out_dir}/QCJob/{project}/{sub_folder}'
+            sf = f'{out_dir}/NuQCJob/{project}/{sub_folder}'
            if exists(sf):
                 af = [f for f in glob(f'{sf}/*.fastq.gz')]
                 break
         if af is None or not af:
-            raise PipelineError("QCJob output not in expected location")
+            raise PipelineError("NuQCJob output not in expected location")
 
         files = {'raw_barcodes': [], 'raw_forward_seqs': [],
                  'raw_reverse_seqs': []}
@@ -516,6 +521,7 @@ def _get_fastq_files(self, out_dir, project):
 
         return files
 
+
     def _load_prep_into_qiita(self, qclient, prep_id, artifact_name,
                               qiita_id, project, fastq_files, atype):
         surl = f'{qclient._server_url}/study/description/{qiita_id}'

return df

def _load_preps_into_qiita2(self, qclient, prep_id, qiita_id, out_dir,
charles-cowart marked this conversation as resolved.
Show resolved Hide resolved
project):
surl = f'{qclient._server_url}/study/description/{qiita_id}'
prep_url = (f'{qclient._server_url}/study/description/'
f'{qiita_id}?prep_id={prep_id}')

if self.pipeline.pipeline_type == Step.AMPLICON_TYPE:
files = self._get_files_amplicon(out_dir, project)
else:
atype = 'per_sample_FASTQ'
files = self._get_files_meta(out_dir, project, prep_id)

for f_type in files:
if not files[f_type]:
charles-cowart marked this conversation as resolved.
Show resolved Hide resolved
# if one or more of the expected list of reads is empty,
# raise an Error.
raise ValueError(f"'{f_type}' is empty")

# ideally we would use the email of the user that started the SPP
# run but at this point there is no easy way to retrieve it
pdata = {'user_email': 'qiita.help@gmail.com',
'prep_id': prep_id,
'artifact_type': atype,
charles-cowart marked this conversation as resolved.
Show resolved Hide resolved
'command_artifact_name': self.generated_artifact_name,
'files': dumps(files)}

job_id = qclient.post('/qiita_db/artifact/', data=pdata)['job_id']

return {'Project': project, 'Qiita Study ID': qiita_id,
'Qiita Prep ID': prep_id, 'Qiita URL': surl,
'Prep URL': prep_url, 'Linking JobID': job_id}

def load_preps_into_qiita2(self, qclient):
charles-cowart marked this conversation as resolved.
Show resolved Hide resolved
charles-cowart marked this conversation as resolved.
Show resolved Hide resolved
out_dir = self.pipeline.output_path

data = []
for project, _, qiita_id in self.special_map:
for prep_id in self.touched_studies_prep_info[qiita_id]:
data.append(self._load_preps_into_qiita(qclient,
prep_id,
qiita_id,
out_dir,
project))

df = pd.DataFrame(data)
opath = join(self.pipeline.output_path, 'touched_studies.html')
with open(opath, 'w') as f:
f.write(df.to_html(border=2, index=False, justify="left",
render_links=True, escape=False))

return df

def write_commands_to_output_path(self):
self.cmds_log_path = join(self.pipeline.output_path, 'cmds.log')
with open(self.cmds_log_path, 'w') as f:
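For reference, the POST to /qiita_db/artifact/ in _load_preps_into_qiita2() carries a payload shaped like the following (IDs, run-id, and paths are made up for illustration; 'files' is JSON-encoded, matching the dumps(files) call above):

    from json import dumps

    files = {'raw_forward_seqs':
                 ['NuQCJob/SomeProject/filtered_sequences/s1_R1.fastq.gz'],
             'raw_reverse_seqs':
                 ['NuQCJob/SomeProject/filtered_sequences/s1_R2.fastq.gz']}

    pdata = {'user_email': 'qiita.help@gmail.com',
             'prep_id': 12345,
             'artifact_type': 'per_sample_FASTQ',
             # run-id + lane, per generated_artifact_name in __init__ above
             'command_artifact_name': '230101_A00953_0001_AH3LJKDSX5_1',
             'files': dumps(files)}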
16 changes: 16 additions & 0 deletions qp_klp/tests/test_step.py
@@ -271,6 +271,22 @@ class BaseStepTests(TestCase):
             "job_pool_size": 30,
             "job_max_array_length": 1000
         },
+        "nu-qc": {
+            "nodes": 1,
+            "nprocs": 16,
[Review comment from a Member, on the "nprocs" line]
Based on the other open PR, should this be connected/the-same as
{{cores_per_task}}? If yes, could you make sure to use the same name, just
for clarity?

[Reply from the Contributor/Author]
Good catch! I removed nprocs for NuQC because it's not being used and added
cpus_per_task since it is being used. I added a comment briefly covering the
difference between the two parameters after reading some literature.
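For readers outside the thread: the two names are not interchangeable in Slurm terms. ntasks/nprocs counts launched processes, while cpus-per-task allocates CPUs (threads) to each process. A hypothetical sketch of how such config values typically land in a batch-script header (the Slurm flags are real; the helper and values are illustrative, not this PR's code):

    def slurm_header(nodes, cpus_per_task, minutes, mem):
        # builds the #SBATCH preamble of a job script from config values
        return '\n'.join([
            '#!/bin/bash',
            f'#SBATCH --nodes={nodes}',
            '#SBATCH --ntasks=1',  # one process per job
            f'#SBATCH --cpus-per-task={cpus_per_task}',  # threads for it
            f'#SBATCH --time={minutes}',
            f'#SBATCH --mem={mem}'])

    print(slurm_header(1, 16, 60, '20G'))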

"queue": "qiita",
"wallclock_time_in_minutes": 60,
"minimap_databases": "/panfs/cguccion/23_06_25_Pangenome_Assembley/downloaded_fastqs/fastq_files/pangenome_individual_mmi",
"modules_to_load": ["fastp_0.20.1", "samtools_1.12",
" minimap2_2.18"],
"fastp_executable_path": "fastp",
"minimap2_executable_path": "minimap2",
"samtools_executable_path": "samtools",
"job_total_memory_limit": "20gb",
"job_pool_size": 30,
"job_max_array_length": 1000,
"known_adapters_path": "/Users/ccowart/NEW_QC/qp-knight-lab-processing/fastp_known_adapters_formatted.fna"
},
"seqpro": {
"seqpro_path": "seqpro",
"modules_to_load": []
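A closing note on the new known_adapters_path key: Step.py threads it into NuQCJob as the constructor's final argument, but this diff doesn't show how NuQCJob consumes it. fastp, which the config also points at, does accept a FASTA of known adapters; a hypothetical invocation (helper and filenames invented for illustration):

    from subprocess import run

    def trim_pair(config, r1, r2, out1, out2):
        # fastp trims adapters listed in the given FASTA from paired reads
        cmd = [config['fastp_executable_path'],
               '--adapter_fasta', config['known_adapters_path'],
               '-i', r1, '-I', r2,    # paired-end inputs
               '-o', out1, '-O', out2]
        run(cmd, check=True)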