Ergonomic Concurrency
Posted by kwargs_@reddit | Python | 15 comments
Project name: Pipevine
Project link: https://github.com/arrno/pipevine
What My Project Does
Pipevine is a lightweight async pipeline and worker-pool library for Python.
It helps you compose concurrent dataflows with backpressure, retries, and cancellation, without all the asyncio boilerplate.
Target Audience
Developers who work with data pipelines, streaming, or CPU/IO-bound workloads in Python.
It’s designed to be production-ready but lightweight enough for side projects and experimentation.
How to Get Started
pip install pipevine
import asyncio
from pipevine import Pipeline, work_pool

@work_pool(buffer=10, retries=3, num_workers=4)
async def process_data(item, state):
    # Your processing logic here
    return item * 2

@work_pool(buffer=5, retries=1)
async def validate_data(item, state):
    if item < 0:
        raise ValueError("Negative values not allowed")
    return item

# Create and run the pipeline
async def main():
    pipe = Pipeline(range(100)) >> process_data >> validate_data
    return await pipe.run()

result = asyncio.run(main())
Feedback Requested
I’d love thoughts on:
- API ergonomics (does it feel Pythonic?)
- Use cases where this could simplify your concurrency setup
- Naming and documentation clarity
sfermigier@reddit
Why ">>" and not "|" (
__or__
magic method)? For a project called "pipeline", this would have looked more natural, IMHO.kwargs_@reddit (OP)
True. Would it be too over the top to allow both? 😅
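(For illustration, a minimal sketch of how one stage type could wire both operators to the same chaining logic; the Stage class here is hypothetical, not Pipevine's internals:)

# Hypothetical sketch: one stage class supporting both >> and | composition
class Stage:
    def __init__(self, fn):
        self.fn = fn

    def _chain(self, other):
        # Compose: feed this stage's output into the next stage
        return Stage(lambda x: other.fn(self.fn(x)))

    __rshift__ = _chain  # supports: a >> b
    __or__ = _chain      # supports: a | b

double = Stage(lambda x: x * 2)
inc = Stage(lambda x: x + 1)
assert (double >> inc).fn(3) == (double | inc).fn(3) == 7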
c_is_4_cookie@reddit
Sweet project and a very nice interface. Is there a way to merge two process outputs into a stage that requires two sources?
kwargs_@reddit (OP)
The idea is that a mix_pool takes two or more different handlers plus a merge function: the flow forks out to the handlers, then joins back in at the merge function. Does that cover your use case, or are you thinking of something different?
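(A rough sketch of that fork/merge shape; the mix_pool signature below is an assumption based on this description, not confirmed API:)

# Assumed mix_pool signature, sketched from the description above (not confirmed API):
# fork each item out to both handlers, then join the results with a merge function.
from pipevine import Pipeline, mix_pool

async def enrich_a(item, state):
    return item + 1

async def enrich_b(item, state):
    return item * 10

def merge(results):
    # Combine the forked handler outputs back into one item
    return tuple(results)

pipe = Pipeline(range(5)) >> mix_pool(enrich_a, enrich_b, merge=merge)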
c_is_4_cookie@reddit
Yes, I think so.
techlatest_net@reddit
Pipevine looks super promising for taming async chaos! The API feels intuitive—integrating retries, num_workers, and backpressure effortlessly. Definitely Pythonic, though a visual example in the docs might help clarify control flow for newcomers. This could save headaches in managing CPU-bound workflows or stream consumers. Naming seems on point, though clearer hints on lifecycle management (e.g., stop() behaviors) would be golden. Would love to see edge-case details like starvation mitigation. Amazing work—this feels destined to power production pipelines. 🚀
kwargs_@reddit (OP)
Thanks for the positive feedback
Recursive_Boomerang@reddit
GPT GPT everywhere
gdchinacat@reddit
From the sample you posted, yes, this feels very pythonic. It was clear from the code what everything did. I really like the use of >>/__rshift__ for the pipelining.
kwargs_@reddit (OP)
Thanks! The rshift overloading is my favorite part personally. Love that you can do that in Python.
gdchinacat@reddit
I've been working on a project that uses the rich comparison functions to let you write a predicate expression and have a function called when the expression becomes true. The example counts as fast as possible until some other predicated function stops it.
I love that @ is used for decorators, so it literally reads 'at count ?= 0'.
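(A generic sketch of that comparison-as-trigger idea, not the reactions library's actual API; it relies on the relaxed decorator grammar of PEP 614, Python 3.9+:)

# Hypothetical sketch, not the reactions library's real API.
# Overriding __eq__ returns a registrar, so "@count == 0" registers a
# callback that fires whenever the watched value becomes 0.
class Watched:
    def __init__(self, value=0):
        self._value = value
        self._triggers = []  # (predicate, callback) pairs

    def __eq__(self, other):
        def register(fn):
            self._triggers.append((lambda v: v == other, fn))
            return fn
        return register

    def set(self, value):
        self._value = value
        for predicate, fn in self._triggers:
            if predicate(value):
                fn()

count = Watched()

@count == 0
def hit_zero():
    print("count reached zero")

count.set(3)  # nothing fires
count.set(0)  # prints "count reached zero"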
kwargs_@reddit (OP)
Very cool! Is there a GitHub link I can check out for this? Would love to read more and drop a star.
gdchinacat@reddit
https://github.com/gdchinacat/reactions
PurepointDog@reddit
Does this support progress bars/indicators?
kwargs_@reddit (OP)
You mean like progress indicators to show percent completion? Interesting idea. Not yet, because the pipeline is agnostic about the size of the generator (it could be infinite), but that could be a cool feature to add.
Regarding errors: right now, if you raise an exception in a handler, it counts it, optionally logs it, and continues. There’s a special kill switch handlers can emit to tear down the pipeline. I haven’t decided yet if this is the best approach.
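(On the progress question: one possible user-side workaround, assuming only that Pipeline accepts any iterable, is to wrap a finite source in tqdm. A sketch, not a built-in feature:)

# Sketch of a user-side progress workaround (assumed, not a built-in Pipevine feature):
# tqdm wraps the finite source iterable, so the bar advances as items are pulled.
from tqdm import tqdm
from pipevine import Pipeline

pipe = Pipeline(tqdm(range(100))) >> process_data >> validate_data  # stages from the example above
result = await pipe.run()  # inside an async function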