Optimal design for threading with Gunicorn and Flask apps
Posted by ReallyReadyRain3@reddit | Python | 9 comments
I have a Flask Web application that spawns multiple threads at runtime using the threading module.
Each thread is waiting for an item to be put on the queue. Items are added to the queue via the UI.
The problem appears when I put Gunicorn in front of it. I get this:
Exception ignored in:
Traceback (most recent call last):
  File "site-packages/gevent/threading.py", line 398, in after_fork_in_child
    assert not thread.is_alive()
               ^^^^^^^^^^^^^^^^^
ganjlord@reddit
It would help if you were more specific about what exactly you are trying to achieve. If you are implementing a task queue, for example, there are a bunch of libraries you can use instead.
ReallyReadyRain3@reddit (OP)
I just have a simple enough program that works and I would like to prevent the warning if possible.
I basically want to keep the threading, as it works well and is simple enough.
Prospector2@reddit
This happens because Gunicorn with gevent workers (or any other asynchronous worker type) does not cope with threads from the threading module that are started BEFORE the workers are forked, which creates a conflict when the worker processes start.
You could move thread initialization to after the worker fork: use Gunicorn's post_fork server hook (a plain function defined in the Gunicorn config file, not a decorator) to ensure that threads are only created after the worker is forked.
Ex:

    from myapp import create_app, start_background_threads

    # Here you initialize (no threads yet)
    app = create_app()

    def post_fork(server, worker):
        """Hook executed AFTER worker fork."""
        start_background_threads()

Modify your app following this idea:

    from flask import Flask
    from threading import Thread
    import queue

    # Shared queue
    task_queue = queue.Queue()
    threads_started = False

    def create_app():
        app = Flask(__name__)
        # These are the settings and routes
        return app

    def worker_thread():
        """Consumes queue items indefinitely."""
        while True:
            item = task_queue.get()
            # Here you process the item
            print(f"Processing: {item}")
            task_queue.task_done()

    def start_background_threads():
        """You will start the threads (called AFTER the fork by Gunicorn)."""
        global threads_started
        if not threads_started:
            for _ in range(4):  # 4 threads, e.g.
                thread = Thread(target=worker_thread, daemon=True)
                thread.start()
            threads_started = True
            print("Threads started blablabla")

lyddydaddy@reddit
At a high level, threading and prefork models are not very compatible.
The specific issue is either that you need a custom at-fork handler, or that you're relying on other threads to exist after the fork. They won't: only the thread that called fork() exists in the child process.
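A rough way to see this for yourself, as a minimal sketch assuming a POSIX system where os.fork is available. The function name thread_counts_across_fork is made up for illustration and is not part of Gunicorn or gevent. It starts one background thread, forks, and has the child report how many threads it can see. Python 3.12+ emits a DeprecationWarning when a multi-threaded process forks, so the sketch silences it around the fork call.

```python
import os
import threading
import warnings


def thread_counts_across_fork():
    """Return (threads alive in the parent, threads alive in a forked child)."""
    stop = threading.Event()
    # Background thread that just blocks until told to stop.
    worker = threading.Thread(target=stop.wait, daemon=True)
    worker.start()
    parent_count = threading.active_count()

    read_fd, write_fd = os.pipe()
    with warnings.catch_warnings():
        # Python 3.12+ warns when a multi-threaded process calls fork().
        warnings.simplefilter("ignore", DeprecationWarning)
        pid = os.fork()

    if pid == 0:
        # Child process: only the thread that called fork() was copied.
        os.write(write_fd, str(threading.active_count()).encode())
        os._exit(0)

    # Parent process: read the child's report, then clean up.
    os.close(write_fd)
    child_count = int(os.read(read_fd, 16).decode())
    os.close(read_fd)
    os.waitpid(pid, 0)
    stop.set()
    worker.join()
    return parent_count, child_count


if __name__ == "__main__":
    parent, child = thread_counts_across_fork()
    print(f"parent sees {parent} threads, child sees {child}")
```

The worker thread started before the fork is simply absent in the child, which is why anything waiting on it there never gets served.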
ReallyReadyRain3@reddit (OP)
So how would you avoid this situation?
lyddydaddy@reddit
Don’t mix threads and gunicorn.
Specifically, the threading queue is not shared between the worker processes, so depending on which process the UI request lands on, it may or may not be able to wake up the thread that's waiting for an item.
If you want threading, go with cherrypy
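The point about the queue not being shared can be checked with a small sketch, again assuming a POSIX system with os.fork. The helper name put_in_child_and_count_in_parent is hypothetical. The child process puts an item on a module-level queue.Queue, and the parent then looks at its own copy of the same queue.

```python
import os
import queue

# Module-level queue, like the one a Flask app would share with its threads.
task_queue = queue.Queue()


def put_in_child_and_count_in_parent(item):
    """Fork, put `item` on the queue in the child, return the parent's queue size."""
    pid = os.fork()
    if pid == 0:
        # Child process: this put() only touches the child's private copy.
        task_queue.put(item)
        os._exit(0)
    # Parent process: wait for the child, then inspect our own copy.
    os.waitpid(pid, 0)
    return task_queue.qsize()


if __name__ == "__main__":
    print(put_in_child_and_count_in_parent("job"))
```

The parent's queue stays empty, because each process has its own copy of the queue object after the fork. With several Gunicorn workers, an item enqueued by one worker is invisible to threads in the others.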
Constant_Bath_6077@reddit
Why do you recommend a framework that has hundreds of unresolved issues? https://github.com/cherrypy/cherrypy/issues?q=is%3Aissue%20state%3Aopen%20label%3Abug
ReallyReadyRain3@reddit (OP)
That's why I just want to keep it simple. Maybe I need to change Gunicorn to something else that will work with Flask and threading?
ReallyReadyRain3@reddit (OP)
I only have a single Gunicorn worker with a single thread, so it should work? It's the same queue for every thread, and they are all waiting for the next item, so whichever one gets it is OK.
Any further thoughts, or pointers on where to read more on the topic?
Thanks for your help, it's much appreciated!