Flask同步启动子线程问题

原有需求

Flask提供一套后端RESTful API,另有一个入口负责直连TCP传输文件

原有逻辑

Flask通过自带的dev WSGI启动,同时通过子线程启动一个socketserver标准库的ThreadingTCPServer,称之为FileSocketServer,这个socket server依赖Flask后端的部分组件来处理文件存储和数据库读写。我们把所有业务逻辑去掉,Flask只保留一个根路径的接口,socketserver处理请求(流式)时只是简单地向一个文件写入REQ,用来确认它是否正常工作。

app0.pyview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from socketserver import StreamRequestHandler, ThreadingTCPServer
import threading

from flask import Flask


class DummySocketHandler(StreamRequestHandler):
def handle(self):
with open("socket.log", "a") as f:
f.write("REQ\n")


def init():
socket_server = ThreadingTCPServer(("0.0.0.0", 32470),
DummySocketHandler)
threading.Thread(target=socket_server.serve_forever).start()


app = Flask(__name__)
app.debug = True

@app.route('/')
def home():
print("Hey", flush=True)
return "Hey"


if __name__ == '__main__':
init()
app.run(host="0.0.0.0", port=8888)

另外准备了一个简单的脚本对socketserver尝试建立连接。

test_connect.pyview raw
1
2
3
4
5
6
7
8
9
10
import socket

if __name__ == '__main__':
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.connect(("0.0.0.0", 32470))
print("Connection OK")
except Exception as e:
print("Connection KO")
print(e)

问题1

Flask开箱的WSGI(Werkzeug)在开启了flask debug模式后会启动reloader来快速响应代码变更以方便调试,否则每次修改代码需要重启WSGI。但是Werkzeug的reload方式是在主进程外重启一个几乎完全一样的监视进程,也就是说同时会有两个Flask后端和两个socket server在跑。Flask这部分,Werkzeug应该是有相应的socket绑定逻辑防止两个相同后端的冲突(目测是所有请求会接入监视进程),但是通过子线程启动的所有其他组件都有可能发生冲突。这里由于之前已上线的业务中把socket server的端口写死了,所以我们的两个socket server就会撞绑定,导致Werkzeug的监视进程中断,后端不可用。所以现有的开发环境都是不启动FileSocketServer的,以便保证后端接口逻辑是通的。

图1. 启动app0时撞socket端口
图1. 启动app0时撞socket端口
图2. cURL验证Flask并未正确启动
图2. cURL验证Flask并未正确启动

问题2

Flask官方已警告说Werkzeug仅面向开发,没有在资源利用效率上做优化,不应该被使用在生产环境中(见图1红字警告)。而我们原有的后端在生产环境和开发环境是同一套入口,也就是Werkzeug,仅通过一个debug参数来开启或关闭reloader。虽然在大后端通过容器化把这部分细节都隐藏起来了,伸缩方面也可以通过k8s横向调出多套flask,但在生产环境里使用仅面向开发的应用服务器是不合适的,感觉就像是用php -S跑了一个后端而没上Apache/Nginx。虽然不了解这类开发用服务器与面向生产的具体有什么区别,但官方的警示总是有一定道理的。

思考

重新整理了一下需求:

  1. 要使用面向生产的WSGI来启动Flask
  2. 要保证开发环境中对代码变更的快速响应(reloader)
  3. 在尽量不改变子线程逻辑的情况下,避免开发环境下撞端口的问题

Flask官方给出的部署方式很多样,其中光standalone的WSGI就给出了很多个选项。之前的Flask用过uWSGI,这次决定尝试一下纯Python写的Gunicorn(Green Unicorn)。粗略翻了一下Gunicorn的介绍和配置文档,基本满足在开发环境(debugging reloader)和生产环境(多种可选的worker、基本的并发保障)的切换和大量性能方面的配置。

上手

根据官方的上手文档,调整了Flask入口,去掉了main,通过集成Flask类在__init__中实现启动FileSocketServer子线程的逻辑,外部则通过gunicorn的CLI来启动整个后端,而非直接通过python调起。

app1.pyview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from socketserver import StreamRequestHandler, ThreadingTCPServer
import threading

from flask import Flask


class DummySocketHandler(StreamRequestHandler):
def handle(self):
with open("socket.log", "a") as f:
f.write("REQ\n")


class MyFlask(Flask):
def __init__(self, *arg, **kwargs):
super().__init__(*arg, **kwargs)
try:
print("Creating socket server")
self.socket_server = ThreadingTCPServer(("0.0.0.0", 32470),
DummySocketHandler)
threading.Thread(target=self.socket_server.serve_forever).start()
print("OK")
except OSError:
print('Unable to start socket server')


