.png)
When your automation workflows start to scale, latency becomes the bottleneck. Traditional synchronous process pipelines like:
step_one → step_two → step_three → ... → step_n
often suffer from two key delays:
A simple implementation looks like this:
.png)
all_data = get_all_data()
results = []
for item in all_data:
a = step_one(item)
b = step_two(a)
c = step_three(b)
results.append(c)
Processing millions of records sequentially like this leads to huge wait times, even if each step is fast.
Using multiprocessing, each process runs its own Python interpreter, bypassing the GIL:
POOL = create_pool(num_processes)
results = POOL.map(
lambda x: step_three(step_two(step_one(x))),
all_data
)
POOL.close()
Think of your pipeline as a factory line. Each step runs in its own process and passes data via queues. This means:
.png)
Time →
elem k [Step1]──►[Step2]──►[Step3]
elem k + 1 [Step1]──►[Step2]
elem k + 2 [Step1]
You can implement this with multiprocessing queues:
.png)
END = object()
def worker(step_fn, in_q, out_q):
while True:
item = in_q.get()
if item == END:
out_q.put(END)
break
out_q.put(step_fn(item))
# Setup queues and processes for each stage
Q0, Q1, Q2, Q3 = Queue(), Queue(), Queue(), Queue()
P1 = spawn_process(worker, args=(stage_one, Q0, Q1))
P2 = spawn_process(worker, args=(stage_two, Q1, Q2))
P3 = spawn_process(worker, args=(stage_three, Q2, Q3))
P4 = spawn_process(worker, args=(stage_four, Q3, output_sink))
# Feed data
for item in data_source:
Q0.put(item)
Q0.put(END)
# Drain results
def output_sink():
out_q = Queue()
spawn_process(worker, args=(stage_four, Q3, out_q))
while True:
result = out_q.get()
if result == END:
break
handle_output(result)
Single-record processing wastes overhead. Instead, process data in batches:
.png)
BATCH_SIZE = 1000
def chunked(iterable, n):
batch = []
for element in iterable:
batch.append(element)
if len(batch) == n:
yield batch
batch = []
if batch:
yield batch
def step_one_batch(batch): return [step_one(x) for x in batch]
# similarly for other steps
# Setup workers like before, but with batch functions
Batching leverages vectorized libraries like Pandas or NumPy, dramatically increasing throughput while reducing queue overhead.
+--------------------------------------+--------------------+------------+
| Technique | End to End runtime | CPU |
| | | Utilisation|
+--------------------------------------+--------------------+------------+
| Single-process loop | 4 h 17 m | 15 % |
| multiprocessing.Pool (8x) | 52 m | 85 % |
| Queued 4-stage pipeline (8x) | 27 m | 90 % |
| Batching + pipeline (8x) | 11 m | 95 % |
+--------------------------------------+--------------------+------------+
Takeaway: Combining pipeline parallelism with batching cuts runtime ~23× and maximizes CPU usage.
Adopt this pattern, and your “synchronous” pipeline will feel almost asynchronous, without sacrificing ordering guarantees. Practical guidance on orchestrating with Airflow, Astro, and Pub/Sub is coming soon.