diff --git a/docetl/runner.py b/docetl/runner.py index 6c46bdb1..7666e905 100644 --- a/docetl/runner.py +++ b/docetl/runner.py @@ -374,6 +374,27 @@ def execute_step( if self.intermediate_dir: self._save_checkpoint(step["name"], operation_name, input_data) + # Load existing step op hash, if exists, merge self.step_op_hashes[step["name"]][operation_name] into it + # Save the step op hash + intermediate_config_path = os.path.join( + self.intermediate_dir, ".docetl_intermediate_config.json" + ) + if os.path.exists(intermediate_config_path): + with open(intermediate_config_path, "r") as f: + existing_config = json.load(f) + else: + existing_config = {} + + if step["name"] not in existing_config: + existing_config[step["name"]] = {} + existing_config[step["name"]][operation_name] = self.step_op_hashes[ + step["name"] + ][operation_name] + + # Resave + with open(intermediate_config_path, "w") as f: + json.dump(existing_config, f, indent=2) + return input_data, total_cost def _load_from_checkpoint_if_exists( diff --git a/poetry.lock b/poetry.lock index c765dd0e..7522e482 100644 --- a/poetry.lock +++ b/poetry.lock @@ -4006,21 +4006,6 @@ files = [ {file = "soupsieve-2.6.tar.gz", hash = "sha256:e2e68417777af359ec65daac1057404a3c8a5455bb8abc36f1a9866ab1a51abb"}, ] -[[package]] -name = "tenacity" -version = "9.0.0" -description = "Retry code until it succeeds" -optional = false -python-versions = ">=3.8" -files = [ - {file = "tenacity-9.0.0-py3-none-any.whl", hash = "sha256:93de0c98785b27fcf659856aa9f54bfbd399e29969b0621bc7f762bd441b4539"}, - {file = "tenacity-9.0.0.tar.gz", hash = "sha256:807f37ca97d62aa361264d497b0e31e92b8027044942bfa756160d908320d73b"}, -] - -[package.extras] -doc = ["reno", "sphinx"] -test = ["pytest", "tornado (>=4.5)", "typeguard"] - [[package]] name = "termcolor" version = "2.5.0" @@ -4518,4 +4503,4 @@ parsing = ["azure-ai-documentintelligence", "openpyxl", "paddleocr", "paddlepadd [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "6cfc3498d69d4df1672e110994e4791b4d163615f146f36f041a90dced5846ea" +content-hash = "492e352f2364559d37223d6c88069c6a9d586f2f08e8d02deef65ccde07a2194" diff --git a/pyproject.toml b/pyproject.toml index e8791a2b..16fbd376 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,6 @@ python = "^3.10" litellm = "^1.42.1" tqdm = "^4.66.4" rich = "^13.7.1" -tenacity = "^9.0.0" frozendict = "^2.4.4" diskcache = "^5.6.3" typer = "^0.12.5" diff --git a/tests/test_runner_caching.py b/tests/test_runner_caching.py index 42f02f2f..65b0fac8 100644 --- a/tests/test_runner_caching.py +++ b/tests/test_runner_caching.py @@ -1,3 +1,4 @@ +import shutil import time import pytest import json @@ -118,3 +119,75 @@ def test_pipeline_rerun_on_operation_change( # Check that the runtime is faster when not modifying assert unmodified_runtime < modified_runtime + + +# Test with an incorrect later operation but correct earlier operation +def test_partial_caching(temp_input_file, temp_output_file, temp_intermediate_dir): + # Create initial pipeline with two operations + initial_pipeline = Pipeline( + name="test_pipeline", + datasets={"test_input": Dataset(type="file", path=temp_input_file)}, + operations=[ + MapOp( + name="first_operation", + type="map", + prompt="Analyze the sentiment of the following text: '{{ input.text }}'", + output={"schema": {"sentiment": "string"}}, + model="gpt-4o-mini", + ), + MapOp( + name="second_operation_bad", + type="map", + prompt="Summarize the following text: '{{ forororororo }}'", + output={"schema": {"summary": "1000"}}, + model="gpt-4o-mini", + ), + ], + steps=[ + PipelineStep( + name="first_step", + input="test_input", + operations=["first_operation", "second_operation_bad"], + ), + ], + output=PipelineOutput( + type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir + ), + default_model="gpt-4o-mini", + ) + + # Run the initial pipeline + # Run the initial pipeline with an expected error + with pytest.raises(Exception): + initial_cost = initial_pipeline.run() + + new_pipeline_with_only_one_op = Pipeline( + name="test_pipeline", + datasets={"test_input": Dataset(type="file", path=temp_input_file)}, + operations=[ + MapOp( + name="first_operation", + type="map", + prompt="Analyze the sentiment of the following text: '{{ input.text }}'", + output={"schema": {"sentiment": "string"}}, + model="gpt-4o-mini", + ), + ], + steps=[ + PipelineStep( + name="first_step", + input="test_input", + operations=["first_operation"], + ), + ], + output=PipelineOutput( + type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir + ), + default_model="gpt-4o-mini", + ) + rerun_cost = new_pipeline_with_only_one_op.run() + + # Assert that the cost was 0 when rerunning the pipeline + assert ( + rerun_cost == 0 + ), "Expected zero cost when rerunning the pipeline without changes"