分享

SQLAlchemy + 协程,实现异步的 ORM

 古明地觉O_o 2022-12-08 发布于北京

楔子

最近猫哥转载了我在博客园上的一篇文章,是和异步的数据库驱动相关的。对于一个 web 服务而言,性能瓶颈基本都在数据库上面,如果在等待数据库返回数据的时候,能够自动切换并处理其它请求的话,那么并发量会得到显著的提升。但想做到这一点,我们就必须将同步驱动换成异步驱动。

那么异步驱动都有哪些呢?

  • aiosqlite:用于连接 SQLite;

  • asyncmy、aiomysql:用于连接 MySQL;

  • asyncpg、aiopg:用于连接 PostgreSQL;

  • cx_Oracle_async:用于连接 Oracle;

  • aioredis:用于连接 Redis;

现如今 Python 已经进化到 3.11 了,适配不同数据库的异步驱动也已经非常成熟了。但这里我要介绍的不是这些驱动,而是 ORM。不同的驱动使用起来会有一些差异,而 ORM 提供了一个统一的上层接口,屏蔽了不同驱动之间的差异。

Python 里面最有名的ORM莫过于SQLAlchemy,在早期它是一个同步的 ORM,只能适配一些同步驱动。不过从 1.4 版本的时候引入了协程,支持了异步功能,并且在使用上和之前没有太大区别。下面我们来看一下它的用法,并介绍一些最佳实践。


创建一个异步引擎

SQLAlchemy 不具备连接数据库的能力,它连接数据库还是使用了驱动,所以在使用之前我们必须先下载一个驱动才行。这里我以 MySQL 为例,使用的异步驱动为 asyncmy,直接 pip install asyncmy 安装即可。

"""
使用 create_engine 创建同步引擎
使用 create_async_engine 创建异步引擎

同步引擎搭配同步驱动
异步引擎搭配异步驱动
"""

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.engine import URL

# 也可以直接传递一个字符串
参数和 create_engine 是一样的

# create_async_engine("mysql+asyncmy://...")
engine = create_async_engine(
    URL.create("mysql+asyncmy",
               username="root",
               password="123456",
               host="82.157.146.194",
               port=3306,
               database="mysql")
)

以上我们就创建了一个异步引擎,创建方式和同步引擎没什么区别,它们的参数也都是一样的。

既然引擎有了,那么如何用该引擎操作数据库呢?

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text

engine = create_async_engine(
    "mysql+asyncmy://root:123456@82.157.146.194/mysql")

# 需要定义一个协程函数
async def 
get_data():
    # 引擎内部维护了一个连接池
    # engine.connect()会从池子里取出一个连接
    async with engine.connect() as conn:
        # 调用 conn.execute() 执行 SQL 语句
        # SQL 语句需要传到 text 方法中
        query = text("SELECT * FROM girl")
        result = await conn.execute(query)

    # 返回的 result 是一个 CursorResult 对象
    # 调用 result.fetchone() 拿到单条数据
    data = result.fetchone()
    print(data)
    """
    (1, '古明地觉', 156)
    """

    # 虽然显示的是一个元组,但它其实是一个 Row 对象
    # 我们还可以将它转成字典
    print(dict(data))
    """
    {'id': 1, 'name': '古明地觉', 'height': 156}
    """


    # result 内部有一个游标
    # 再调用 result.fetchone() 会返回下一条数据
    print(result.fetchone())
    print(result.fetchone())
    """
    (2, '古明地恋', 154)
    (3, '魔理沙', 154)
    """

    # 库里面总共就 3 条数据
    # 所以当没有数据时,就会返回 None
    print(result.fetchone())
    """
    None
    """


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_data())

用法很简单,通过 engine.connect() 可以从池子里面取出一个连接,再调用连接的 execute 方法执行 SQL 语句即可。但需要注意:字符串格式的 SQL 语句不能直接传递,需要先调用 SQLAlchemy 提供的 text 方法。

执行完毕之后,会返回一个 CursorResult 对象,调用它的 fetchone 方法会逐条结果集的数据。当然除了 fetchone,还有 fetchmany 和 fetchall,我们来看一下。

async def get_data():
    async with engine.connect() as conn:
        query = text("SELECT * FROM girl")
        result = await conn.execute(query)
    # 从结果集取两条数据
    data = result.fetchmany(2)
    print(data)
    """
    [(1, '古明地觉', 156), 
     (2, '古明地恋', 154)]
    """

    # 再取两条数据,但显然此时只剩下一条了
    data = result.fetchmany(2)
    print(data)
    """
    [(3, '魔理沙', 154)]
    """

    # 如果没有数据了,fetchmany 会返回空列表
    data = result.fetchmany(1)
    print(data)
    """
    []
    """

所以 fetchmany 接收一个整数,就是获取指定数量的数据。而 fetchall 就简单了,显然它是获取结果集的全部数据。

