8. Background Tasks
"Run slow operations in the background; the agent keeps thinking"
New to this?
What is a blocking operation?
A command that makes the program wait until it finishes before doing anything else. 'npm install' can take 2 minutes. With blocking execution, the agent sits idle the entire time, wasting wall-clock time and user patience.
What is a daemon thread?
A background thread that runs independently of the main program. When set as daemon=True in Python, it automatically stops when the main program exits, so you don't need to manage cleanup.
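This is easy to see in a few lines; a minimal sketch (the heartbeat function is just an illustration):

```python
import threading
import time

def heartbeat():
    # An endless loop: as a non-daemon thread this would hang interpreter
    # shutdown, but a daemon thread is simply killed when main exits.
    while True:
        time.sleep(0.1)

t = threading.Thread(target=heartbeat, daemon=True)
t.start()
print(t.daemon, t.is_alive())  # True True
```

With `daemon=True` the program exits cleanly even though `heartbeat` never returns; no `join()` or shutdown flag is needed.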
How does the agent learn a background task finished?
The background thread pushes a result into a shared queue. Before each LLM call, the agent drains that queue and injects any completed results as messages. The model reads them on its next turn.
The Problem
Some commands take minutes: npm install, pytest, docker build. With a blocking loop, the model sits idle waiting. If the user asks "install dependencies and while that runs, create the config file," the agent does them sequentially, not in parallel.
The Solution
Main thread                Background thread
+-----------------+        +-----------------+
| agent loop      |        | subprocess runs |
|   ...           |        |   ...           |
| [LLM call] <----+--------| enqueue(result) |
|   ^drain queue  |        +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]--[drain]--
            |          |                       ^
            v          v                       |
         [A runs]   [B runs]  (parallel)       |
            |          |                       |
            +----------+---- results injected--+
How It Works
- A shared queue collects completed background task results.
import threading
import subprocess
import queue

bg_queue: queue.Queue = queue.Queue()
bg_counter = {"n": 0}

def run_in_background(command: str, label: str = "") -> str:
    bg_counter["n"] += 1
    task_id = bg_counter["n"]
    label = label or f"bg-{task_id}"

    def worker():
        try:
            result = subprocess.run(
                command, shell=True, capture_output=True,
                text=True, timeout=300,
            )
            output = (result.stdout + result.stderr).strip()
            status = "done" if result.returncode == 0 else "failed"
        except subprocess.TimeoutExpired:
            output = "Timeout after 300s"
            status = "failed"
        bg_queue.put({
            "task_id": task_id,
            "label": label,
            "status": status,
            "output": output[:5000],
        })

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    return f"Background task {task_id} ({label}) started. You'll be notified when done."
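A condensed, self-contained version of the pattern is easy to sanity-check in isolation. The `spawn` helper below is a hypothetical stripped-down `run_in_background` (no counter, no output truncation), and the echo commands are illustrative:

```python
import queue
import subprocess
import threading

q: queue.Queue = queue.Queue()

def spawn(command: str, label: str) -> None:
    # Hypothetical condensed run_in_background: no counter, no truncation.
    def worker():
        r = subprocess.run(command, shell=True, capture_output=True, text=True)
        q.put({"label": label,
               "status": "done" if r.returncode == 0 else "failed"})
    threading.Thread(target=worker, daemon=True).start()

spawn("echo hello", "a")   # both commands run concurrently
spawn("echo world", "b")

results = [q.get(timeout=5) for _ in range(2)]  # blocks until both report
print(sorted(r["label"] for r in results))  # ['a', 'b']
```

Note that `spawn` returns immediately; the blocking `q.get` calls here stand in for the agent's periodic, non-blocking queue drain.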
- Before each LLM call, drain the queue and inject completed results.
def drain_bg_queue(messages: list) -> list:
    results = []
    while not bg_queue.empty():
        completed = bg_queue.get_nowait()
        results.append({
            "type": "text",
            "text": (
                f"<background_complete>\n"
                f"Task {completed['task_id']} ({completed['label']}): "
                f"{completed['status']}\n"
                f"{completed['output']}\n"
                f"</background_complete>"
            ),
        })
    if results:
        messages.append({"role": "user", "content": results})
    return messages
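To see exactly what the model receives, you can format a single completed record by hand. The `<background_complete>` tag is just a convention from the code above, not an API requirement:

```python
completed = {"task_id": 1, "label": "bg-1", "status": "done", "output": "ok"}

text = (
    f"<background_complete>\n"
    f"Task {completed['task_id']} ({completed['label']}): {completed['status']}\n"
    f"{completed['output']}\n"
    f"</background_complete>"
)
# Completions arrive as an ordinary user message containing text blocks.
message = {"role": "user", "content": [{"type": "text", "text": text}]}
print(text.splitlines()[1])  # Task 1 (bg-1): done
```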
- The main loop calls drain_bg_queue before each LLM call.
def agent_loop(messages: list):
    while True:
        messages = drain_bg_queue(messages)  # inject any completions
        response = client.messages.create(
            model=MODEL, system=SYSTEM,
            messages=messages, tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                output = TOOL_HANDLERS[block.name](**block.input)
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
        messages.append({"role": "user", "content": results})
What Changed From Tasks
| Component | Before (Tasks) | After (Background Tasks) |
|---|---|---|
| Execution | Sequential only | Parallel background tasks |
| Waiting | Blocks agent loop | Agent continues while bg runs |
| Notification | N/A | Queue drain before each LLM call |
| Tool | None | run_in_background(command, label) |
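For completeness, the tool definition the model sees might look like this. This is a sketch following the JSON-schema shape of Anthropic tool definitions; the description text is invented:

```python
RUN_IN_BACKGROUND_TOOL = {
    "name": "run_in_background",
    "description": (
        "Start a shell command in a background thread and return immediately. "
        "The result is injected into the conversation once the command finishes."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "command": {"type": "string", "description": "Shell command to run."},
            "label": {"type": "string", "description": "Optional human-readable tag."},
        },
        "required": ["command"],
    },
}
print(RUN_IN_BACKGROUND_TOOL["input_schema"]["required"])  # ['command']
```

Only `command` is required, matching the `label: str = ""` default in the handler.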
Key Takeaway
Background tasks are a concurrency pattern for the agent harness. The model doesn't need to understand threads; it just calls run_in_background and receives a notification when the task completes. The queue drain is the key: it's a single injection point that feeds completions back into the conversation at exactly the right moment.
Interactive Code Walkthrough
bg_queue is a thread-safe Queue shared between the main thread and all worker threads. bg_counter uses a dict (not a bare int) so worker closures can increment it by reference.
What's the difference between running a tool in the main thread vs. a background thread? When would background execution be worse than blocking?
Hint
Background is worse when the agent needs the result immediately to make its next decision.
Run a long build command in the background while the agent continues working. Verify that results appear in the next LLM call via the queue.
Hint
Use subprocess.run in a thread and queue.put() when it finishes.
Add timeout handling for background tasks: if a task runs longer than 60 seconds, kill the subprocess and inject a timeout error into the queue.
Hint
Use subprocess.run(timeout=60) with a try/except TimeoutExpired block.
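One possible shape for that last exercise, as a self-contained sketch (the 1-second timeout is shortened for a quick demo, and the command assumes a POSIX sleep):

```python
import queue
import subprocess
import threading

q: queue.Queue = queue.Queue()

def run_with_timeout(command: str, timeout: int = 60) -> None:
    # subprocess.run kills the child process itself when the timeout
    # expires, so the worker only has to report the failure to the queue.
    def worker():
        try:
            r = subprocess.run(command, shell=True, capture_output=True,
                               text=True, timeout=timeout)
            q.put({"status": "done" if r.returncode == 0 else "failed",
                   "output": (r.stdout + r.stderr).strip()})
        except subprocess.TimeoutExpired:
            q.put({"status": "failed", "output": f"Timeout after {timeout}s"})
    threading.Thread(target=worker, daemon=True).start()

run_with_timeout("sleep 5", timeout=1)  # command outlives the 1s budget
result = q.get(timeout=10)
print(result["output"])  # Timeout after 1s
```

The agent never sees the exception; it just reads a failed task notification on its next turn, like any other completion.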