Adapting code & tests to use NuQCJob #76

Merged
merged 18 commits on Dec 12, 2023
Changes from 16 commits
468 changes: 468 additions & 0 deletions fastp_known_adapters_formatted.fna

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions qp_klp/Amplicon.py
@@ -32,15 +32,15 @@ def quality_control(self):
         # Quality control for Amplicon runs occurs downstream.
         # Do not perform QC at this time.
 
-        # Simulate QCJob's output directory for use as input into FastQCJob.
+        # Simulate NuQCJob's output directory for use as input into FastQCJob.
         projects = self.pipeline.get_project_info()
         projects = [x['project_name'] for x in projects]
 
         for project_name in projects:
-            # copy the files from ConvertJob output to faked QCJob output
-            # folder: $WKDIR/$RUN_ID/QCJob/$PROJ_NAME/amplicon
+            # copy the files from ConvertJob output to faked NuQCJob output
+            # folder: $WKDIR/$RUN_ID/NuQCJob/$PROJ_NAME/amplicon
             output_folder = join(self.pipeline.output_path,
-                                 'QCJob',
+                                 'NuQCJob',
                                  project_name,
                                  # for legacy purposes, output folders are
                                  # either 'trimmed_sequences', 'amplicon', or
@@ -91,7 +91,7 @@ def generate_reports(self):
         super()._generate_reports()
         return None  # amplicon doesn't need project names
 
-    def _get_data_type(self, prep_file_path):
+    def get_data_type(self, prep_file_path):
         metadata = Step.parse_prep_file(prep_file_path, convert_to_dict=False)
 
         if 'target_gene' in metadata.columns:
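For context, a minimal sketch of the simulation described in the comments above, assuming ConvertJob writes per-project fastq.gz files under $WKDIR/$RUN_ID/ConvertJob/$PROJ_NAME; the helper name simulate_nuqc_output and the flat copy strategy are illustrative assumptions, not this module's actual code.

# Hypothetical sketch (not part of this PR): stand in for NuQCJob output
# by copying ConvertJob's fastq files into the folder layout FastQCJob
# expects, i.e. $WKDIR/$RUN_ID/NuQCJob/$PROJ_NAME/amplicon.
from glob import glob
from os import makedirs
from os.path import basename, join
from shutil import copyfile

def simulate_nuqc_output(output_path, project_name):
    src = join(output_path, 'ConvertJob', project_name)
    dst = join(output_path, 'NuQCJob', project_name, 'amplicon')
    makedirs(dst, exist_ok=True)
    for fastq in glob(join(src, '*.fastq.gz')):
        # amplicon reads are not quality-controlled at this stage, so raw
        # ConvertJob output is copied through unchanged.
        copyfile(fastq, join(dst, basename(fastq)))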
9 changes: 7 additions & 2 deletions qp_klp/Metagenomic.py
@@ -31,16 +31,21 @@ def convert_bcl_to_fastq(self):
         self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'ConvertJob')
 
     def quality_control(self):
-        config = self.pipeline.configuration['qc']
+        config = self.pipeline.configuration['nu-qc']
         job = super()._quality_control(config, self.pipeline.sample_sheet.path)
-        self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'QCJob')
+        self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'NuQCJob')
 
     def generate_reports(self):
         job = super()._generate_reports()
         self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'FastQCJob')
 
         self.project_names = job.project_names
 
+    def get_data_type(self, prep_file_path):
+        # prep_file_path is unused. It's kept for compatibility with Amplicon
+        # and Step.
+        return self.pipeline.pipeline_type
+
     def generate_prep_file(self):
         config = self.pipeline.configuration['seqpro']
 
123 changes: 91 additions & 32 deletions qp_klp/Step.py
@@ -9,7 +9,7 @@
 from sequence_processing_pipeline.GenPrepFileJob import GenPrepFileJob
 from sequence_processing_pipeline.PipelineError import PipelineError
 from sequence_processing_pipeline.Pipeline import Pipeline
-from sequence_processing_pipeline.QCJob import QCJob
+from sequence_processing_pipeline.NuQCJob import NuQCJob
 from subprocess import Popen, PIPE
 import pandas as pd
 from glob import glob
@@ -85,6 +85,8 @@ def __init__(self, pipeline, master_qiita_job_id,
 
         self.pipeline = pipeline
         self.lane_number = lane_number
+        self.generated_artifact_name = \
+            f'{self.pipeline.run_id}_{self.lane_number}'
         self.master_qiita_job_id = master_qiita_job_id
 
         if status_update_callback is not None:
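The new generated_artifact_name attribute is simply the run id and lane number joined with an underscore; a tiny illustration, where both values are invented:

# Illustration only: run id and lane number below are made up.
run_id = '211021_A00000_0000_SAMPLE'
lane_number = 1
generated_artifact_name = f'{run_id}_{lane_number}'
print(generated_artifact_name)  # prints: 211021_A00000_0000_SAMPLE_1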
@@ -192,6 +194,9 @@ def generate_special_map(self, qclient):
 
         self.special_map = special_map
 
+    def get_data_type(self, prep_file_path):
+        raise ValueError("get_data_type() not implemented for base-class.")
+
     def update_prep_templates(self, qclient):
         '''
         Update prep-template info in Qiita. Get dict of prep-ids by study-id.
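Taken together with the Amplicon and Metagenomic overrides above, get_data_type() forms a small template-method dispatch: the base class refuses to answer and each subclass supplies its own rule. A condensed sketch of the contract, with class bodies abbreviated for illustration rather than copied from the real implementations:

# Condensed sketch of the get_data_type() contract across the three
# classes touched by this PR; bodies are abbreviated for illustration.
class Step:
    def get_data_type(self, prep_file_path):
        raise ValueError("get_data_type() not implemented for base-class.")

class Metagenomic(Step):
    def get_data_type(self, prep_file_path):
        # the argument is unused; the pipeline type is the data type
        return self.pipeline.pipeline_type

class Amplicon(Step):
    def get_data_type(self, prep_file_path):
        # inspects the prep-file's target_gene column instead (elided)
        ...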
@@ -280,36 +285,38 @@ def _convert_bcl_to_fastq(self, config, input_file_path):
 
         return convert_job
 
+    # CHARLIE
     def _quality_control(self, config, input_file_path):
-        qc_job = QCJob(join(self.pipeline.output_path, 'ConvertJob'),
-                       self.pipeline.output_path,
-                       input_file_path,
-                       config['minimap_databases'],
-                       config['kraken2_database'],
-                       config['queue'],
-                       config['nodes'],
-                       config['nprocs'],
-                       config['wallclock_time_in_minutes'],
-                       config['job_total_memory_limit'],
-                       config['fastp_executable_path'],
-                       config['minimap2_executable_path'],
-                       config['samtools_executable_path'],
-                       config['modules_to_load'],
-                       self.master_qiita_job_id,
-                       self.job_pool_size,
-                       config['job_max_array_length'])
-
-        qc_job.run(callback=self.update_callback)
-
-        return qc_job
+        nuqc_job = NuQCJob(join(self.pipeline.output_path, 'ConvertJob'),
+                           self.pipeline.output_path,
+                           input_file_path,
+                           config['minimap2_databases'],
+                           config['queue'],
+                           config['nodes'],
+                           config['wallclock_time_in_minutes'],
+                           config['job_total_memory_limit'],
+                           config['fastp_executable_path'],
+                           config['minimap2_executable_path'],
+                           config['samtools_executable_path'],
+                           config['modules_to_load'],
+                           self.master_qiita_job_id,
+                           config['job_max_array_length'],
+                           config['known_adapters_path'],
+                           bucket_size=config['bucket_size'],
+                           length_limit=config['length_limit'],
+                           cores_per_task=config['cores_per_task'])
+
+        nuqc_job.run(callback=self.update_callback)
+
+        return nuqc_job
 
     def _generate_reports(self):
         config = self.pipeline.configuration['fastqc']
         is_amplicon = self.pipeline.pipeline_type == Step.AMPLICON_TYPE
         fastqc_job = FastQCJob(self.pipeline.run_dir,
                                self.pipeline.output_path,
                                join(self.pipeline.output_path, 'ConvertJob'),
-                               join(self.pipeline.output_path, 'QCJob'),
+                               join(self.pipeline.output_path, 'NuQCJob'),
                                config['nprocs'],
                                config['nthreads'],
                                config['fastqc_executable_path'],
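Since _quality_control() now reads its settings from the 'nu-qc' section (see Metagenomic.py above), that section must provide every key the NuQCJob constructor consumes. A hypothetical example of such a section, written here as a Python dict: the key names are taken from the lookups above, but every value is an invented placeholder.

# Hypothetical 'nu-qc' configuration section. All values are placeholders;
# only the key names come from the lookups in _quality_control().
nu_qc_config = {
    'minimap2_databases': '/databases/minimap2/human-GRC38',  # placeholder
    'queue': 'qiita',
    'nodes': 1,
    'wallclock_time_in_minutes': 2028,
    'job_total_memory_limit': '110gb',
    'fastp_executable_path': 'fastp',
    'minimap2_executable_path': 'minimap2',
    'samtools_executable_path': 'samtools',
    'modules_to_load': [],
    'job_max_array_length': 1000,
    'known_adapters_path': 'fastp_known_adapters_formatted.fna',
    'bucket_size': 8,
    'length_limit': 100,
    'cores_per_task': 4,
}

Note that the old 'qc' section's kraken2_database and nprocs entries are no longer consumed.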
@@ -335,7 +342,7 @@ def _generate_prep_file(self, config, input_file_path, seqpro_path,
         gpf_job = GenPrepFileJob(
             self.pipeline.run_dir,
             join(self.pipeline.output_path, 'ConvertJob'),
-            join(self.pipeline.output_path, 'QCJob'),
+            join(self.pipeline.output_path, 'NuQCJob'),
             self.pipeline.output_path,
             input_file_path,
             seqpro_path,
@@ -360,13 +367,13 @@ def _helper_process_fastp_report_dirs(self):
             for dir_name in dirs:
                 if dir_name == 'fastp_reports_dir':
                     # generate the full path for this directory before
-                    # truncating everything up to the QCJob directory.
-                    full_path = join(root, dir_name).split('QCJob/')
-                    report_dirs.append(join('QCJob', full_path[1]))
+                    # truncating everything up to the NuQCJob directory.
+                    full_path = join(root, dir_name).split('NuQCJob/')
+                    report_dirs.append(join('NuQCJob', full_path[1]))
 
         if report_dirs:
             report_dirs.sort()
-            return 'tar zcvf reports-QCJob.tgz ' + ' '.join(report_dirs)
+            return 'tar zcvf reports-NuQCJob.tgz ' + ' '.join(report_dirs)
         else:
             # It is okay to return an empty list of commands if reports_dirs
             # is empty. Some pipelines do not generate fastp reports.
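For instance, if the walk found fastp report folders for two projects, the generated command would look like this (project names invented):

# Hypothetical example of the command built above.
report_dirs = ['NuQCJob/Project_A_123/fastp_reports_dir',
               'NuQCJob/Project_B_456/fastp_reports_dir']
report_dirs.sort()
cmd = 'tar zcvf reports-NuQCJob.tgz ' + ' '.join(report_dirs)
# cmd == 'tar zcvf reports-NuQCJob.tgz '
#        'NuQCJob/Project_A_123/fastp_reports_dir '
#        'NuQCJob/Project_B_456/fastp_reports_dir'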
@@ -388,7 +395,7 @@ def _helper_process_operations(self):
         REPORT_PREFIX = 'reports'
         PREP_PREFIX = 'prep-files'
         CONVERT_JOB = 'ConvertJob'
-        QC_JOB = 'QCJob'
+        QC_JOB = 'NuQCJob'
         FASTQC_JOB = 'FastQCJob'
         PREPFILE_JOB = 'GenPrepFileJob'
         TAR_EXT = 'tgz'
@@ -400,7 +407,7 @@ def _helper_process_operations(self):
              f'{REPORT_PREFIX}-{CONVERT_JOB}.{TAR_EXT}',
              'OUTPUT_FIRST'),
 
-            (['QCJob/logs'], TAR_CMD,
+            (['NuQCJob/logs'], TAR_CMD,
              f'{LOG_PREFIX}-{QC_JOB}.{TAR_EXT}', 'OUTPUT_FIRST'),
 
             (['FastQCJob/logs'], TAR_CMD,
@@ -478,12 +485,12 @@ def _get_fastq_files(self, out_dir, project):
         af = None
         sub_folders = ['amplicon', 'filtered_sequences', 'trimmed_sequences']
         for sub_folder in sub_folders:
-            sf = f'{out_dir}/QCJob/{project}/{sub_folder}'
+            sf = f'{out_dir}/NuQCJob/{project}/{sub_folder}'
             if exists(sf):
                 af = [f for f in glob(f'{sf}/*.fastq.gz')]
                 break
         if af is None or not af:
-            raise PipelineError("QCJob output not in expected location")
+            raise PipelineError("NuQCJob output not in expected location")
 
         files = {'raw_barcodes': [], 'raw_forward_seqs': [],
                  'raw_reverse_seqs': []}
@@ -607,6 +614,58 @@ def load_preps_into_qiita(self, qclient):
 
         return df
 
+    def _load_preps_into_qiita2(self, qclient, prep_id, qiita_id, out_dir,
+                                project):
+        surl = f'{qclient._server_url}/study/description/{qiita_id}'
+        prep_url = (f'{qclient._server_url}/study/description/'
+                    f'{qiita_id}?prep_id={prep_id}')
+
+        if self.pipeline.pipeline_type == Step.AMPLICON_TYPE:
+            files = self._get_files_amplicon(out_dir, project)
+        else:
+            atype = 'per_sample_FASTQ'
+            files = self._get_files_meta(out_dir, project, prep_id)
+
+        for f_type in files:
+            if not files[f_type]:
+                # if one or more of the expected lists of reads is empty,
+                # raise an Error.
+                raise ValueError(f"'{f_type}' is empty")
+
+        # ideally we would use the email of the user that started the SPP
+        # run, but at this point there is no easy way to retrieve it.
+        pdata = {'user_email': 'qiita.help@gmail.com',
+                 'prep_id': prep_id,
+                 'artifact_type': atype,
+                 'command_artifact_name': self.generated_artifact_name,
+                 'files': dumps(files)}
+
+        job_id = qclient.post('/qiita_db/artifact/', data=pdata)['job_id']
+
+        return {'Project': project, 'Qiita Study ID': qiita_id,
+                'Qiita Prep ID': prep_id, 'Qiita URL': surl,
+                'Prep URL': prep_url, 'Linking JobID': job_id}
+
+    def load_preps_into_qiita2(self, qclient):
+        out_dir = self.pipeline.output_path
+
+        data = []
+        for project, _, qiita_id in self.special_map:
+            for prep_id in self.touched_studies_prep_info[qiita_id]:
+                data.append(self._load_preps_into_qiita2(qclient,
+                                                         prep_id,
+                                                         qiita_id,
+                                                         out_dir,
+                                                         project))
+
+        df = pd.DataFrame(data)
+        opath = join(self.pipeline.output_path, 'touched_studies.html')
+        with open(opath, 'w') as f:
+            f.write(df.to_html(border=2, index=False, justify="left",
+                               render_links=True, escape=False))
+
+        return df
+
     def write_commands_to_output_path(self):
         self.cmds_log_path = join(self.pipeline.output_path, 'cmds.log')
         with open(self.cmds_log_path, 'w') as f:
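For reference, a hypothetical sketch of the payload _load_preps_into_qiita2() posts for a metagenomic prep: the keys and the endpoint come from the code above, while the prep id, file paths, and artifact name are invented stand-ins.

from json import dumps

# Invented per_sample_FASTQ file lists; real paths come from
# _get_files_meta() and live under NuQCJob/<project>/....
files = {'raw_forward_seqs': ['NuQCJob/ProjX/filtered_sequences/s1_R1.fastq.gz'],
         'raw_reverse_seqs': ['NuQCJob/ProjX/filtered_sequences/s1_R2.fastq.gz']}

pdata = {'user_email': 'qiita.help@gmail.com',
         'prep_id': 12345,                                        # invented
         'artifact_type': 'per_sample_FASTQ',
         'command_artifact_name': '211021_A00000_0000_SAMPLE_1',  # invented
         'files': dumps(files)}

# a connected Qiita client would then link the files to the prep:
# job_id = qclient.post('/qiita_db/artifact/', data=pdata)['job_id']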