PipeFunc: Build Lightning-Fast Pipelines in Python (DAGs Made Easy)

Posted by u/basnijholt in r/Python

Hey r/Python!

I'm excited to share pipefunc (github.com/pipefunc/pipefunc), a Python library designed to make building and running complex computational workflows incredibly fast and easy. If you've ever dealt with intricate dependencies between functions, struggled with parallelization, or wished for a simpler way to create and manage DAG pipelines, pipefunc is here to help.

What My Project Does:

pipefunc empowers you to easily construct Directed Acyclic Graph (DAG) pipelines in Python. It handles:

  1. Automatic Dependency Resolution: pipefunc automatically determines the correct execution order of your functions, eliminating manual dependency management.
  2. Lightning-Fast Execution: pipefunc adds minimal overhead (around 10 µs per function call), so your pipeline's runtime is dominated by your own functions, not by the framework.
  3. Effortless Parallelization: pipefunc automatically parallelizes independent tasks, whether on your local machine or a SLURM cluster. It supports any concurrent.futures.Executor!
  4. Intuitive Visualization: Generate interactive graphs to visualize your pipeline's structure and understand data flow.
  5. Simplified Parameter Sweeps: pipefunc's mapspec feature lets you easily define and run N-dimensional parameter sweeps, which is perfect for scientific computing, simulations, and hyperparameter tuning.
  6. Resource Profiling: Gain insights into your pipeline's performance with detailed CPU, memory, and timing reports.
  7. Caching: Avoid redundant computations with multiple caching backends.
  8. Type Annotation Validation: Ensures type consistency across your pipeline to catch errors early.
  9. Error Handling: Includes an ErrorSnapshot feature to capture detailed information about errors, making debugging easier.
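Items 1 and 3 work together. Once the dependencies are known, every function whose inputs are ready can be handed to an executor at the same time. The sketch below illustrates that scheduling idea using only the standard library (`graphlib` and `concurrent.futures`). It is not pipefunc's implementation. `run_parallel` is a hypothetical helper, and it assumes that dependencies are inferred by matching parameter names to output names, as in the examples below.

```python
import inspect
from concurrent.futures import FIRST_COMPLETED, Executor, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter
from typing import Any, Callable


def run_parallel(
    funcs: dict[str, Callable[..., Any]],
    inputs: dict[str, Any],
    executor: Executor,
) -> dict[str, Any]:
    """Hypothetical scheduler: run each function as soon as its inputs exist.

    `funcs` maps an output name to the function that produces it. A parameter
    whose name matches another output name is treated as a dependency.
    """
    graph = {
        name: {p for p in inspect.signature(f).parameters if p in funcs}
        for name, f in funcs.items()
    }
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not a DAG
    values = dict(inputs)
    running = {}  # maps each pending Future to the output name it will produce
    while sorter.is_active():
        # Submit every function whose dependencies have all finished.
        for name in sorter.get_ready():
            f = funcs[name]
            kwargs = {p: values[p] for p in inspect.signature(f).parameters}
            running[executor.submit(f, **kwargs)] = name
        # Block until at least one task finishes, then unlock its dependents.
        done, _ = wait(running, return_when=FIRST_COMPLETED)
        for fut in done:
            name = running.pop(fut)
            values[name] = fut.result()
            sorter.done(name)
    return values


funcs = {
    "c": lambda a, b: a + b,
    "e": lambda b: b * 10,  # independent of "c", so both run concurrently
    "d": lambda c, e: c + e,  # waits for both "c" and "e"
}

with ThreadPoolExecutor(max_workers=2) as executor:
    results = run_parallel(funcs, {"a": 2, "b": 3}, executor)

print(results["d"])  # 35
```

Because the helper accepts any `concurrent.futures.Executor`, you could swap in a `ProcessPoolExecutor` without changing the scheduling logic. With processes, the functions would need to be picklable, so lambdas would not work.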

Target Audience:

pipefunc is ideal for:

pipefunc is designed for production use, but it's also a great tool for prototyping and experimentation.

Comparison:

Examples:

Simple Example:

```python
from pipefunc import pipefunc, Pipeline

@pipefunc(output_name="c")
def add(a, b):
    return a + b

# multiply's `c` argument is wired to add's output because the names match
@pipefunc(output_name="d")
def multiply(b, c):
    return b * c

pipeline = Pipeline([add, multiply])
result = pipeline("d", a=2, b=3)  # Automatically executes 'add' first
print(result)  # Output: 15

pipeline.visualize()  # Visualize the pipeline
```
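To show how "automatic dependency resolution" can work in principle, the plain-Python sketch below reproduces the simple example without pipefunc. It builds the graph by matching each parameter name to an output name, then uses `graphlib.TopologicalSorter` to find an execution order. The `funcs` registry is a hypothetical stand-in for the `output_name=` arguments above. This sketch evaluates every function, whereas a real pipeline only needs the functions upstream of the requested output.

```python
import inspect
from graphlib import TopologicalSorter


def add(a, b):
    return a + b


def multiply(b, c):
    return b * c


# Hypothetical registry: output name -> function (mirrors output_name="c"/"d").
funcs = {"c": add, "d": multiply}

# Add an edge wherever a parameter name matches another function's output name.
graph = {
    out: {p for p in inspect.signature(f).parameters if p in funcs}
    for out, f in funcs.items()
}
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['c', 'd']

# Evaluate in dependency order, feeding earlier outputs into later calls.
values = {"a": 2, "b": 3}
for out in order:
    f = funcs[out]
    values[out] = f(**{p: values[p] for p in inspect.signature(f).parameters})
print(values["d"])  # 15
```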

Parallel Example with mapspec:

Runs `f` for every combination of the inputs `a` and `b`, in parallel, automatically!

```python
import numpy as np
from pipefunc import pipefunc, Pipeline
from pipefunc.map import load_outputs

# One call of f per (i, j) pair: the outer product of a and b
@pipefunc(output_name="c", mapspec="a[i], b[j] -> c[i, j]")
def f(a: int, b: int):
    return a + b

@pipefunc(output_name="mean")  # no mapspec, so receives 2D `c[:, :]`
def g(c: np.ndarray):
    return np.mean(c)

pipeline = Pipeline([f, g])
inputs = {"a": [1, 2, 3], "b": [4, 5, 6]}
result_dict = pipeline.map(inputs, run_folder="my_run_folder", parallel=True)
result = load_outputs("mean", run_folder="my_run_folder")  # outputs are saved to disk, so they can be loaded later too
print(result)  # Output: 7.0
```
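If the mapspec notation is new to you, this plain-Python sketch spells out what `a[i], b[j] -> c[i, j]` computes for the inputs above. It uses no pipefunc, no run folder, and no parallelism. It only illustrates the semantics, with nested lists standing in for the NumPy array that `g` receives. It is not how pipefunc actually executes the map.

```python
from itertools import product
from statistics import fmean


def f(a: int, b: int) -> int:
    return a + b


a = [1, 2, 3]
b = [4, 5, 6]

# "a[i], b[j] -> c[i, j]": one independent call of f per index pair (i, j).
# No call depends on another, so all nine could run in parallel.
calls = {(i, j): f(a[i], b[j]) for i, j in product(range(len(a)), range(len(b)))}

# Assemble the individual results into the 2D grid c[i, j].
c = [[calls[i, j] for j in range(len(b))] for i in range(len(a))]

# g has no mapspec, so it receives the whole 2D c at once and reduces it.
mean = fmean(x for row in c for x in row)
print(c)     # [[5, 6, 7], [6, 7, 8], [7, 8, 9]]
print(mean)  # 7.0
```

The number of calls grows as the product of the input lengths (here 3 × 3 = 9). That is why independent, embarrassingly parallel calls suit N-dimensional parameter sweeps.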

Getting Started:

I'm excited to hear your feedback and answer any questions you have. Give pipefunc a try and let me know how it can improve your workflows!