In real-world systems, you rarely know the exact number of tasks or agents you'll need ahead of time.

How can we dynamically assign and parallelize tasks based on runtime conditions?

In Google's Agent Development Kit (ADK), the answer is simple but powerful:

Dynamically create a ParallelAgent inside a Custom Agent, and use session.state as your shared communication bus.

In this post, we'll cover:

✅ Why dynamic parallelism matters

✅ A minimal working example

✅ How to run and verify it

✅ Common pitfalls and best practices

✅ Extension ideas for scaling


🤔 Why Static Parallel Agents Aren't Enough

ADK provides several workflow primitives:

  • SequentialAgent (sequential execution) ⏩
  • ParallelAgent (parallel execution) 🔀
  • LoopAgent (repetitive execution) 🔄

ParallelAgent, in particular, is extremely lightweight: it simply runs the given sub-agents concurrently, which makes it a good fit for speeding up I/O-bound operations.
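To make the I/O-bound claim concrete, here is a minimal sketch that uses plain asyncio rather than ADK itself. The fake_io coroutine and its delays are hypothetical stand-ins for sub-agents waiting on network calls; the only point is that wall time approaches the slowest task instead of the sum of all tasks.

```python
import asyncio
import time


async def fake_io(name: str, delay: float) -> str:
    """Hypothetical stand-in for a sub-agent that waits on I/O."""
    await asyncio.sleep(delay)
    return name


async def run_concurrently(delays: dict[str, float]) -> tuple[list[str], float]:
    """Run all tasks at once; return their names and the elapsed wall time."""
    start = time.perf_counter()
    # gather() returns results in the order the awaitables were passed in.
    names = await asyncio.gather(*(fake_io(n, d) for n, d in delays.items()))
    return list(names), time.perf_counter() - start


DELAYS = {"w0": 0.2, "w1": 0.2, "w2": 0.2}
names, elapsed = asyncio.run(run_concurrently(DELAYS))
print(f"{names} finished in {elapsed:.2f}s "
      f"(one after another would take about {sum(DELAYS.values()):.1f}s)")
```

This is only an analogy for what ParallelAgent does with its sub-agents; it does not go through ADK's event stream or session state.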

However, it has a key limitation:

The list of child agents must be predefined at construction time.

That means:

  • You can't dynamically change how many workers run
  • You can't adapt to different task types at runtime

👉 To solve this, we build a fresh ParallelAgent inside a Custom Agent on every invocation, with exactly the workers that run needs.


🗒️ Example: A 60-line Dynamic Fanout Pattern

Let's start with a minimal working example:

import random, secrets
from typing import ClassVar, List
from google.adk.events import Event, EventActions
from google.adk.agents import BaseAgent, ParallelAgent, SequentialAgent
from google.genai import types

class Worker(BaseAgent):
    """Simple worker that calculates n²."""
    def __init__(self, *, name: str, run_id: str):
        super().__init__(name=name)
        # Underscore-prefixed, so Pydantic does not treat it as a model field.
        self._run_id = run_id
    async def _run_async_impl(self, ctx):
        n = ctx.session.state.get(f"task:{self._run_id}:{self.name}", 0)
        result = n * n
        yield Event(
            author=self.name,
            content=types.Content(role=self.name,
                                  parts=[types.Part(text=f"{n}² = {result}")]),
            actions=EventActions(
                state_delta={f"result:{self._run_id}:{self.name}": result})
        )

class PlannerAndRunner(BaseAgent):
    """Distributes tasks and dynamically creates a ParallelAgent."""
    POOL: ClassVar[List[str]] = ["w0", "w1", "w2"]
    async def _run_async_impl(self, ctx):
        run_id = secrets.token_hex(2)
        picked = random.sample(self.POOL,
                               k=random.randint(1, len(self.POOL)))
        task_delta = {f"task:{run_id}:{name}": random.randint(1, 9)
                      for name in picked}
        yield Event(
            author=self.name,
            content=types.Content(role=self.name,
                   parts=[types.Part(text=f"Run {run_id} tasks {task_delta}")]),
            actions=EventActions(state_delta={"current_run": run_id, **task_delta})
        )
        parallel = ParallelAgent(
            name=f"block_{run_id}",
            sub_agents=[Worker(name=n, run_id=run_id) for n in picked]
        )
        async for ev in parallel.run_async(ctx):
            yield ev

class Aggregator(BaseAgent):
    """Aggregates results from workers."""
    async def _run_async_impl(self, ctx):
        run_id = ctx.session.state.get("current_run")
        vals = [v for k, v in ctx.session.state.items()
                if run_id and k.startswith(f"result:{run_id}:")]
        yield Event(
            author=self.name,
            content=types.Content(role=self.name,
                   parts=[types.Part(text=f"Sum = {sum(vals)}")]),
            actions=EventActions(escalate=True)
        )

root_agent = SequentialAgent(
    name="root",
    sub_agents=[PlannerAndRunner(name="planner"), Aggregator(name="collector")]
)

✅ This simple pipeline dynamically fans out tasks to random workers, runs them in parallel, and aggregates the results — all in about 60 lines!


🏃 Running It

Launch your agent using the standard adk run command:

$ adk run .
Log setup complete: /tmp/agents_log/agent.20250427_122520.log
To access latest log: tail -F /tmp/agents_log/agent.latest.log
Running agent root, type exit to exit.

When prompted (user:), type anything (e.g., a, b) to trigger a new task round.


📒 Sample Output

Here's what you'll see:

user: a
[planner]: Run 84e9 tasks {'task:84e9:w0': 3, 'task:84e9:w1': 5}
[w1]: 5² = 25
[w0]: 3² = 9
[collector]: Sum = 34
user: b
[planner]: Run 35d1 tasks {'task:35d1:w1': 6, 'task:35d1:w0': 7, 'task:35d1:w2': 2}
[w1]: 6² = 36
[w0]: 7² = 49
[w2]: 2² = 4
[collector]: Sum = 89
user:

In this output:

  • planner logs which workers received tasks
  • each Worker logs its result
  • collector sums up the results per round

All steps are cleanly traceable thanks to ADK's event logs! 📈


🧩 Common Pitfalls and Best Practices

❌ Reusing Worker Instances

In ADK, an agent instance can only have one parent (Multi-Agent Systems docs).

➡ Always create fresh Worker instances each time you build a new ParallelAgent.
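The single-parent rule can be illustrated with a toy model. The ToyAgent class below is not ADK's BaseAgent; it only mimics a "one parent per instance" check, and the exception type and message are assumptions made for this sketch. The make_workers factory is a hypothetical helper showing the "fresh instances every time" habit.

```python
class ToyAgent:
    """Toy single-parent tree node (an illustration, not ADK's BaseAgent)."""

    def __init__(self, name, children=()):
        self.name = name
        self.parent = None
        self.children = list(children)
        for child in self.children:
            # Refuse to adopt a child that already belongs to another parent.
            if child.parent is not None:
                raise ValueError(
                    f"{child.name} already has parent {child.parent.name}")
            child.parent = self


def make_workers(names):
    """Hypothetical factory: returns brand-new worker objects on every call."""
    return [ToyAgent(n) for n in names]


# Reusing the same instances under a second parent fails...
shared = make_workers(["w0", "w1"])
ToyAgent("block_1", shared)
try:
    ToyAgent("block_2", shared)
    reuse_error = None
except ValueError as exc:
    reuse_error = str(exc)

# ...while building fresh instances for each parent works.
block_a = ToyAgent("block_a", make_workers(["w0", "w1"]))
block_b = ToyAgent("block_b", make_workers(["w0", "w1"]))
print(reuse_error)  # w0 already has parent block_1
```

In the main example, the list comprehension inside PlannerAndRunner plays the role of make_workers: it constructs new Worker objects for each block.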

🗄️ Designing session.state Safely

Since all agents share the same session.state, prefix your keys (e.g., task:{run_id}:{worker_name}) to avoid collisions.
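A small sketch of that key schema, using a plain dict in place of session.state. The helper names task_key, result_key, and results_for are hypothetical and not part of ADK; the values mirror the sample output shown earlier.

```python
def task_key(run_id: str, worker: str) -> str:
    """Key under which the planner stores a worker's input."""
    return f"task:{run_id}:{worker}"


def result_key(run_id: str, worker: str) -> str:
    """Key under which a worker stores its output."""
    return f"result:{run_id}:{worker}"


def results_for(state: dict, run_id: str) -> dict:
    """Return {worker: value} for one run, ignoring keys from other runs."""
    prefix = f"result:{run_id}:"
    return {k[len(prefix):]: v for k, v in state.items() if k.startswith(prefix)}


# A plain dict standing in for session.state after two rounds.
state = {}
state[result_key("84e9", "w0")] = 9
state[result_key("84e9", "w1")] = 25
state[result_key("35d1", "w0")] = 49
state["current_run"] = "35d1"

print(results_for(state, "84e9"))                # {'w0': 9, 'w1': 25}
print(results_for(state, state["current_run"]))  # {'w0': 49}
```

Because every key carries the run_id, results left over from an earlier round never leak into the current round's aggregation.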

🏷️ Declaring ClassVar Properly

Because ADK agents are Pydantic models internally, you must use ClassVar annotations for constants like POOL.

from typing import ClassVar
class PlannerAndRunner(BaseAgent):
    POOL: ClassVar[list[str]] = ["w0", "w1", "w2"]

Otherwise, Pydantic will either reject an unannotated attribute with an error or treat an annotated one as a per-instance model field instead of a shared constant.


📊 Comparing to Other Approaches

| Approach | Limitations |
| --- | --- |
| Static ParallelAgent | Fixed set of children, no runtime flexibility |
| LLM transfer_to_agent() | Routing is flexible but serialized, and LLM errors can break it |
| Manual asyncio.gather() | Breaks ADK's observability and session state management |

✅ By dynamically building ParallelAgents, you get the best of both worlds: true concurrency + native ADK observability!


🌟 Extensions and Next Steps

| Goal | Idea |
| --- | --- |
| Support multiple task types | Pass operation type to Workers |
| Summarize results with LLM | Replace Aggregator with an LlmAgent |
| LLM-assisted task dispatching | Use LlmAgent to select workers dynamically |
| Production deployment | Launch root agents with external triggers on Vertex AI |
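As a rough sketch of the first idea, the task payload could carry an operation name next to its operand, and the worker could look the operation up in a small registry. Everything here is hypothetical: the OPERATIONS table, the payload shape, and run_task are illustrative names, and the sketch assumes the state values are small, serializable dicts.

```python
from typing import Callable

# Hypothetical operation registry; the names are illustrative only.
OPERATIONS: dict[str, Callable[[int], int]] = {
    "square": lambda n: n * n,
    "cube": lambda n: n ** 3,
    "negate": lambda n: -n,
}


def run_task(task: dict) -> int:
    """Apply the operation named in the task payload to its operand."""
    op = OPERATIONS.get(task["op"])
    if op is None:
        raise ValueError(f"unknown operation: {task['op']!r}")
    return op(task["n"])


# Task payloads as a planner might write them, keyed like the main example.
tasks = {
    "task:84e9:w0": {"op": "square", "n": 3},
    "task:84e9:w1": {"op": "cube", "n": 2},
}
results = {key.replace("task:", "result:", 1): run_task(task)
           for key, task in tasks.items()}
print(results)  # {'result:84e9:w0': 9, 'result:84e9:w1': 8}
```

Inside a Worker, the body of run_task would replace the hard-coded n * n, with the rest of the event and state_delta handling left as it is.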

📝 Final Thoughts

Dynamic parallelism with Custom Agents lets you build scalable, flexible, and highly performant workflows in ADK.

As long as you:

  • Handle the Single-Parent Rule carefully
  • Design a clean session.state schema

you can achieve powerful architectures without losing any of ADK's built-in tracing and UI support.

If you're building dynamic, data-driven agent pipelines, this technique belongs in your toolbox. 🛠️


🔽 Bonus Links