app = MyFlask(__name__)
app.debug = True


@app.route('/')
def home():
print('hey', flush=True)
return 'Hey'

不带–reload参数启动一下,顺利跑起Flask后端和FileSocketServer。curl一下后端接口有正常响应,通过脚本也验证了可以对FileSocketServer正常建立连接。

图3. 正确启动app1
图3. 正确启动app1
图4. cURL验证Flask已正确启动
图4. cURL验证Flask已正确启动

准备带上–reload重新启一下,结果发现ctrl+c没有顺利杀死进程,Gunicorn的日志显示shutting down worker,说明worker进程这里还在工作,没有给出信号反馈。Gunicorn本身的超时机制在默认的30秒超时时长后自动shutdown了整个进程。

图5. 无法顺利杀死进程
图5. 无法顺利杀死进程

再跑一遍,ctrl+c后直接再次强行ctrl+c,发现这次可以杀死进程,但报出一个死锁错误。

图6. 强行杀死进程报错
图6. 强行杀死进程报错

再思考

这里预判是FileSocketServer的子线程不能终止导致Flask整个进程都不能停。通过代码也可以看出,线程target是socketserver的serve_forever方法,一听就不容易停下来。查阅socketserver库的文档发现,线程模式(ThreadingMixin)以serve_forever启动的服务会一直轮询是否有shutdown指令,因此需要在另一线程中显式调用shutdown方法来停止,否则就等同于while True。

引用Python socketserver官方文档对于serve_forever()shutdown()的说明:

serve_forever(): Handle requests until an explicit shutdown() request. Poll for shutdown every poll_interval seconds.

shutdown() must be called while serve_forever() is running in a different thread otherwise it will deadlock.

那么问题又来了:我应该在什么时间点以什么方式调用shutdown呢?

绕道1

一个有效但不太符合实际应用场景的方法是,在Flask中注册一个shutdown_file_socket_server接口,通过http请求触发shutdown的显式调用。简单实现一下,curl了新增的接口,再ctrl+c终止线程,Gunicorn顺利退出,证明shutdown确实能够正确停止FileSocketServer的serve_forever。

app2.pyview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from socketserver import StreamRequestHandler, ThreadingTCPServer
import threading

from flask import Flask


class DummySocketHandler(StreamRequestHandler):
def handle(self):
with open("socket.log", "a") as f:
f.write("REQ\n")


class MyFlask(Flask):
def __init__(self, *arg, **kwargs):
super().__init__(*arg, **kwargs)
try:
print("Creating socket server")
self.socket_server = ThreadingTCPServer(("0.0.0.0", 32470),
DummySocketHandler)
threading.Thread(target=self.socket_server.serve_forever).start()
print("OK")
except OSError:
print('Unable to start socket server')


app = MyFlask(__name__)
app.debug = True


@app.route('/')
def home():
print('hey', flush=True)
return 'Hey'


@app.route('/shutdown_file_socket_server')
def shutdown_file_socket_server():
threading.Thread(target=app.socket_server.shutdown).start()
return 'OK'
图7. 调用shutdown_file_socket_server后可顺利杀死进程
图7. 调用shutdown_file_socket_server后可顺利杀死进程

这里需要注意的是,根据官方文档,socketserver在实例化的时候自动做了binding,所以即便shutdown了serve_forever还是能够对绑定的socket进行TCP连接,只不过handle request不发生作用了,通过测试脚本也验证了这一点。

If bind_and_activate is true, the constructor automatically attempts to invoke server_bind() and server_activate().

图8. 调用shutdown_file_socket_server后socketserver的绑定仍然生效
图8. 调用shutdown_file_socket_server后socketserver的绑定仍然生效

显然这种方式是有问题的:每次我想reload就要先调一下这个接口,与代码变更的快速响应背道而驰

绕道2

想了一下,调用shutdown比较合适的时间点是当Flask对象tear down的时候或者python进程收到终止信号的时候。查了一下,python类的__del__方法不能保证类的对象没被销毁,所以不太保险。而进程终止信号是有钩子的,需要使用标准库atexit,通过registry函数注册操作即可。

app3.pyview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import atexit
from socketserver import StreamRequestHandler, ThreadingTCPServer
import threading

from flask import Flask


class DummySocketHandler(StreamRequestHandler):
def handle(self):
with open("socket.log", "a") as f:
f.write("REQ\n")


