怎么在Python中异步操作数据库

寻技术 Python编程 2023年07月12日 104

本篇内容介绍了“怎么在Python中异步操作数据库”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

异步操作 MySQL

异步操作 MySQL 的话,需要使用一个 aiomysql,直接 pip install aiomysql 即可。

aiomysql 底层依赖于 pymysql,所以 aiomysql 并没有单独实现相应的连接驱动,而是在 pymysql 之上进行了封装。

查询记录

下面先来看看如何查询记录。

import asyncio
import aiomysql.sa as aio_sa
async def main():
# 创建一个异步引擎
engine = await aio_sa.create_engine(host="xx.xxx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10)
# 通过 engine.acquire() 获取一个连接
async with engine.acquire() as conn:
# 异步执行, 返回一个对象
result = await conn.execute("SELECT * FROM girl")
# 通过 await result.fetchone() 可以获取满足条件的第一条记录, 一个对象
data = await result.fetchone()
# 可以将对象想象成一个字典
print(data.keys())# KeysView((1, '古明地觉', 16, '地灵殿'))
print(list(data.keys()))# ['id', 'name', 'age', 'place']
print(data.values())# ValuesView((1, '古明地觉', 16, '地灵殿'))
print(list(data.values()))# [1, '古明地觉', 16, '地灵殿']
print(data.items())# ItemsView((1, '古明地觉', 16, '地灵殿'))
print(list(data.items()))# [('id', 1), ('name', '古明地觉'), ('age', 16), ('place', '地灵殿')]
# 直接转成字典也是可以的
print(dict(data))# {'id': 1, 'name': '古明地觉', 'age': 16, 'place': '地灵殿'}
# 最后别忘记关闭引擎, 当然你在创建引擎的时候也可以通过 async with aio_sa.create_engine 的方式创建
# async with 语句结束后会自动执行下面两行代码
engine.close()
await engine.wait_closed()
 loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

怎么样,是不是很简单呢,和同步库的操作方式其实是类似的。但是很明显,我们在获取记录的时候不会只获取一条,而是会获取多条,获取多条的话使用 await result.fetchall() 即可。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
async def main():
# 通过异步上下文管理器的方式创建, 会自动帮我们关闭引擎
async with aio_sa.create_engine(host="xx.xxx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
result = await conn.execute("SELECT * FROM girl")
# 此时的 data 是一个列表, 列表里面是对象
data = await result.fetchall()
# 将里面的元素转成字典
pprint(list(map(dict, data)))
"""
[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},
 {'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'},
 {'age': 400, 'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'}]
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

除了 fetchone、fetchall 之外,还有一个 fetchmany,可以获取指定记录的条数。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
async def main():
# 通过异步上下文管理器的方式创建, 会自动帮我们关闭引擎
async with aio_sa.create_engine(host="xx.xxx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
result = await conn.execute("SELECT * FROM girl")
# 默认是获取一条, 得到的仍然是一个列表
data = await result.fetchmany(2)
pprint(list(map(dict, data)))
"""
[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},
 {'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'}]
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

以上就是通过 aiomysql 查询数据库中的记录,没什么难度。但是值得一提的是,await conn.execute 里面除了可以传递一个原生的 SQL 语句之外,我们还可以借助 SQLAlchemy。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy.sql.selectable import Select
from sqlalchemy import text
async def main():
async with aio_sa.create_engine(host="xx.xxx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl"))
result = await conn.execute(sql)
data = await result.fetchall()
pprint(list(map(dict, data)))
"""
[{'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'},
 {'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'}]
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

添加记录

然后是添加记录,我们同样可以借助 SQLAlchemy 帮助我们拼接 SQL 语句。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine
async def main():
async with aio_sa.create_engine(host="xx.xx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
# 我们还需要创建一个 SQLAlchemy 中的引擎, 然后将表反射出来
s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")
tbl = Table("girl", MetaData(bind=s_engine), autoload=True
insert_sql = tbl.insert().values(
[{"name": "十六夜
关闭

用微信“扫一扫”