Updates based on testing in qiita-rc
charles-cowart committed Sep 17, 2024
1 parent d347217 commit 0236403
Showing 6 changed files with 19 additions and 15 deletions.
5 changes: 4 additions & 1 deletion sequence_processing_pipeline/Commands.py
@@ -22,7 +22,8 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
     # is now the following:
     # add one more level to account for project_names nested under ConvertJob
     # dir.
-    fastq_paths = glob.glob(data_location_path + '*/*/*.fastq.gz')
+    # this will ignore the _I1_ reads that appear in the integrated result.
+    fastq_paths = glob.glob(data_location_path + '/*/*_R?_001.fastq.gz')

     # convert from GB and halve as we sum R1
     max_size = (int(max_file_list_size_in_gb) * (2 ** 30) / 2)
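A minimal sketch (not part of this commit) of why the narrowed pattern skips index reads; the basenames are made up, only the '*_R?_001.fastq.gz' suffix comes from the diff above:

from fnmatch import fnmatch

# '_R?_' matches R1/R2 reads but never the _I1_ index reads.
pattern = '*_R?_001.fastq.gz'

for name in ['Sample_A_S1_L001_R1_001.fastq.gz',
             'Sample_A_S1_L001_R2_001.fastq.gz',
             'Sample_A_S1_L001_I1_001.fastq.gz']:
    print(name, fnmatch(name, pattern))
# -> True, True, False: the index read is excluded from the size bins.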
@@ -114,6 +115,8 @@ def demux(id_map, fp, out_d, task, maxtask):
     qual = iter(fp)

     for i, s, d, q in zip(id_, seq, dumb, qual):
+        # NB: This appears to not be causing the removal of the metadata
+        # either.
         fname_encoded, id_ = i.split(delimiter, 1)

         if fname_encoded not in openfps:
1 change: 0 additions & 1 deletion sequence_processing_pipeline/Job.py
@@ -9,7 +9,6 @@
 import logging
 from inspect import stack
 import re
-from time import time


 class Job:
22 changes: 12 additions & 10 deletions sequence_processing_pipeline/TRConvertJob.py
@@ -116,7 +116,7 @@ def __init__(self, run_dir, output_path, sample_sheet_path, queue_name,
         self.cloudspades_path = "/home/qiita_test/qiita-spots/spades-cloudspades-0.1"
         self.cloudspades_wall_time_limit = "24:00:00"
         self.counts_cores_per_task = "1"
-        self.counts_create_picklist_path = "/home/qiita_test/qiita-spots/create_picklist.py",
+        self.counts_create_picklist_path = "/home/qiita_test/qiita-spots/create_picklist.py"
         self.counts_mem_in_gb = "8"
         self.counts_node_count = "1"
         self.counts_other_file = '20230906_FS10001773_68_BTR67708-1611.read_counts.tsv'
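The dropped trailing comma matters because in Python it silently turns the assignment into a one-element tuple rather than a path string; a quick illustration:

# With the stray comma the attribute is a tuple, which breaks anything that
# later expects a plain string path (joins, formatting, subprocess args).
path = "/home/qiita_test/qiita-spots/create_picklist.py",
print(type(path))   # <class 'tuple'>

path = "/home/qiita_test/qiita-spots/create_picklist.py"
print(type(path))   # <class 'str'>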
@@ -154,6 +154,8 @@ def __init__(self, run_dir, output_path, sample_sheet_path, queue_name,
         self.main_reference_base = ""
         self.main_reference_map = ""

+        self._generate_job_scripts()
+
     def _generate_job_scripts(self):
         scripts = [
             {
@@ -189,7 +191,7 @@ def _generate_job_scripts(self):
                 "wall_time_limit": self.integrate_wall_time_limit,
                 "mem_in_gb": self.integrate_mem_in_gb,
                 "node_count": self.integrate_node_count,
-                "cores_per_task": self.integtrate_cores_per_task,
+                "cores_per_task": self.integrate_cores_per_task,
                 "iinp_script_path": self.integrate_indicies_script_path,
                 "queue_name": self.queue_name
             }
@@ -368,7 +370,7 @@ def run(self, callback=None):

         # Get a list of Slurm job ids that we need to wait on and text
         # descriptions of what they are.
-        jids = [(results[x[2], x[0]]) for x in child_processes if
+        jids = [(results[x[2]], x[0]) for x in child_processes if
                 x[2] in results]

         # ensure the jids are casted to integers before passing them.
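A sketch of why the comprehension fix matters; the shapes of child_processes (description, process, key) and results (key -> Slurm job id) are assumptions, since only this expression appears in the hunk:

# Hypothetical data, for illustration only.
child_processes = [('integrate fastqs', None, 'integrate'),
                   ('run cloudspades', None, 'spades')]
results = {'integrate': 1234, 'spades': 1235}

# The old form indexed results with the tuple key (x[2], x[0]), raising
# KeyError; the fix builds (job id, description) pairs, matching the
# "for (jid, description), status in zip(jids, statuses)" unpacking below.
jids = [(results[x[2]], x[0]) for x in child_processes if x[2] in results]
print(jids)   # [(1234, 'integrate fastqs'), (1235, 'run cloudspades')]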
@@ -377,7 +379,7 @@
         for (jid, description), status in zip(jids, statuses):
             if status not in Job.slurm_status_successful:
                 raise PipelineError(f"process '{description}' ({jid}) "
-                                    f"failed ({status}")
+                                    f"failed ({status})")

         # post-process working directory to make it appear like results
         # generated by ConvertJob
@@ -412,7 +414,7 @@ def run(self, callback=None):
         for root, dirs, files in walk(integrated_files_path):
             for _file in files:
                 fastq_file = join(root, _file)
-                self._post_process_file(fastq_file, self.lane, self.mapping)
+                self._post_process_file(fastq_file, self.mapping)

         # move project folders from integrated directory to working_dir.
         contents = listdir(integrated_files_path)
@@ -430,7 +432,7 @@ def parse_logs(self):
     def parse_job_script(job_script_path):
         raise PipelineError("parsing job script not implemented.")

-    def _post_process_file(self, fastq_file, lane, mapping):
+    def _post_process_file(self, fastq_file, mapping):
         # generate names of the form generated by bcl-convert/bcl2fastq:
         # <Sample_ID>_S#_L00#_<R# or I#>_001.fastq.gz
         # see:
@@ -455,10 +457,10 @@ def _post_process_file(self, fastq_file, lane, mapping):

         # generate the new filename for the fastq file, and reorganize the
         # files by project.
-        new_name = "%s_S%d_L%s_%s_001.fastq.gz" % (sample_name,
-                                                   sample_index,
-                                                   str(lane).zfill(3),
-                                                   read_type)
+        new_name = "%s_S%d_%s_%s_001.fastq.gz" % (sample_name,
+                                                  sample_index,
+                                                  self.lane,
+                                                  read_type)

         # ensure that the project directory exists before we rename and move
         # the file to that location.
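For illustration, assuming self.lane already holds the full bcl-convert style lane token (e.g. 'L001', which is why the explicit str(lane).zfill(3) padding could be dropped), the updated format string yields names like the one below; the sample values are hypothetical:

# Hypothetical inputs; only the format string comes from the diff above.
sample_name, sample_index, lane, read_type = 'ABC123', 14, 'L001', 'R1'

new_name = "%s_S%d_%s_%s_001.fastq.gz" % (sample_name, sample_index,
                                          lane, read_type)
print(new_name)   # ABC123_S14_L001_R1_001.fastq.gz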
2 changes: 1 addition & 1 deletion sequence_processing_pipeline/tests/test_ConvertJob.py
@@ -952,7 +952,7 @@ def test_error_msg_from_logs(self):

         # an internal method to force submit_job() to raise a JobFailedError
         # instead of submitting the job w/sbatch and waiting for a failed
-        # job w/sacct.
+        # job w/squeue.
         self.assertTrue(job._toggle_force_job_fail())

         error_msg = ("This job died.\n2024-01-01T12:12:12Z thread 99999 ERROR:"
2 changes: 1 addition & 1 deletion sequence_processing_pipeline/tests/test_FastQCJob.py
@@ -1121,7 +1121,7 @@ def test_error_msg_from_logs(self):

         # an internal method to force submit_job() to raise a JobFailedError
         # instead of submitting the job w/sbatch and waiting for a failed
-        # job w/sacct.
+        # job w/squeue.
         self.assertTrue(job._toggle_force_job_fail())

         try:
2 changes: 1 addition & 1 deletion sequence_processing_pipeline/tests/test_NuQCJob.py
@@ -992,7 +992,7 @@ def test_error_msg_from_logs(self):

         # an internal method to force submit_job() to raise a JobFailedError
         # instead of submitting the job w/sbatch and waiting for a failed
-        # job w/sacct.
+        # job w/squeue.
         self.assertTrue(job._toggle_force_job_fail())

         try:
