PipeFunc: Build Lightning-Fast Pipelines with Python: DAGs Made Easy
Posted by basnijholt@reddit | Python | View on Reddit | 13 comments
Hey r/Python!
I'm excited to share pipefunc
(github.com/pipefunc/pipefunc), a Python library designed to make building and running complex computational workflows incredibly fast and easy. If you've ever dealt with intricate dependencies between functions, struggled with parallelization, or wished for a simpler way to create and manage DAG pipelines, pipefunc
is here to help.
What My Project Does:
pipefunc
empowers you to easily construct Directed Acyclic Graph (DAG) pipelines in Python. It handles:
- Automatic Dependency Resolution:
pipefunc
automatically determines the correct execution order of your functions, eliminating manual dependency management. - Lightning-Fast Execution: With minimal overhead (around 10 µs per function call),
pipefunc
ensures your pipelines run super fast. - Effortless Parallelization:
pipefunc
automatically parallelizes independent tasks, whether on your local machine or a SLURM cluster. It supports anyconcurrent.futures.Executor
! - Intuitive Visualization: Generate interactive graphs to visualize your pipeline's structure and understand data flow.
- Simplified Parameter Sweeps:
pipefunc
'smapspec
feature lets you easily define and run N-dimensional parameter sweeps, which is perfect for scientific computing, simulations, and hyperparameter tuning. - Resource Profiling: Gain insights into your pipeline's performance with detailed CPU, memory, and timing reports.
- Caching: Avoid redundant computations with multiple caching backends.
- Type Annotation Validation: Ensures type consistency across your pipeline to catch errors early.
- Error Handling: Includes an
ErrorSnapshot
feature to capture detailed information about errors, making debugging easier.
Target Audience:
pipefunc
is ideal for:
- Scientific Computing: Streamline simulations, data analysis, and complex computational workflows.
- Machine Learning: Build robust and reproducible ML pipelines, including data preprocessing, model training, and evaluation.
- Data Engineering: Create efficient ETL processes with automatic dependency management and parallel execution.
- HPC: Run
pipefunc
on a SLURM cluster with minimal changes to your code. - Anyone working with interconnected functions who wants to improve code organization, performance, and maintainability.
pipefunc
is designed for production use, but it's also a great tool for prototyping and experimentation.
Comparison:
- vs. Hamilton: Hamilton also compiles Python functions into DAGs, but it centers on column-level DataFrame engineering, ships modifiers like @with_columns/@extract_columns, and offers built-in data/schema validation plus an optional UI for lineage and observability; pipefunc leans toward low-overhead scientific/HPC pipelines, executor-agnostic parallelism, and N-D sweeps via mapspecs.
- vs. Dask:
pipefunc
offers a higher-level, more declarative way to define pipelines. It automatically manages task scheduling and execution based on your function definitions andmapspec
s, without requiring you to write explicit parallel code. - vs. Luigi/Airflow/Prefect/Kedro: While those tools excel at ETL and event-driven workflows,
pipefunc
focuses on scientific computing, simulations, and computational workflows where fine-grained control over execution and resource allocation is crucial. Also, it's way easier to setup and develop with, with minimal dependencies! - vs. Pandas: You can easily combine
pipefunc
withPandas
! Usepipefunc
to manage the execution ofPandas
operations and parallelize your data processing pipelines. But it also works well withPolars
,Xarray
, and other libraries! - vs. Joblib:
pipefunc
offers several advantages overJoblib
.pipefunc
automatically determines the execution order of your functions, generates interactive visualizations of your pipeline, profiles resource usage, and supports multiple caching backends. Also,pipefunc
allows you to specify the mapping between inputs and outputs usingmapspec
s, which enables complex map-reduce operations.
Examples:
Simple Example:
from pipefunc import pipefunc, Pipeline
@pipefunc(output_name="c")
def add(a, b):
return a + b
@pipefunc(output_name="d")
def multiply(b, c):
return b * c
pipeline = Pipeline([add, multiply])
result = pipeline("d", a=2, b=3) # Automatically executes 'add' first
print(result) # Output: 15
pipeline.visualize() # Visualize the pipeline
Parallel Example with mapspec
:
Parallelizes for all combinations of inputs a
and b
automatically!
import numpy as np
from pipefunc import pipefunc, Pipeline
from pipefunc.map import load_outputs
@pipefunc(output_name="c", mapspec="a[i], b[j] -> c[i, j]")
def f(a: int, b: int):
return a + b
@pipefunc(output_name="mean") # no mapspec, so receives 2D `c[:, :]`
def g(c: np.ndarray):
return np.mean(c)
pipeline = Pipeline([f, g])
inputs = {"a": [1, 2, 3], "b": [4, 5, 6]}
result_dict = pipeline.map(inputs, run_folder="my_run_folder", parallel=True)
result = load_outputs("mean", run_folder="my_run_folder") # can load now too
print(result) # Output: 7.0
Getting Started:
I'm exctited to hear your feedback and answer any questions you have. Give pipefunc
a try and let me know how it can improve your workflows!
killerfridge@reddit
I'll have a look on Monday, but have you explored kedro?
I_just_made@reddit
Looks interesting, but a package sending telemetry data by default is an automatic nope.
killerfridge@reddit
Yeah I don't like either, but it's easily turned off either by removing the plugin, or setting the consent:false in . telemetry (something that I have as default in my templates)
Global_Bar1754@reddit
Appreciate the quick start snippets. I see how you can enumerate different inputs from the top level pipeline using pipeline.map, but is there a way to achieve something similar within a single pipeline execution?
For example let’s say you want to compute some value for the entire US, and to do so you need to compute some values per state, which then get further processed to give you your US result. Hamilton provides the Parallelizable/Collect mechanism for this. Does your library provide a mechanism for this too?
Gullible_Carry1049@reddit
Since you are targeting scientific/hpc you should compare to tools like LLN maestro and cycl
erisd1182@reddit
I think the Snakemake comparison is fair. Maestro definitely was designed and meant to connect different applications at the shell level when HPC scheduling is involved. Additionally the specification in YAML is meant to capture minimal code users (or provide a harness that someone who wants to write code can easily tie into). This library did remind me of a package that I had heard of when I worked in the HPC simulation space called Parsl which is also decorator focused and has hooks to execute in bash or native Python. Might be worth a look for comparison.
As the creator of Maestro, it makes me happy to see it referenced. I didn’t think any folks on Reddit would know about it!
basnijholt@reddit (OP)
Thanks for those references, I took a quick look at both, I did not know them already. They seem to be a declarative way of running different bash commands defined in a DSL (in a YAML or TOML-based file). It reminds me a bit of Snakemake [which I compare in the docs](https://pipefunc.readthedocs.io/en/latest/faq/#workflow-definition-languages-e-g-snakemake).
PipeFunc is just Python code. To adapt it in your current workflows will just take writing a couple of lines of Python code (an explicit design goal of mine is to keep the amount of boilerplate to an absolute minimum) while not requiring you to change the structure of the code itself. So adapting it is pretty low risk because removing it just amounts to removing a couple of lines of `@pipefunc` decorators.
autodialerbroken116@reddit
To perhaps be a little modest here, how would contrast what your pipelining solutions lacks in comparison to enterprise grade MLops solutions like kubeflow, airflow, and other commercial options. Why should they use your solution?
Maybe more of interest, how did this project start? Collaborators? Pain points your work solves that didn't exist?
basnijholt@reddit (OP)
Great question!
It started from the desire of having something light-weight and flexible that doesn't dictate running things in new specific ways or adding a lot of boilerplate. The main use case was physics simulations that had to run anything from laptop to a large cluster with 100k CPU cores, without having to change anything.
Tools like AirFlow and Kubeflow are extremely heavy and add a lot of boilerplate and are not well suited for research environments. They're more for environments where the same code runs over and over again, either on a schedule or triggered by some event. In addition, these "enterprise" tools also are not meant to be run on computational clusters / supercomputers with traditional job schedulers like Slurm. If they support it, it is often an afterthought.
The original target audience is therefore researchers that might not be very well versed in software engineering practices and just want something that takes care of the parallelization, data saving, caching, and running of their code. However, since releasing it I have seen people build all kinds of things, from including it in [ML training pipelines](https://github.com/yaak-ai/rbyte) to web servers.
autodialerbroken116@reddit
Thanks for response.
In contrast to your question, please regard this as polite.
Kubernetes clusters are often used in lieu of a traditional grid scheduler like SGE or Slurm. In addition, they can utilize GPU passthrough via docker, and in some ways are more suitable to simulation workflows involving large job batches and/or large jobs consisting of dozens or hundreds of CPU cores.
I'm not saying you have to do this or match the caliber of kube clusters. But I would argue that there are some enterprise grade value on your system as well as some of the previously existing solutions. If you like the idea of giving scientists and engineers some horsepower without much boilerplate, tell them to throw their dependencies in a docker container.
I think your skill set sounds pretty great and I'd encourage you to help others in the way that you are. You're not alone in this area and perhaps consider some of this alt architecture.
Good luck and have fun!
basnijholt@reddit (OP)
It you are a
uv
user, run this:To install
pipefunc
in a temporary environment and instantly run the main tutorial in a Jupyter notebook! Nothing required, exceptuv
!pip_install_account@reddit
Interesting. How do you handle storage of data/artifacts? If a node(function) generates an image that 5 other functions will(possibly) need to load and process later, how does that work? or if two functions want to work on the same file, do you provide locks or some sort of mechanism to prevent them from modifying the same data? And do we handle non-persistent and persistent data the same way?
If I want to perform some database operations to gather some data, do I open a db connection inside the function?
basnijholt@reddit (OP)
Hi! There are different implementations of storage backends (like in memory, file-based, Zarr-based, and more), but in general they all operate on individual files. This way there is never any contention and locks are not needed. There's a little tree representing the files it creates:
You don't really need to be concerned about how these files are structured exactly because there are tools to load data directly. For example, you could use:
Here
results
is the same as what would be returned by: