I'll show you different levels of parallelism you can achieve to improve the performance of complex Machine Learning workflows.

Parallelism in ML pipelines

Map is a higher-order function from functional programming: it applies a function to each element of a collection (e.g., a list) and returns the results in a collection of the same size and order.
This is very useful in ML pipelines, where the same transformation steps are applied repeatedly to the input dataset. Say you need to preprocess a text dataset represented as a list:

raw_texts = [
    "Hello! This is a SAMPLE text with UPPERCASE words.",
    "  Extra   spaces   AND 123 numbers...  ",
    "Another EXAMPLE with Punctuation!!!"
]

You may want to define a preprocessing function:

import re

def preprocess_text(text):
    # Lowercase
    text = text.lower()
    # Remove everything except letters and whitespace (digits, punctuation, symbols)
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    # Collapse runs of whitespace and trim both ends
    text = re.sub(r'\s+', ' ', text).strip()
    return text

Next, you need to apply this function to every element of the input list. Doing it sequentially typically means writing a for loop. The output is correct, but it can be time-consuming for large datasets, where processing the elements in parallel may be preferable.
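For reference, the sequential baseline could look like the sketch below. It repeats the list and the function from above so it runs on its own; the variable name `preprocessed_loop` is only illustrative.

```python
import re

def preprocess_text(text: str) -> str:
    text = text.lower()
    text = re.sub(r"[^a-zA-Z\s]", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text

raw_texts = [
    "Hello! This is a SAMPLE text with UPPERCASE words.",
    "  Extra   spaces   AND 123 numbers...  ",
    "Another EXAMPLE with Punctuation!!!",
]

# Sequential baseline: one element after another, in order
preprocessed_loop = []
for text in raw_texts:
    preprocessed_loop.append(preprocess_text(text))

print(preprocessed_loop)
```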

Enter the map pattern.

# Apply preprocessing to all elements in the inputs list using map
preprocessed = list(map(preprocess_text, raw_texts))

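Because the function is applied to each element independently, this pattern lends itself to parallel execution. Keep in mind that Python's built-in map still processes the elements one at a time in a single process; the speedup only comes when a framework distributes the same pattern across multiple workers. That can be beneficial in ML pipelines, where this step may happen multiple times.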
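As a minimal local sketch of running the same map on several workers, the standard library's `concurrent.futures` exposes an `Executor.map` with the same shape as the built-in `map`. The choice of `ThreadPoolExecutor`, the `max_workers=3` value, and the variable name `preprocessed_parallel` are assumptions made for illustration.

```python
import re
from concurrent.futures import ThreadPoolExecutor

def preprocess_text(text: str) -> str:
    text = text.lower()
    text = re.sub(r"[^a-zA-Z\s]", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text

raw_texts = [
    "Hello! This is a SAMPLE text with UPPERCASE words.",
    "  Extra   spaces   AND 123 numbers...  ",
    "Another EXAMPLE with Punctuation!!!",
]

# max_workers=3 is an arbitrary value for this sketch.
# Executor.map returns results in the same order as the inputs.
with ThreadPoolExecutor(max_workers=3) as executor:
    preprocessed_parallel = list(executor.map(preprocess_text, raw_texts))

print(preprocessed_parallel)
```

For CPU-bound pure-Python work, threads are constrained by the interpreter lock, so `ProcessPoolExecutor` (which offers the same `map` method) is usually the better fit. Either way, this only scales to the cores of a single machine.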

But how can you do this more efficiently? How can you distribute the load among multiple compute nodes? Enter Flyte Map Tasks.

Flyte MapTask

Flyte is a distributed computation framework that uses a Kubernetes Pod as the fundamental execution environment for each task in a pipeline. When you use MapTasks, Flyte automatically distributes the load among multiple Pods that run in parallel. Each Pod downloads and processes only the element at its own index in the inputs list, which prevents inefficient duplicate data movement.
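The "one index per worker" idea can be illustrated with a plain-Python sketch. This is a conceptual model only, not Flyte's implementation: `STORED_INPUTS`, `fetch_element`, `run_subtask`, and `fetch_log` are hypothetical names, and the dictionary stands in for remote storage.

```python
# Hypothetical stand-in for the inputs list kept in remote storage
STORED_INPUTS = {0: "alpha", 1: "beta", 2: "gamma"}

# Records which indices were fetched, to show each worker reads one element
fetch_log = []

def fetch_element(index: int) -> str:
    """Simulate downloading only the element at the given index."""
    fetch_log.append(index)
    return STORED_INPUTS[index]

def run_subtask(index: int) -> str:
    """Hypothetical worker: fetches and processes only its own element."""
    return fetch_element(index).upper()

# In a real cluster each call would run in its own Pod, in parallel;
# here the calls run one after another just to show the data access pattern.
outputs = [run_subtask(i) for i in range(len(STORED_INPUTS))]

print(outputs)
print(fetch_log)
```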

But how many Pods are created?

Let’s see.

Consider this simple example:

from flytekit import map_task, task, workflow

threshold = 11


@task
def detect_anomalies(data_point: int) -> bool:
    return data_point > threshold


@workflow
def map_workflow(data: list[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]) -> list[bool]:
    # Use the map task to apply the anomaly detection function to each data point
    return map_task(detect_anomalies)(data_point=data)
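As a sanity check that needs no cluster, the values this workflow should return can be reproduced with the built-in `map`, assuming the map task returns its outputs in the same order as its inputs. The names `data` and `expected` are only illustrative.

```python
threshold = 11

def detect_anomalies(data_point: int) -> bool:
    # Same comparison as the Flyte task, without the @task decorator
    return data_point > threshold

data = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]

# Plain-Python equivalent of mapping the task over the list
expected = list(map(detect_anomalies, data))

print(expected)
# [False, True, False, False, True, True, True, False, True, False]
```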

Running this workflow creates one Pod per element in the inputs list:

K8s Pods created in the namespace of this execution

Since this is an iterative process, the next time you run this step the MapTask spins up 10 Pods again. What if there are thousands or millions of inputs (as in many LLM input datasets)? What is the impact of booting up one container per element in the inputs list?

With the example above, this is the time it takes to complete the execution:

Union UI showing execution time for the map task defined above

20 seconds.

If we double the list size, time starts piling up:

Union UI showing a new execution with more data taking more time

That is, a 100% increase in the input dataset size led to a 50% increase in total execution time.

What if you could reuse a specific number of Pods to mitigate this penalty?

Union Actors

This feature allows you to declare an execution environment and then reuse it across multiple task executions, which mitigates the impact of container boot-up times.

We define a slightly modified version of the previous example to run it on the Union platform, using an input dataset with 100 items.
As you might expect by now, it would create 100 Pods.

To keep this workflow from becoming a "noisy neighbour" in your cluster, you can specify the number of concurrent executions and, hence, the number of Pods that exist simultaneously at any point in time. In this example, we limit this number to 10:

from flytekit import map_task
import union
import random

threshold = 31

# Declare a container image
image = union.ImageSpec(
    packages=["union==0.1.168"],
    builder="union",
)

@union.task(container_image=image)
def detect_anomalies(data_point: int) -> bool:
    return data_point > threshold

@union.workflow
def map_workflow(data: list[int] = random.sample(range(1,101), k=100)) -> list[bool]:
    # Use the map task to apply the anomaly detection function to each data point
    return map_task(detect_anomalies, concurrency=10)(data_point=data)
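What a concurrency limit means in practice can be sketched locally with a bounded thread pool. This is only an analogy for the `concurrency=10` setting, not how Flyte schedules Pods; `MAX_CONCURRENCY`, `detect_anomalies_tracked`, and the `active`/`peak` counters are hypothetical names, and the short sleep stands in for real work.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENCY = 10  # mirrors the limit used in the map task above
threshold = 31

lock = threading.Lock()
active = 0  # workers currently running
peak = 0    # highest number of workers observed at the same time

def detect_anomalies_tracked(data_point: int) -> bool:
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.01)  # stand-in for real work
    with lock:
        active -= 1
    return data_point > threshold

data = list(range(1, 101))

# The pool never runs more than MAX_CONCURRENCY calls at once
with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as pool:
    results = list(pool.map(detect_anomalies_tracked, data))

print(f"items processed: {len(results)}, peak concurrency: {peak}")
```

All 100 items are still processed; the limit only caps how many are in flight at any moment.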

Execution takes 1 minute:

Union UI showing an execution that takes 1 minute

Now, let's define an Actors environment with 10 replicas, that is, 10 "reusable" Pods:

from flytekit import map_task
import union
import random

threshold = 31

image = union.ImageSpec(
    packages=["union==0.1.168"],
    builder="union",
)

# Here we define the Actors settings
actor = union.ActorEnvironment(
    name="my-actor",
    replica_count=10,
    ttl_seconds=30,
    requests=union.Resources(
        cpu="125m",
        mem="256Mi",
    ),
    container_image=image,
)

@actor.task
def detect_anomalies(data_point: int) -> bool:
    return data_point > threshold


@union.workflow
def map_workflow(data: list[int] = random.sample(range(1,101), k=100)) -> list[bool]:
    # Use the map task to apply the anomaly detection function to each data point
    return map_task(detect_anomalies)(data_point=data)

Execution time is 22 seconds:

Union UI showing an execution that takes 22 seconds

That's a 63% decrease in execution time (from 60 to 22 seconds), which enables faster iteration and more efficient resource consumption.
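The percentage follows directly from the two measured times. The sketch below also includes a deliberately simplified cost model of why reusing Pods helps. The model is an assumption for illustration, not a description of Flyte or Union internals: it ignores scheduling and image pulls, and the `boot_s` and `work_s` values are made-up numbers, not measurements.

```python
import math

def percent_decrease(before: float, after: float) -> float:
    """Relative reduction from `before` to `after`, in percent."""
    return (before - after) / before * 100

# Measured times from the two executions above: 60 s and 22 s
print(f"{percent_decrease(60, 22):.0f}%")  # prints 63%

def one_pod_per_item(n_items: int, concurrency: int, boot_s: float, work_s: float) -> float:
    """Hypothetical model: every item pays the container boot time."""
    waves = math.ceil(n_items / concurrency)
    return waves * (boot_s + work_s)

def reused_pods(n_items: int, replicas: int, boot_s: float, work_s: float) -> float:
    """Hypothetical model: each replica boots once, then processes items back to back."""
    waves = math.ceil(n_items / replicas)
    return boot_s + waves * work_s

# Illustrative values only (not measured): 4 s boot, 0.5 s of work per item
print(one_pod_per_item(100, 10, boot_s=4.0, work_s=0.5))
print(reused_pods(100, 10, boot_s=4.0, work_s=0.5))
```

In this model, the gap between the two approaches grows with the number of items, because the boot cost is paid once per replica instead of once per item.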

Conclusion

Modern ML workloads are typically designed to process big datasets in parallel. That carries a performance penalty due to the latency of booting up a container for each input. Flyte lets you achieve efficient parallel processing, and Union Actors take it to the next level by removing the limitation of one Pod per input item, allowing faster and more efficient executions.


Sign up for Union's free tier
Check out the repo
Questions? Join us on Slack!