8. Background Tasks

"Run slow operations in the background; the agent keeps thinking"

20 min read
💡 New to this?

What is a blocking operation?

A command that makes the program wait until it finishes before doing anything else. 'npm install' can take 2 minutes. With blocking execution, the agent sits idle the entire time, wasting wall-clock time and user patience.
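To make the cost concrete, here is a minimal sketch of a blocking call (assuming a POSIX `sleep` is available): the program can do nothing else until the subprocess returns.

```python
import subprocess
import time

start = time.monotonic()
subprocess.run("sleep 1", shell=True)  # blocks: the whole program waits here
elapsed = time.monotonic() - start
print(f"blocked for {elapsed:.1f}s doing nothing else")
```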

What is a daemon thread?

A background thread that runs independently of the main program. When set as daemon=True in Python, it automatically stops when the main program exits, so you don't need to manage cleanup.
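A minimal sketch of the idea: the worker runs independently of the main thread, and `daemon=True` means it will not keep the process alive if the main program exits first.

```python
import threading
import time

results = []

def worker():
    time.sleep(0.1)          # stand-in for a slow operation
    results.append("done")

# daemon=True: the thread dies with the main program, so no cleanup needed.
t = threading.Thread(target=worker, daemon=True)
t.start()

# The main thread is free to do other work here; for the demo we just wait.
t.join()
print(results)  # ['done']
```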

How does the agent learn a background task finished?

The background thread pushes a result into a shared queue. Before each LLM call, the agent drains that queue and injects any completed results as messages. The model reads them on its next turn.

The Problem

Some commands take minutes: npm install, pytest, docker build. With a blocking loop, the model sits idle waiting. If the user asks "install dependencies and while that runs, create the config file," the agent does them sequentially, not in parallel.

The Solution

Main thread                Background thread
+-----------------+        +-----------------+
| agent loop      |        | subprocess runs |
| ...             |        | ...             |
| [LLM call] <---+------- | enqueue(result) |
|  ^drain queue   |        +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]--[drain]--
             |          |                       ^
             v          v                       |
          [A runs]   [B runs]      (parallel)   |
             |          |                       |
             +----------+----- results injected-+

How It Works

  1. A shared queue collects completed background task results.
import threading
import subprocess
import queue

bg_queue: queue.Queue = queue.Queue()
bg_counter = {"n": 0}

def run_in_background(command: str, label: str = "") -> str:
    bg_counter["n"] += 1
    task_id = bg_counter["n"]
    label = label or f"bg-{task_id}"

    def worker():
        try:
            result = subprocess.run(
                command, shell=True, capture_output=True,
                text=True, timeout=300,
            )
            output = (result.stdout + result.stderr).strip()
            status = "done" if result.returncode == 0 else "failed"
        except subprocess.TimeoutExpired:
            output = "Timeout after 300s"
            status = "failed"
        bg_queue.put({
            "task_id": task_id,
            "label": label,
            "status": status,
            "output": output[:5000],
        })

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    return f"Background task {task_id} ({label}) started. You'll be notified when done."
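The runner can be exercised end to end in isolation. This is a condensed, self-contained variant (error handling and task IDs trimmed, assuming a POSIX shell for `echo`); in the real loop the agent would keep calling the LLM instead of blocking on the queue.

```python
import queue
import subprocess
import threading

bg_queue: queue.Queue = queue.Queue()

def run_in_background(command: str, label: str) -> str:
    def worker():
        result = subprocess.run(command, shell=True, capture_output=True, text=True)
        bg_queue.put({"label": label,
                      "status": "done" if result.returncode == 0 else "failed"})
    threading.Thread(target=worker, daemon=True).start()
    return f"Background task ({label}) started."

# Both commands run in parallel; the calls return immediately.
run_in_background("echo deps installed", "install")
run_in_background("echo config written", "config")

# The demo blocks on the queue until both workers report in.
finished = [bg_queue.get(timeout=5), bg_queue.get(timeout=5)]
print(sorted(d["label"] for d in finished))  # ['config', 'install']
```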
  2. Before each LLM call, drain the queue and inject completed results.
def drain_bg_queue(messages: list) -> list:
    results = []
    while not bg_queue.empty():
        completed = bg_queue.get_nowait()
        results.append({
            "type": "text",
            "text": (
                f"<background_complete>\n"
                f"Task {completed['task_id']} ({completed['label']}): "
                f"{completed['status']}\n"
                f"{completed['output']}\n"
                f"</background_complete>"
            ),
        })
    if results:
        messages.append({"role": "user", "content": results})
    return messages
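A quick check of the drain behavior, using a condensed copy of the function above with a hand-crafted queue entry (no real subprocess involved): an empty queue leaves the messages untouched, and a completed task becomes one appended user message.

```python
import queue

bg_queue: queue.Queue = queue.Queue()

def drain_bg_queue(messages: list) -> list:
    # Condensed copy of the drain step above.
    results = []
    while not bg_queue.empty():
        completed = bg_queue.get_nowait()
        results.append({"type": "text", "text": (
            f"<background_complete>\n"
            f"Task {completed['task_id']} ({completed['label']}): "
            f"{completed['status']}\n{completed['output']}\n"
            f"</background_complete>"
        )})
    if results:
        messages.append({"role": "user", "content": results})
    return messages

print(drain_bg_queue([]))  # [] -- empty queue injects nothing

bg_queue.put({"task_id": 1, "label": "install", "status": "done", "output": "ok"})
messages = drain_bg_queue([])
print(messages[0]["role"])  # user
```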
  3. The main loop calls drain_bg_queue before each LLM call.
def agent_loop(messages: list):
    while True:
        messages = drain_bg_queue(messages)  # inject any completions
        response = client.messages.create(
            model=MODEL, system=SYSTEM,
            messages=messages, tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason != "tool_use":
            return

        results = []
        for block in response.content:
            if block.type == "tool_use":
                output = TOOL_HANDLERS[block.name](**block.input)
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
        messages.append({"role": "user", "content": results})
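The loop assumes run_in_background is registered in TOOLS and TOOL_HANDLERS. A hypothetical registration, following the Anthropic tool input_schema shape (the handler below is a stub standing in for the function defined earlier):

```python
def run_in_background(command: str, label: str = "") -> str:  # stub for the demo
    return f"Background task ({label or 'bg-1'}) started."

BACKGROUND_TOOL = {
    "name": "run_in_background",
    "description": (
        "Start a long-running shell command in the background and return "
        "immediately. The result arrives later as a <background_complete> "
        "block in the conversation."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "command": {"type": "string", "description": "Shell command to run"},
            "label": {"type": "string", "description": "Optional short label"},
        },
        "required": ["command"],
    },
}

TOOL_HANDLERS = {"run_in_background": run_in_background}

# Dispatch exactly as the loop does for any tool_use block.
output = TOOL_HANDLERS["run_in_background"](command="npm install", label="install")
print(output)  # Background task (install) started.
```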

What Changed From Tasks

Component      Before (Tasks)       After (Background Tasks)
------------   ------------------   ---------------------------------
Execution      Sequential only      Parallel background tasks
Waiting        Blocks agent loop    Agent continues while bg runs
Notification   N/A                  Queue drain before each LLM call
Tool           None                 run_in_background(command, label)

Key Takeaway

Background tasks are a concurrency pattern for the agent harness. The model doesn't need to understand threads; it just calls run_in_background and receives a notification when the task completes. The queue drain is the key: it's a single injection point that feeds completions back into the conversation at exactly the right moment.

Interactive Code Walkthrough

The Background Runner and Notification Queue
bg_queue: queue.Queue = queue.Queue()
bg_counter = {"n": 0}

def run_in_background(command: str, label: str = "") -> str:
    bg_counter["n"] += 1
    task_id = bg_counter["n"]
    label = label or f"bg-{task_id}"

    def worker():
        try:
            result = subprocess.run(
                command, shell=True, capture_output=True,
                text=True, timeout=300,
            )
            output = (result.stdout + result.stderr).strip()
            status = "done" if result.returncode == 0 else "failed"
        except subprocess.TimeoutExpired:
            output = "Timeout after 300s"
            status = "failed"
        bg_queue.put({"task_id": task_id, "label": label,
                      "status": status, "output": output[:5000]})

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    return f"Background task {task_id} ({label}) started."

def drain_bg_queue(messages: list) -> list:
    results = []
    while not bg_queue.empty():
        completed = bg_queue.get_nowait()
        results.append({"type": "text", "text": (
            f"<background_complete>\nTask {completed['task_id']} "
            f"({completed['label']}): {completed['status']}\n"
            f"{completed['output']}\n</background_complete>"
        )})
    if results:
        messages.append({"role": "user", "content": results})
    return messages
bg_queue is a thread-safe Queue shared between the main thread and all worker threads. bg_counter uses a dict (not a plain int) so run_in_background can increment it by mutating the dict in place; rebinding a module-level int would require a global declaration.
Step 1 of 5
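The dict-by-reference point can be seen in isolation: Python code may freely mutate a shared dict's contents, while rebinding a plain int name inside a function would need `global` (or `nonlocal`). A minimal sketch:

```python
counter = {"n": 0}   # mirrors bg_counter

def next_id() -> int:
    counter["n"] += 1   # mutates the dict in place; no `global` needed
    return counter["n"]

print(next_id(), next_id())  # 1 2
```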
🧪 Try it yourself
🔥 Warm-up ~5 min

What's the difference between running a tool in the main thread vs. a background thread? When would background execution be worse than blocking?

Hint

Background is worse when the agent needs the result immediately to make its next decision.

🔨 Build ~20 min

Run a long build command in the background while the agent continues working. Verify that results appear in the next LLM call via the queue.

Hint

Use subprocess.run in a thread and queue.put() when it finishes.

🚀 Stretch ~45 min

Add timeout handling for background tasks: if a task runs longer than 60 seconds, kill the subprocess and inject a timeout error into the queue.

Hint

Use subprocess.run(timeout=60) with a try/except TimeoutExpired block.
