In real-world systems, you rarely know the exact number of tasks or agents you'll need ahead of time.
How can we dynamically assign and parallelize tasks based on runtime conditions?
In Google's Agent Development Kit (ADK), the answer is simple but powerful: dynamically create a ParallelAgent inside a Custom Agent, and use session.state as your shared communication bus.
In this post, we'll cover:
✅ Why dynamic parallelism matters
✅ A minimal working example
✅ How to run and verify it
✅ Common pitfalls and best practices
✅ Extension ideas for scaling
🤔 Why Static Parallel Agents Aren't Enough
ADK provides several workflow primitives:
- SequentialAgent (sequential execution) ⏩
- ParallelAgent (parallel execution) 🔀
- LoopAgent (repetitive execution) 🔄
ParallelAgent, in particular, is extremely lightweight — it simply executes the given sub-agents concurrently. Perfect for speeding up I/O-bound operations.
However, it has a key limitation:
✅ The list of child agents must be predefined at construction time.
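For reference, a statically built ParallelAgent looks roughly like this (a minimal sketch with placeholder LlmAgent workers; the model name and instructions are just for illustration). The sub-agent list is fixed the moment the object is constructed:

```python
from google.adk.agents import LlmAgent, ParallelAgent

# Static construction (sketch): the children are decided once, up front.
# The model name and instructions below are placeholders.
static_block = ParallelAgent(
    name="static_block",
    sub_agents=[
        LlmAgent(name="w0", model="gemini-2.0-flash", instruction="Do task 0."),
        LlmAgent(name="w1", model="gemini-2.0-flash", instruction="Do task 1."),
        LlmAgent(name="w2", model="gemini-2.0-flash", instruction="Do task 2."),
    ],
)
# The fan-out width is frozen here; you cannot add or remove workers per request.
```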
That means:
- You can't dynamically change how many workers run
- You can't adapt to different task types at runtime
👉 To solve this, we build a fresh ParallelAgent dynamically inside a Custom Agent every time.
🗒️ Example: A 60-line Dynamic Fanout Pattern
Let's start with a minimal working example:
```python
import random, secrets
from typing import ClassVar, List

from google.adk.events import Event, EventActions
from google.adk.agents import BaseAgent, ParallelAgent, SequentialAgent
from google.genai import types


class Worker(BaseAgent):
    """Simple worker that calculates n²."""

    def __init__(self, *, name: str, run_id: str):
        super().__init__(name=name)
        self._run_id = run_id

    async def _run_async_impl(self, ctx):
        n = ctx.session.state.get(f"task:{self._run_id}:{self.name}", 0)
        result = n * n
        yield Event(
            author=self.name,
            content=types.Content(
                role=self.name,
                parts=[types.Part(text=f"{n}² = {result}")]),
            actions=EventActions(
                state_delta={f"result:{self._run_id}:{self.name}": result})
        )


class PlannerAndRunner(BaseAgent):
    """Distributes tasks and dynamically creates a ParallelAgent."""

    POOL: ClassVar[List[str]] = ["w0", "w1", "w2"]

    async def _run_async_impl(self, ctx):
        run_id = secrets.token_hex(2)
        picked = random.sample(self.POOL, k=random.randint(1, len(self.POOL)))
        task_delta = {f"task:{run_id}:{name}": random.randint(1, 9)
                      for name in picked}
        yield Event(
            author=self.name,
            content=types.Content(
                role=self.name,
                parts=[types.Part(text=f"Run {run_id} tasks {task_delta}")]),
            actions=EventActions(state_delta={"current_run": run_id, **task_delta})
        )
        parallel = ParallelAgent(
            name=f"block_{run_id}",
            sub_agents=[Worker(name=n, run_id=run_id) for n in picked]
        )
        async for ev in parallel.run_async(ctx):
            yield ev


class Aggregator(BaseAgent):
    """Aggregates results from workers."""

    async def _run_async_impl(self, ctx):
        run_id = ctx.session.state.get("current_run")
        vals = [v for k, v in ctx.session.state.items()
                if run_id and k.startswith(f"result:{run_id}:")]
        yield Event(
            author=self.name,
            content=types.Content(
                role=self.name,
                parts=[types.Part(text=f"Sum = {sum(vals)}")]),
            actions=EventActions(escalate=True)
        )


root_agent = SequentialAgent(
    name="root",
    sub_agents=[PlannerAndRunner(name="planner"), Aggregator(name="collector")]
)
```
✅ This simple pipeline dynamically fans out tasks to random workers, runs them in parallel, and aggregates the results — all in about 60 lines!
🏃 Running It
Launch your agent using the standard adk run command:
```
$ adk run .
Log setup complete: /tmp/agents_log/agent.20250427_122520.log
To access latest log: tail -F /tmp/agents_log/agent.latest.log
Running agent root, type exit to exit.
```
When prompted (user:), type anything (e.g., a, b) to trigger a new task round.
📒 Sample Output
Here's what you'll see:
```
user: a
[planner]: Run 84e9 tasks {'task:84e9:w0': 3, 'task:84e9:w1': 5}
[w1]: 5² = 25
[w0]: 3² = 9
[collector]: Sum = 34
user: b
[planner]: Run 35d1 tasks {'task:35d1:w1': 6, 'task:35d1:w0': 7, 'task:35d1:w2': 2}
[w1]: 6² = 36
[w0]: 7² = 49
[w2]: 2² = 4
[collector]: Sum = 89
user:
```
- planner logs which workers received tasks
- Each Worker logs its result
- collector sums up the results per round
All steps are cleanly traceable thanks to ADK's event logs! 📈
🧩 Common Pitfalls and Best Practices
❌ Reusing Worker Instances
In ADK, an agent instance can only have one parent (Multi-Agent Systems docs).
➡ Always create fresh Worker instances each time you build a new ParallelAgent.
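In practice this just means constructing the worker list inline each time you build a block, exactly as the planner above already does (a sketch reusing Worker, run_id, and picked from the example):

```python
# ❌ Reusing Worker objects across runs would violate the single-parent rule,
#    because each instance stays attached to the first ParallelAgent it joined.
# ✅ Build fresh instances for every dynamic block instead:
parallel = ParallelAgent(
    name=f"block_{run_id}",
    sub_agents=[Worker(name=n, run_id=run_id) for n in picked],
)
```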
🗄️ Designing session.state Safely
Since all agents share the same session.state, prefix your keys (e.g., task:{run_id}:{worker_name}) to avoid collisions.
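One lightweight way to enforce that convention is a pair of tiny key-builder helpers (hypothetical names, not part of ADK), so the prefix scheme lives in exactly one place:

```python
# Hypothetical helpers that centralize the key-naming scheme for session.state.
def task_key(run_id: str, worker: str) -> str:
    return f"task:{run_id}:{worker}"

def result_key(run_id: str, worker: str) -> str:
    return f"result:{run_id}:{worker}"

# e.g. in the planner:  {task_key(run_id, name): random.randint(1, 9) for name in picked}
# and in a worker:      {result_key(self._run_id, self.name): result}
```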
🏷️ Declaring ClassVar Properly
Because ADK agents are Pydantic models internally, you must use ClassVar annotations for constants like POOL.
```python
from typing import ClassVar

class PlannerAndRunner(BaseAgent):
    POOL: ClassVar[list[str]] = ["w0", "w1", "w2"]
```
Otherwise, Pydantic will treat them as model fields and raise an error.
📊 Comparing to Other Approaches
| Approach | Limitations |
|---|---|
| Static ParallelAgent | Fixed set of children, no runtime flexibility |
| LLM transfer_to_agent() | Routing is flexible but serialized, and LLM errors can break it |
| Manual asyncio.gather() | Breaks ADK's observability and session state management |
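For contrast, the hand-rolled asyncio route looks something like this (a sketch of the anti-pattern, not of any ADK API). The coroutines do run concurrently, but nothing flows through ADK Events or state_delta, so tracing and session bookkeeping are lost:

```python
import asyncio

# Manual fan-out (sketch): concurrency works, but the results bypass ADK's
# event stream and session.state, so none of it shows up in the agent logs.
async def square(n: int) -> int:
    return n * n

async def manual_fanout(tasks: dict[str, int]) -> int:
    results = await asyncio.gather(*(square(n) for n in tasks.values()))
    return sum(results)
```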
✅ By dynamically building ParallelAgents, you get the best of both worlds: true concurrency + native ADK observability!
🌟 Extensions and Next Steps
| Goal | Idea |
|---|---|
| Support multiple task types | Pass an operation type to Workers |
| Summarize results with an LLM | Replace Aggregator with an LlmAgent |
| LLM-assisted task dispatching | Use an LlmAgent to select workers dynamically |
| Production deployment | Launch root agents with external triggers on Vertex AI |
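As a taste of the second row, swapping the Aggregator for an LlmAgent might look roughly like this (a sketch; the model name and instruction are placeholders, and the exact behavior depends on your ADK version):

```python
from google.adk.agents import LlmAgent, SequentialAgent

# Sketch: summarize the workers' outputs with an LLM instead of summing them.
# Within one invocation the workers' events are part of the conversation,
# so the summarizer can read them directly.
summarizer = LlmAgent(
    name="collector",
    model="gemini-2.0-flash",  # placeholder model name
    instruction="Summarize the results reported by the worker agents in this run.",
)

root_agent = SequentialAgent(
    name="root",
    sub_agents=[PlannerAndRunner(name="planner"), summarizer],
)
```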
📝 Final Thoughts
Dynamic parallelism with Custom Agents lets you build scalable, flexible, and highly performant workflows in ADK.
As long as you:
- Handle the single-parent rule carefully
- Design a clean session.state schema

you can achieve powerful architectures — without losing any of ADK's built-in tracing and UI support.
If you're building dynamic, data-driven agent pipelines, this technique belongs in your toolbox. 🛠️
🔽 Bonus Links