# Per-object insert (the article reports about 2.3 s for 1000 rows):
# one entity instance is created per loop iteration.
with db_session:
    for i in range(1000):
        User(name=f'user_{i}')

# Batch insert (the article reports about 0.15 s for 1000 rows):
# all rows are passed in a single call as a list of keyword dicts.
# NOTE(review): `insert_batch` is not listed in Pony ORM's documented entity
# API -- confirm it is a project-defined helper before relying on it.
with db_session:
    User.insert_batch([dict(name=f'user_{i}') for i in range(1000)])
3. 原生SQL支持
# Raw SQL: number of posts per user (users without posts are kept by the
# LEFT JOIN and get a count of 0).
# u.name is listed in GROUP BY as well as u.id: the result is the same, but
# databases that require every selected non-aggregate column to be grouped
# accept the statement too.
query = """
    SELECT u.name, COUNT(p.id)
    FROM users u
    LEFT JOIN posts p ON u.id = p.author_id
    GROUP BY u.id, u.name
"""

# Raw statements still have to run inside a db_session.
with db_session:
    result = db.select(query)
# Wrong: the related collection is first touched after the session has ended.
with db_session:
    user = User[1]
print(user.posts)  # fails: `posts` was never loaded and the session is closed

# Right: materialise the collection while the session is still open.
with db_session:
    user = User[1]
    # Slicing the collection's query executes it and returns a plain list,
    # which stays usable after the block exits.
    posts = user.posts.select()[:]
问题2:循环导入
采用集中式模型定义文件(models.py),并用字符串形式的实体名延迟定义关系:
class Department(db.Entity):
    """Department entity; one side of the Department <-> Employee relation."""

    # The target entity is given by name (a string) because Employee is
    # defined further down; this avoids needing the class object here.
    employees = Set("Employee")


class Employee(db.Entity):
    """Employee entity; every employee must belong to a department."""

    # Department is already defined above, so the class itself can be used.
    department = Required(Department)
七、生态整合与扩展
1. FastAPI集成方案
from fastapi import Depends


def get_db():
    """FastAPI dependency: yield ``db`` while a ``db_session`` is open.

    The session is entered before the value is yielded and left when the
    generator is resumed/closed after the request.
    """
    with db_session:
        yield db


@app.post("/users")
async def create_user(user: UserSchema, db=Depends(get_db)):
    """Create a ``User`` from the request payload and commit it.

    Args:
        user: Validated request body; its fields are passed to ``User`` as
            keyword arguments.
        db: Value provided by ``get_db``. It is not used in the body; the
            dependency is declared for the session it opens.

    Returns:
        The newly created ``User`` entity.
    """
    # NOTE(review): Pony's db_session state is per-thread, and FastAPI may run
    # a plain (non-async) generator dependency in a worker thread while this
    # `async def` handler runs on the event loop -- confirm the session opened
    # in get_db is actually active here.
    new_user = User(**user.dict())
    commit()
    # NOTE(review): a Pony entity is returned as the response body -- verify
    # it serialises as intended (e.g. via `to_dict()` or a response model).
    return new_user
2. 异步支持
Pony ORM 的数据库调用是同步阻塞的;在异步代码中,可以把查询放到线程池执行器(run_in_executor)中执行,避免阻塞事件循环:
import asyncio

from pony.orm import db_session, select


async def get_users():
    """Load all ``User`` entities without blocking the event loop.

    The synchronous query runs in the loop's default executor, inside its own
    ``db_session`` on the worker thread.

    Returns:
        A list of ``User`` entities.
    """

    @db_session
    def _load_users():
        # Materialise the query inside the session so the list is complete
        # before the session ends.
        return list(select(u for u in User))

    loop = asyncio.get_running_loop()
    # Return the awaited result; without this the coroutine would yield None.
    return await loop.run_in_executor(None, _load_users)
3. 监控体系建设
集成Prometheus监控:
from prometheus_client import Counter

# Counter incremented once for every call that goes through
# MonitoringQueryTranslator._execute.
QUERY_COUNTER = Counter('pony_queries_total', 'Total DB queries')


# NOTE(review): `QueryTranslator`, its `_execute` hook and the
# `db.query_translator` attribute are not part of Pony ORM's documented
# public API -- confirm they exist in the version in use before adopting this.
class MonitoringQueryTranslator(QueryTranslator):
    """Query translator that counts executions in ``QUERY_COUNTER``."""

    def _execute(self, connection, sql, arguments):
        """Increment the counter, then delegate to the parent implementation."""
        QUERY_COUNTER.inc()
        return super()._execute(connection, sql, arguments)


# Install the counting translator on the database object.
db.query_translator = MonitoringQueryTranslator(db)