class MyFlask(Flask):
def __init__(self, *arg, **kwargs):
super().__init__(*arg, **kwargs)
try:
print("Creating socket server")
self.socket_server = ThreadingTCPServer(("0.0.0.0", 32470),
DummySocketHandler)
threading.Thread(target=self.socket_server.serve_forever).start()
print("OK")
atexit.register(self.shutdown_socket_server)
except OSError:
print('Unable to start socket server')

def shutdown_socket_server(self):
threading.Thread(target=self.socket_server.shutdown).start()


app = MyFlask(__name__)
app.debug = True


@app.route('/')
def home():
print('hey', flush=True)
return 'Hey'

看起来逻辑还算通顺,然而试了一下却与最开始的情形并无二致。增加了一下log再调试一番后发现,这个atexit是在主进程即将被终止之前调用注册好的操作,但FileSocketServer是在这个钩子之前就卡住了进程终止,所以在Gunicorn超时之前压根也没走到atexit这一步。当然,atexit本身是走的通的,通过日志也可以验证atexit确实调用到了注册好的操作,这里不再深究。

绕道3

现在能想到的programmatic的方法似乎都不太可行。而Gunicorn自身的超时机制是可以保证杀死进程的,也许可以通过调整超时时间来近似达到快速reload的目的。基于官方文档调整–graceful-timeout和–timeout为1秒,启动Gunicron,ctrl+c,确实能在1秒杀死进程。

图9. 通过调整gunicorn超时相关参数达到快速强杀进程
图9. 通过调整gunicorn超时相关参数达到快速强杀进程

加上–reload试了一下,也确实能够比较顺利地响应代码变更,只不过在reload的时候会产生一些错误信息,在开发调试和生产环境中均会造成一定程度的混淆,总体来说是一种有效但很“脏”的投机取巧。

图10. 强杀进程在reload时的脏日志
图10. 强杀进程在reload时的脏日志

正道

网上对serve_forever/shutdown的问题和讨论都比较少,个人感觉,从逻辑上讲,既然都想forever了,就不会过多考虑如何shutdown,一直跑着就好了。再深入研究一下,发现有一些讨论中提到了daemon thread。之前只对daemon有大致的概念,没有深刻的理解,查阅资料发现之前理解的daemon和daemon thread还是有不小的偏差。我一般把daemon理解为“驻守”,在系统中跑起来一个东西,它就会一直跑着。Daemon thread其实也差不多,只不过它的意义不止在于“一直跑”,更在于“不用管”。JournalDev上的一篇文章有一个很通顺的英文解释:

While a non-daemon thread blocks the main program to exit if they are not dead. A daemon thread runs without blocking the main program from exiting. And when main program exits, associated daemon threads are killed too.

非daemon的线程在没有死亡的情况下,主进程是无法优雅退出的。Daemon类的线程则不会阻塞主进程的退出。并且,当主进程终结后,daemon类的子线程也会被连带杀死。

另备注一个SO上对daemon thread的解释

这样推断出我们想要的方法就比较顺理成章了:把FileSocketServer置于一个daemon thread,当主进程结束后,它跟着去死就好了。去掉与atexit相关的逻辑,最终实现与切换到Gunicorn的第一版相比,只多了一个参数而已。

app.pyview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from socketserver import StreamRequestHandler, ThreadingTCPServer
import threading

from flask import Flask


class DummySocketHandler(StreamRequestHandler):
def handle(self):
with open("socket.log", "a") as f:
f.write("REQ\n")


class MyFlask(Flask):
def __init__(self, *arg, **kwargs):
super().__init__(*arg, **kwargs)
try:
print("Creating socket server")
self.socket_server = ThreadingTCPServer(("0.0.0.0", 32470),
DummySocketHandler)
threading.Thread(target=self.socket_server.serve_forever,
daemon=True).start()
print("OK")
except OSError:
print('Unable to start socket server')


app = MyFlask(__name__)
app.debug = True


@app.route('/')
def home():
print('hey', flush=True)
return 'Hey'

启动Gunicorn,ctrl+c,进程顺利退出,带上–reload重新启动,变更一下代码,顺利reload出来,再用脚本测一下socketserver,连接正常,请求处理也正常。

图11. 通过daemon thread顺利杀死进程
图11. 通过daemon thread顺利杀死进程
图12. 验证Flask和FileSocketServer均正常工作
图12. 验证Flask和FileSocketServer均正常工作

绕了一大圈,最终通过一个参数解决了问题,总结下来还是对线程的相关知识不够熟悉,但最终成型的方法还是可以令人满意的。