Agent Foundry

#98. Workflow Checkpointing

Hard | Orchestration | Error Recovery

The Problem

Your data-processing pipeline has five steps: extract entities, classify the document, summarize findings, generate recommendations, and format the report. When step 4 fails because of an API timeout, the entire pipeline restarts from step 1, discarding the work already done in steps 1–3 and paying for those LLM calls a second time. Your job is to add checkpointing so that the pipeline saves its state after each successful step and, on failure, resumes from the last checkpoint instead of starting over.

Examples

Example 1

Input: Q3 2024 sales data

Current (bad) output:

Attempt 1: Step 1 ✓, Step 2 ✓, Step 3 ✓, Step 4 ✗ (API timeout)
Attempt 2: Step 1 ✓ (repeated!), Step 2 ✓ (repeated!), Step 3 ✓ (repeated!), Step 4 ✓, Step 5 ✓
Total LLM calls: 8 (3 wasted)

Expected (good) output:

Attempt 1: Step 1 ✓ [checkpointed], Step 2 ✓ [checkpointed], Step 3 ✓ [checkpointed], Step 4 ✗ (API timeout)
Attempt 2: Resuming from step 4... Step 4 ✓ [checkpointed], Step 5 ✓
Total LLM calls: 5 (0 wasted)

Example 2

Input: Annual compliance report data

Current (bad) output: A failure at step 5 forces re-execution of all 5 steps, costing extra time and tokens.

Expected (good) output: Steps 1–4 are restored from checkpoints. Only step 5 re-executes. The final report is identical to what an uninterrupted run would produce.
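
The cost difference in this example can be worked out with a few lines of arithmetic. The sketch below assumes, as the simulated failure in the starter code does, that a failing step raises before its LLM call is made; `total_llm_calls` is a hypothetical helper, not part of the starter code.

```python
def total_llm_calls(num_steps: int, fail_at: int, checkpointing: bool) -> int:
    """Count LLM calls over two attempts when step `fail_at` (1-based) fails once.

    Assumption: the failing step raises before its LLM call is made.
    """
    first_attempt = fail_at - 1  # steps that completed before the failure
    if checkpointing:
        second_attempt = num_steps - first_attempt  # only the remaining steps run
    else:
        second_attempt = num_steps  # every step runs again
    return first_attempt + second_attempt


# Failure at step 5 of 5, as in Example 2.
print(total_llm_calls(5, fail_at=5, checkpointing=False))  # 9
print(total_llm_calls(5, fail_at=5, checkpointing=True))   # 5
```

Under that assumption, a step-5 failure costs nine calls without checkpointing and five with it, which is the same as an uninterrupted run.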

Your Task

Modify the starter code so that:

  • The workflow checkpoints state after each successful step.
  • On failure, the workflow resumes from the last checkpoint, skipping already-completed steps.
  • Checkpoints are persistent (survive process restarts), not just in-memory.
  • The final output from a resumed run matches what an uninterrupted run would produce.
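
One possible way to meet the persistence requirement is a small JSON file on disk. The sketch below is only an illustration: `save_checkpoint`, `load_checkpoint`, and the file layout are assumptions, not part of the starter code. It writes to a temporary file first and then renames it, so a crash in the middle of a write should not leave a truncated checkpoint behind.

```python
import json
import os
import tempfile


def save_checkpoint(path: str, results: list[str]) -> None:
    """Atomically write the outputs of all completed steps to `path`."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temp file in the same directory, then rename over the target.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump({"results": results}, f)
    os.replace(tmp_path, path)  # readers never observe a half-written file


def load_checkpoint(path: str) -> list[str]:
    """Return saved step outputs, or an empty list if no checkpoint exists."""
    if not os.path.exists(path):
        return []
    with open(path, encoding="utf-8") as f:
        return json.load(f)["results"]
```

The number of saved results doubles as the resume position: if three results are stored, the next step to run is step 4.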

Evaluation

Submissions are checked for the following:

  • Checkpoints after each step: The workflow saves state after each successful step completion.
  • Resumes from failure point: On failure and retry, the workflow continues from the last successful step, not step 1.
  • Persistent checkpoints: Checkpoints survive process restarts rather than living only in memory.
  • Correct final result: The resumed workflow produces the same final output as an uninterrupted run.
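
These checks can be exercised locally without spending tokens. The sketch below replaces the LLM with a deterministic stand-in (`fake_step`, a hypothetical function) and uses a plain JSON write for brevity; the `fail_at` parameter and file names are likewise assumptions made for illustration. It fails once at step 4, resumes, and compares the result with an uninterrupted run.

```python
import json
import os
import tempfile
from typing import Optional

STEPS = ["extract", "classify", "summarize", "recommend", "format"]
calls = []  # records which steps actually executed


def fake_step(index: int, context: str) -> str:
    """Hypothetical stand-in for the LLM call; deterministic so runs compare."""
    calls.append(index + 1)
    return f"{STEPS[index]}({len(context)})"


def run_pipeline(data: str, ckpt_path: str, fail_at: Optional[int] = None) -> str:
    # Restore the outputs of completed steps if a checkpoint file exists.
    results = []
    if os.path.exists(ckpt_path):
        with open(ckpt_path, encoding="utf-8") as f:
            results = json.load(f)
    for i in range(len(results), len(STEPS)):  # skip already-completed steps
        if fail_at == i + 1:
            raise RuntimeError(f"Step {i + 1} failed: API timeout")
        context = "\n".join(results) if results else data
        results.append(f"Step {i + 1}: {fake_step(i, context)}")
        with open(ckpt_path, "w", encoding="utf-8") as f:
            json.dump(results, f)  # checkpoint after each successful step
    return "\n\n".join(results)


with tempfile.TemporaryDirectory() as tmp:
    resumed_path = os.path.join(tmp, "resumed.json")
    try:
        run_pipeline("Q3 data", resumed_path, fail_at=4)
    except RuntimeError as e:
        print(f"Failed: {e}")
    resumed = run_pipeline("Q3 data", resumed_path)  # picks up at step 4
    resumed_calls = list(calls)
    calls.clear()
    clean = run_pipeline("Q3 data", os.path.join(tmp, "clean.json"))
    print(resumed == clean)  # True
    print(resumed_calls)     # [1, 2, 3, 4, 5]
```

Each step ran exactly once across the two attempts, and the resumed output equals the uninterrupted one because every step sees the same accumulated context in both cases.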

Constraints

  • The workflow must checkpoint state after each successful step.
  • On failure, the workflow must resume from the last successful checkpoint, not restart from step 1.
  • Checkpoints must persist across process restarts (not just in-memory).
  • The resumed workflow must produce the same final result as an uninterrupted run.

Starter Code

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import time

llm = ChatOpenAI(model="gpt-4o-mini")

# BUG: No checkpointing — if step 4 fails, the entire 5-step pipeline restarts from scratch
# TODO: Add checkpointing so the workflow resumes from the failed step
def run_pipeline(data: str) -> str:
    steps = [
        "Extract key entities from the data",
        "Classify the document type",
        "Summarize the main findings",
        "Generate recommendations based on findings",
        "Format the final report",
    ]
    results = []

    for i, step_instruction in enumerate(steps):
        print(f"Running step {i+1}: {step_instruction}")

        # Simulate occasional failure at step 4
        if i == 3 and not hasattr(run_pipeline, '_retried'):
            run_pipeline._retried = True
            raise RuntimeError(f"Step {i+1} failed: API timeout")

        context = "\n".join(results) if results else data
        response = llm.invoke([
            SystemMessage(content=f"Step {i+1}: {step_instruction}"),
            HumanMessage(content=context),
        ])
        results.append(f"Step {i+1}: {response.content}")

    return "\n\n".join(results)

# First run fails at step 4, second run restarts from step 1 (wasteful)
for attempt in range(2):
    print(f"\n=== Attempt {attempt+1} ===")
    try:
        result = run_pipeline("Q3 2024 sales data shows 15% growth in APAC region with declining margins in Europe.")
        print(f"\nFinal Result:\n{result}")
        break
    except RuntimeError as e:
        print(f"Failed: {e}")