Use pipeline processor instances only once. #293

Merged
merged 1 commit on Nov 2, 2022
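This PR changes conduct.py so that pipeline processor instances are no longer built once up front and reused across all parallel executions. Instead, the processor specs and the evaluated constructor arguments are handed down the call chain, and a fresh instance is created via a new create_processor_instance helper each time a processor is dispatched, so no mutable processor state is shared between concurrently processed pipeline elements. A minimal sketch of the difference, using an invented CountingProcessor rather than any class from this repository:

from concurrent.futures import ThreadPoolExecutor

class CountingProcessor:
    """Toy stand-in for a pipeline processor with mutable instance state."""
    def __init__(self):
        self.calls = 0

    def execute(self, index, data):
        self.calls += 1     # state that leaks if the instance is shared
        return index, (data, self.calls)

def create_instance(spec):
    # stand-in for create_processor_instance(spec, evaluated_constructor_args)
    return CountingProcessor()

spec = {"name": "counter"}
with ThreadPoolExecutor(max_workers=4) as executor:
    # old behavior: one shared instance executes every pipeline element
    shared = create_instance(spec)
    old = [executor.submit(shared.execute, -1, d) for d in range(3)]

    # new behavior: a fresh instance per dispatch, nothing shared
    new = [executor.submit(create_instance(spec).execute, -1, d) for d in range(3)]

print(sorted(f.result()[1][1] for f in old))   # counts accumulate, typically [1, 2, 3]
print(sorted(f.result()[1][1] for f in new))   # every instance sees 1: [1, 1, 1]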
84 changes: 55 additions & 29 deletions datalad_metalad/conduct.py
@@ -12,6 +12,8 @@
 NB: Individual elements are instantiated once and reused in the individual
 parallel executions.
 """
+from __future__ import annotations
+
 import concurrent.futures
 import logging
 import traceback
@@ -255,19 +257,11 @@ def __call__(
                 **evaluated_constructor_args[provider_name]
             })
 
-        processor_instances = [
-            get_class_instance(spec)(
-                **{
-                    **spec["arguments"],
-                    **evaluated_constructor_args[spec["name"]]
-                }
-            )
-            for index, spec in enumerate(conduct_configuration["processors"])]
-
         if processing_mode == "sequential":
             yield from process_sequential(
                 provider_instance,
-                processor_instances,
+                conduct_configuration["processors"],
+                evaluated_constructor_args,
                 consumer_instance)
             return
         elif processing_mode == "thread":
@@ -278,16 +272,18 @@
             raise ValueError(f"unsupported processing mode: {processing_mode}")
 
         yield from process_parallel(
-            executor,
-            provider_instance,
-            processor_instances,
-            consumer_instance)
+            executor,
+            provider_instance,
+            conduct_configuration["processors"],
+            evaluated_constructor_args,
+            consumer_instance)
 
 
 def process_parallel(executor,
                      provider_instance: Provider,
-                     processor_instances: List[Processor],
-                     consumer_instance: Optional[Consumer] = None
+                     processor_specs: list[dict],
+                     evaluated_constructor_args: dict,
+                     consumer_instance: Consumer | None = None
                      ) -> Iterable:
 
     running = set()
@@ -297,7 +293,8 @@ def process_parallel(executor,
     # and feeds the result of every pipeline into the consumer.
     for pipeline_data in provider_instance.next_object():
 
-        if not processor_instances:
+        # Handle the "provider-only" case
+        if not processor_specs:
            path = pipeline_data.get_result("path")
            yield dict(
                action="meta_conduct",
@@ -307,10 +304,13 @@
                 pipeline_data=pipeline_data.to_json())
             continue
 
-        lgr.debug(f"Starting {processor_instances[0]} on {pipeline_data}")
+        lgr.debug(f"Starting new instance of {processor_specs[0]} on {pipeline_data}")
+        processor = create_processor_instance(
+            processor_specs[0],
+            evaluated_constructor_args)
         running.add(
             executor.submit(
-                processor_instances[0].execute,
+                processor.execute,
                 -1,
                 pipeline_data))
 
@@ -330,7 +330,7 @@
             lgr.debug(
                 f"Processor[{source_index}] returned {pipeline_data} "
                 f"[provider not yet exhausted]")
-            if next_index >= len(processor_instances):
+            if next_index >= len(processor_specs):
                 if consumer_instance:
                     pipeline_data = consumer_instance.consume(pipeline_data)
 
@@ -346,9 +346,12 @@
                 lgr.debug(
                     f"Starting processor[{next_index}]"
                     f"[provider not yet exhausted]")
+                processor = create_processor_instance(
+                    processor_specs[next_index],
+                    evaluated_constructor_args)
                 running.add(
                     executor.submit(
-                        processor_instances[next_index].execute,
+                        processor.execute,
                         this_index,
                         pipeline_data))
 
@@ -377,7 +380,7 @@
             lgr.debug(
                 f"Processor[{source_index}] returned {pipeline_data}")
 
-            if next_index >= len(processor_instances):
+            if next_index >= len(processor_specs):
                 if consumer_instance:
                     pipeline_data = consumer_instance.consume(pipeline_data)
                     lgr.debug(
@@ -396,9 +399,13 @@
                 lgr.debug(
                     f"Handing pipeline data {pipeline_data} to"
                     f"processor[{next_index}]")
+                processor = create_processor_instance(
+                    spec=processor_specs[next_index],
+                    evaluated_constructor_args=evaluated_constructor_args
+                )
                 running.add(
                     executor.submit(
-                        processor_instances[next_index].execute,
+                        processor.execute,
                         this_index,
                         pipeline_data))
 
@@ -413,20 +420,25 @@
 
 
 def process_sequential(provider_instance: Provider,
-                       processor_instances: List[Processor],
-                       consumer_instance: Optional[Consumer]) -> Iterable:
+                       processor_specs: list[dict],
+                       evaluated_constructor_args: dict,
+                       consumer_instance: Consumer | None = None,
+                       ) -> Iterable:
 
     for pipeline_data in provider_instance.next_object():
         lgr.debug(f"Provider yielded: {pipeline_data}")
         yield from process_downstream(
             pipeline_data=pipeline_data,
-            processor_instances=processor_instances,
+            processor_specs=processor_specs,
+            evaluated_constructor_args=evaluated_constructor_args,
             consumer_instance=consumer_instance)
 
 
 def process_downstream(pipeline_data: PipelineData,
-                       processor_instances: List[Processor],
-                       consumer_instance: Optional[Consumer]) -> Iterable:
+                       processor_specs: list[dict],
+                       evaluated_constructor_args: dict,
+                       consumer_instance: Consumer | None,
+                       ) -> Iterable:
 
     if pipeline_data.state == PipelineDataState.STOP:
         path = pipeline_data.get_result("path")
@@ -445,7 +457,10 @@ def process_downstream(pipeline_data: PipelineData,
         yield datalad_result
         return
 
-    for processor in processor_instances:
+    for processor_spec in processor_specs:
+        processor = create_processor_instance(
+            processor_spec,
+            evaluated_constructor_args)
         try:
             _, pipeline_data = processor.execute(None, pipeline_data)
         except Exception as exc:
@@ -517,3 +532,14 @@ def evaluate_constructor_args(class_instance: Dict[str, Type[PipelineElement]],
             value = class_instance.get_keyword_arg_value(keyword, value)
             result[element_name][keyword] = value
     return result
+
+
+def create_processor_instance(spec: dict,
+                              evaluated_constructor_args: dict
+                              ) -> type(Processor):
+    return get_class_instance(spec)(
+        **{
+            **spec["arguments"],
+            **evaluated_constructor_args[spec["name"]]
+        }
+    )
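A note on the new helper: in create_processor_instance, the dict merge gives the dynamically evaluated constructor arguments precedence over the static spec["arguments"], because later entries in a dict literal override earlier ones on key collisions. A quick illustration with invented spec values, not taken from any real pipeline configuration:

spec = {"name": "extractor", "arguments": {"limit": 10, "verbose": False}}
evaluated_constructor_args = {"extractor": {"verbose": True}}

merged = {**spec["arguments"], **evaluated_constructor_args[spec["name"]]}
print(merged)   # {'limit': 10, 'verbose': True} -- the evaluated value wins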