async def get_data():
    async with engine.connect() as conn:
        query = text("SELECT * FROM girl")
        result = await conn.execute(query)
    data = result.fetchall()
    print(data)
    """
    [(1, '古明地觉', 156), 
     (2, '古明地恋', 154), 
     (3, '魔理沙', 154)]
    """

    # 列表里面的 Row 对象都转成字典
    print(list(map(dict, data)))
    """
    [{'id': 1, 'name': '古明地觉', 'height': 156}, 
     {'id': 2, 'name': '古明地恋', 'height': 154}, 
     {'id': 3, 'name': '魔理沙', 'height': 154}]
    """

还是比较简单的,通过 CursorResult 的这三个方法,便可以获取想要的数据。然后再补充一点,我们说 SQL 语句需要放在 text 方法中,然后才能传给连接的 execute 方法。虽然这个过程稍微有点麻烦,但好处就是我们可以使用 SQLAlchemy 提供的占位符功能。

async def get_data():
    async with engine.connect() as conn:
        # :id 就是一个占位符,那么它等于多少呢?
        # 再调用 bindparams 指定即可
        # 并且占位符的数量没有限制
        query = text(
            "SELECT * FROM girl WHERE id > :id"
        ).bindparams(id=1)
        result = await conn.execute(query)
    data = result.fetchall()
    # 此时只返回了两条数据
    print(list(map(dict, data)))
    """
    [{'id': 2, 'name': '古明地恋', 'height': 154}, 
     {'id': 3, 'name': '魔理沙', 'height': 154}]
    """

以后执行 SQL 语句的时候,就通过这种方式去执行即可。当然我们这里只介绍了查询,增删改还没有说,下面来看看它在面对增删改时的表现。


执行增删改语句

先来看看添加数据:

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import Table, MetaData, Column
from sqlalchemy.dialects.mysql import INTEGER, VARCHAR

engine = create_async_engine(
    "mysql+asyncmy://root:123456@82.157.146.194/mysql")

async def get_data():
    # 构建数据库表
    table = Table(
        "girl",  # 表名
        MetaData(),  # MetaData() 实例
        # 表里面的列
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("height", INTEGER)
    )
    async with engine.connect() as conn:
        query = table.insert().values(
            {"name""芙兰朵露""height"150})
        result = await conn.execute(query)
        # 返回受影响的行数
        print(result.rowcount)  # 1
        # 返回数据在插入之后的主键
        print(result.inserted_primary_key)  # (4,)
        # 对于增删改而言,还必须调用一次 commit
        # 否则数据不会写入到库中
        await conn.commit()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_data())

以上是插入单条数据,我们也可以同时插入多条数据。而方法也很简单,插入单条数据是往 values 里面传一个字典,而插入多条数据只需要传一个列表即可。

async def get_data():
    # 构建数据库表
    table = Table(
        "girl", MetaData(),
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("height", INTEGER)
    )
    async with engine.connect() as conn:
        query = table.insert().values(
            [{"name""琪露诺""height"151},
             {"name""十六夜咲夜""height"165}])
        await conn.execute(query)
        await conn.commit()

我们看一下数据库,看看数据有没有变化。

数据成功地写入到库中了,然后再来看看修改数据:

async def get_data():
    table = Table(
        "girl", MetaData(),
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("height", INTEGER)
    )
    async with engine.connect() as conn:
        # 修改 id = 1 的 name 字段
        query = table.update().where(
            table.c.id == 1).values({"name""satori"})
        result = await conn.execute(query)
        print(result.rowcount)  # 1

        # 少女们都长高了 10 厘米
        # 不调用 where,则修改所有行
        query = table.update().values(
            {"height": Column("height") + 10}
        )
        result = await conn.execute(query)
        # 受影响的行数为 6
        print(result.rowcount)  # 6
        # 别忘了提交
        await conn.commit()

看一下表数据有没有变:

数据成功被修改。另外这里的 where 只有单个条件,如果是多个条件,那么彼此之间使用 & 或 | 进行连接,代表 and 和 or。

最后是删除数据:

async def get_data():
    table = Table(
        "girl", MetaData(),
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("height", INTEGER)
    )
    async with engine.connect() as conn:
        # 删除 id = 1 的数据
        query = table.delete().where(
            table.c.id == 1)
        result = await conn.execute(query)
        print(result.rowcount)  # 1

        # 删除 id 为 2、3 的数据
        query = table.delete().where(
            table.c.id.in_([23]))
        result = await conn.execute(query)
        print(result.rowcount)  # 2
        await conn.commit()

那么数据有没有被成功删除呢?

成功将数据删掉了。


异步引擎的性能提升

必须要说明的是,如果只是单次的数据库请求,那么同步引擎和异步引擎之间没什么差异,耗时是差不多的。但如果是多个请求,那么异步引擎可以实现并发访问,我们举个例子。这里为了更好地观察到现象,我往表里写了 100w 条数据。

async def get_data():
    async with engine.connect() as conn:
        query = text("SELECT * FROM girl")
        await conn.execute(query)

