SQLAlchemy多线程下事务隔离机制详解

1. 起因

通过开启多线程,并发查询订单详情信息,通过将不同订单对象发送给不同线程,执行完所需要的业务逻辑之后,对订单对象的数据进行修改,然后执行 commit,查询数据库发现数据没有更新,且后台日志没有任何的报错

**错误代码:**

from app.ext import db
from flask import current_app
from concurrent.futures import ThreadPoolExecutor

def do_something(order, app):
    with app.app_context():
        order.status = ‘running‘
        db.session.commit()

def main():
    orders = Order.query.fitery_by(status=‘created‘).all()
    app = current_app._get_current_object()
    with ThreadPoolExecutor(max_workers=10) as executor:
        for order in orders:
            executor(do_something, order, app)

2. 排查

2.1 开启SQLAlchemy 中打印SQL参数

app.config["SQLALCHEMY_ECHO"] = True

2.2 查看日志

通过日志发现在主线程中查询 orders 的时候,可以打印SQL,但在新开启的线程执行到 db.session.commit() 时,没有对应SQL打印。

**初步定位是线程传递 order 的问题,SQLAlchemy 事务应该不允许跨线程。**

3. 解决方案

**在开启的线程中,通过传递的订单ID获取订单对象**

from app.ext import db
from flask import current_app
from concurrent.futures import ThreadPoolExecutor

def do_something(order_id, app):
    with app.app_context():
        order = Order.query.get(order_id)
        order.status = ‘running‘
        db.session.commit()

def main():
    orders = Order.query.fitery_by(status=‘created‘).all()
    app = current_app._get_current_object()
    with ThreadPoolExecutor(max_workers=10) as executor:
        for order in orders:
            executor(do_something, order.id, app)

4. 总结

当生成 Session 对象的时,这个对象并不是线程安全的,是一个本地线程对象(thread local storage),因此当跨线程时,就不在此 session 范围内了,从而导致对象无法提交。这样也避免了多线程、多进程情况下,污染其他线程或进程数据。

5. 参考

相关推荐