diff --git a/sequence_processing_pipeline/FastQCJob.py b/sequence_processing_pipeline/FastQCJob.py index 80601ba8..65dd9bfd 100644 --- a/sequence_processing_pipeline/FastQCJob.py +++ b/sequence_processing_pipeline/FastQCJob.py @@ -25,7 +25,6 @@ def __init__(self, run_dir, output_path, raw_fastq_files_path, self.fastqc_path = fastqc_path self.queue_name = queue_name self.node_count = node_count - self.nprocs = nprocs self.wall_time_limit = wall_time_limit self.jmem = jmem self.qiita_job_id = qiita_job_id diff --git a/sequence_processing_pipeline/NuQCJob.py b/sequence_processing_pipeline/NuQCJob.py index dd8f0b96..8597eb91 100644 --- a/sequence_processing_pipeline/NuQCJob.py +++ b/sequence_processing_pipeline/NuQCJob.py @@ -21,7 +21,7 @@ class NuQCJob(Job): def __init__(self, fastq_root_dir, output_path, sample_sheet_path, minimap_database_paths, queue_name, node_count, wall_time_limit, jmem, fastp_path, minimap2_path, - samtools_path, modules_to_load, qiita_job_id, pool_size, + samtools_path, modules_to_load, qiita_job_id, max_array_length, known_adapters_path, bucket_size=8, length_limit=100, cores_per_task=4): """ @@ -41,7 +41,6 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path, :param samtools_path: The path to the samtools executable :param modules_to_load: A list of Linux module names to load :param qiita_job_id: identify Torque jobs using qiita_job_id - :param pool_size: The number of jobs to process concurrently. :param known_adapters_path: The path to an .fna file of known adapters. :param bucket_size: the size in GB of each bucket to process :param length_limit: reads shorter than this will be discarded. 
@@ -68,7 +67,6 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path, self.minimap2_path = minimap2_path self.samtools_path = samtools_path self.qiita_job_id = qiita_job_id - self.pool_size = pool_size self.suffix = 'fastq.gz' # for projects that use sequence_processing_pipeline as a dependency, @@ -82,7 +80,17 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path, self.known_adapters_path = known_adapters_path self.max_file_list_size_in_gb = bucket_size self.length_limit = length_limit + + # NuQCJob() impl uses -c (--cpus-per-task) switch instead of + # -n (--tasks-per-node). --cpus-per-task requests the number of cpus + # per process. This is to support multithreaded jobs that require more + # than one cpu per task. All cores will be allocated on a single node. + # + # This is different from using -n + -N (number of nodes to request) + # because it's allowable to request more cores than are available on + # one node using this pair of switches (N nodes * n tasks per node). 
self.cores_per_task = cores_per_task + self.temp_dir = join(self.output_path, 'tmp') makedirs(self.temp_dir, exist_ok=True) diff --git a/sequence_processing_pipeline/configuration.json b/sequence_processing_pipeline/configuration.json index 4306ccb2..c5e7d54a 100644 --- a/sequence_processing_pipeline/configuration.json +++ b/sequence_processing_pipeline/configuration.json @@ -40,7 +40,7 @@ }, "nu-qc": { "nodes": 1, - "nprocs": 8, + "cpus_per_task": 8, "queue": "qiita", "wallclock_time_in_minutes": 240, "minimap2_databases": "/panfs/cguccion/23_06_25_Pangenome_Assembley/downloaded_fastqs/fastq_files/pangenome_individual_mmi", @@ -48,9 +48,12 @@ "fastp_executable_path": "fastp", "minimap2_executable_path": "minimap2", "samtools_executable_path": "samtools", - "job_total_memory_limit": "20gb", - "job_pool_size": 30, - "job_max_array_length": 1000 + "job_total_memory_limit": "20", + "job_max_array_length": 1000, + "known_adapters_path": "fastp_known_adapters_formatted.fna", + "bucket_size": 8, + "length_limit": 100, + "cores_per_task": 4 }, "seqpro": { "seqpro_path": "seqpro", diff --git a/sequence_processing_pipeline/tests/test_NuQCJob.py b/sequence_processing_pipeline/tests/test_NuQCJob.py index 1d9468bc..dd98274b 100644 --- a/sequence_processing_pipeline/tests/test_NuQCJob.py +++ b/sequence_processing_pipeline/tests/test_NuQCJob.py @@ -551,7 +551,7 @@ def test_nuqcjob_creation(self): NuQCJob(self.fastq_root_path, self.output_path, 'not/path/to/sample/sheet', self.mmi_db_paths, 'queue_name', 1, 1440, '8gb', - 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, 30, + 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, 1000, '') self.assertEqual(str(e.exception), "file 'not/path/to/sample/sheet' " @@ -567,7 +567,7 @@ def test_nuqcjob_creation(self): self.tmp_file_path, self.mmi_db_paths, 'queue_name', 1, 1440, '8gb', 'fastp', 'minimap2', 'samtools', [], - self.qiita_job_id, 30, 1000, '') + self.qiita_job_id, 1000, '') self.assertFalse(nuqcjob is None) @@ -583,7 
+583,7 @@ def test_nuqcjob_creation(self): self.tmp_file_path, self.mmi_db_paths, 'queue_name', 1, 1440, '8gb', 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, - 30, 1000, '') + 1000, '') self.assertEqual(str(e.exception), ("Assay value 'NotMetagenomic' is " "not recognized.")) @@ -592,7 +592,7 @@ def test_nuqcjob_creation(self): NuQCJob(self.fastq_root_path, self.output_path, self.bad_sheet_bools_path, self.mmi_db_paths, 'queue_name', 1, 1440, '8gb', - 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, 30, + 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, 1000, '') self.assertEqual(str(e.exception), @@ -603,7 +603,7 @@ def test_assay_value(self): NuQCJob(self.fastq_root_path, self.output_path, self.bad_sample_sheet_path, self.mmi_db_paths, 'queue_name', 1, 1440, '8gb', - 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, 30, + 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, 1000, '') self.assertEqual(str(e.exception), "Assay value 'Metagenomics' is not" @@ -614,7 +614,7 @@ def test_audit(self): self.good_sample_sheet_path, self.mmi_db_paths, 'queue_name', 1, 1440, '8gb', 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, - 30, 1000, '') + 1000, '') obs = job.audit(self.sample_ids) @@ -1044,7 +1044,7 @@ def test_completed_file_generation(self): self.good_sample_sheet_path, double_db_paths, 'queue_name', 1, 1440, '8gb', 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, - 30, 1000, '') + 1000, '') my_path = ('sequence_processing_pipeline/tests/data/output_dir/' 'NuQCJob') @@ -1067,7 +1067,7 @@ def test_completed_file_generation_some_failures(self): self.good_sample_sheet_path, double_db_paths, 'queue_name', 1, 1440, '8gb', 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, - 30, 1000, '') + 1000, '') # test _confirm_job_completed() fails when a .completed file isn't # manually created. 
@@ -1079,7 +1079,7 @@ def test_generate_job_script(self): self.good_sample_sheet_path, double_db_paths, 'queue_name', 1, 1440, '8gb', 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, - 30, 1000, '') + 1000, '') job_script_path = job._generate_job_script() @@ -1216,7 +1216,7 @@ def test_regular_expressions(self): self.good_sample_sheet_path, double_db_paths, 'queue_name', 1, 1440, '8gb', 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, - 30, 1000, '') + 1000, '') # a sample of known valid fastq file-names plus a few edge-cases. good_names = ['11407-AAGTAGGAAGGA_S3249_L002_R1_001.fastq.gz',