async def 
main():
    start = time.perf_counter()
    await get_data()
    end = time.perf_counter()
    print(f"单次请求耗时: 
{end - start}s")
    """
    单次请求耗时: 26.8164807s
    """


    start = time.perf_counter()
    await asyncio.gather(*[get_data()] * 20)
    end = time.perf_counter()
    print(f"二十次请求耗时: 
{end - start}s")
    """
    二十次请求耗时: 27.2821891s
    """


    start = time.perf_counter()
    await asyncio.gather(*[get_data()] * 50)
    end = time.perf_counter()
    print(f"五十次请求耗时: 
{end - start}s")
    """
    五十次请求耗时: 27.480469s
    """


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

可以看到耗时是差不多的,如果你写了一个服务,请求过来的时候需要从数据库读数据(假设耗时 2s),然后返回。那么无论是来一个请求,还是同时来十个请求,耗时都是差不多的,大概 2s。可能同时处理十个请求的耗时会多一些,但不会多太多,因为请求数据库这个过程是并发进行的。

当然啦,并发处理的请求数肯定也是有上限的,不可能无限大,因为数据库连接池内部的连接数量是有限的。所以任何一个由多个组件构成的系统,随着并发数的提高,总会出现瓶颈。可能一开始的瓶颈服务访问数据库的连接数量不够,但随着连接的增多瓶颈又会转移到数据库上。这个时候可以搭建一个 MySQL 集群,以及引入 Redis 缓存,进一步提升并发量。

所以服务到底选择什么样的架构,取决于你的业务量,随着业务量的增大,一开始行之有效的架构设计就会变得力不从心,总会在某个地方出现瓶颈。我们只能根据实际情况进行调整,使得服务的处理能力尽可能地延展下去。


引擎的反射

在使用同步引擎的时候,我们应该都用过它的反射功能,举个例子。

from pprint import pprint
from sqlalchemy import create_engine
from sqlalchemy import inspect

# 此处为同步引擎
engine = create_engine(
    "mysql+pymysql://root:123456@82.157.146.194/mysql")
inspector = inspect(engine)

# 返回当前数据库下都有哪些表
pprint(inspector.get_table_names())
"""
['columns_priv',
 'component',
 'db',
 'default_roles',
 ......
"""

# 返回默认的数据库
pprint(inspector.default_schema_name)
"""
'mysql'
"""

# 返回所有的数据库
# 如果是 PostgreSQL,则返回 schema
pprint(inspector.get_schema_names())
"""
['information_schema', 'mysql', 
 'performance_schema', 'sys']
"""

# 返回当前数据库下都有哪些视图
pprint(inspector.get_view_names())
"""
[]
"""

# 查看一张表都有哪些列
# 里面包含了列名、类型、默认值、注释等信息
pprint(inspector.get_columns("girl"))
"""
[{'autoincrement': True,
  'comment': None,
  'default': None,
  'name': 'id',
  'nullable': False,
  'type': INTEGER()},
 {'comment': None,
  'default': None,
  'name': 'name',
  'nullable': True,
  'type': VARCHAR(length=255)},
 {'autoincrement': False,
  'comment': None,
  'default': None,
  'name': 'height',
  'nullable': True,
  'type': INTEGER()}]
"""

# 返回一张表的主键约束
pprint(inspector.get_pk_constraint("girl"))
# 返回一张表的所有外键
pprint(inspector.get_foreign_keys("girl"))
# 返回一张表的索引
pprint(inspector.get_indexes("girl"))
# 返回一张表的唯一性约束
pprint(inspector.get_unique_constraints("girl"))
# 返回一张表的注释
pprint(inspector.get_table_comment("girl"))

通过反射引擎,我们可以拿到很多的元信息。当然,也能将一张表反射出来。但这是同步引擎才具有的功能,异步引擎目前还不支持反射。

当然这些信息本质上也是执行了相关查询才获取到的,我们也可以使用异步引擎手动执行,比如查看表字段信息:

async def main():
    async with engine.connect() as conn:
        query = text("SELECT COLUMN_NAME, DATA_TYPE "
                     "FROM INFORMATION_SCHEMA.COLUMNS "
                     "WHERE TABLE_NAME='girl'")
        data = (await conn.execute(query)).fetchall()
        print(list(map(dict, data)))
        """
        [{'COLUMN_NAME': 'height', 'DATA_TYPE': 'int'}, 
         {'COLUMN_NAME': 'id', 'DATA_TYPE': 'int'}, 
         {'COLUMN_NAME': 'name', 'DATA_TYPE': 'varchar'}]
        """

其它的一些元信息也可以通过查询的方式获取。


小结

以上就是 SQLAlchemy + 协程相关的内容,这篇文章算是对猫哥那篇文章的一个补充。如果你使用的是 FastAPI、Sanic 之类的框架,那么也应该要搭配一个异步的 ORM 才能发挥出威力。

最后特别感谢《Python 猫》的号主猫哥,在我创建公众号之后给了我莫大的帮助,正是他第一次转载文章帮我引流,才让我有了继续写下去的动力。这里反向推荐一波。

当然也感谢其他转载我文章的号主,欢迎大家多多转载。

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多