SQLAlchemy 2.0 async: типичные ошибки и как их обойти
За последние два года я переписал три проекта на SQLAlchemy 2.0 в async-режиме и собрал почти полный набор граблей. Самое неприятное — поведение, которое в синхронной версии работало по умолчанию, а в async ломается без явного сообщения. Запрос просто висит, или сессия молча возвращает старые данные, или внезапно падает MissingGreenlet. Ниже — список реальных ошибок, которые я и мои коллеги ловили в проде, и что с ними делать.
Все примеры на Python 3.12 и SQLAlchemy 2.0.x. База — Postgres через asyncpg. Если у тебя psycopg async, поведение очень похожее, но пара мест отличается, я отмечу.
Lazy loading в async — главный источник боли
В синхронной SQLAlchemy ты привык писать так:
user = session.get(User, 1)
for order in user.orders:
print(order.total)
В async это превращается в MissingGreenlet или StatementError: greenlet_spawn has not been called. Причина простая: атрибут orders — это relationship, и при первом обращении SQLAlchemy хочет сходить в базу. В async-сессии любой ввод-вывод должен быть явным await, а ты его не написал.
Решений три, по убыванию популярности:
- Eager-загрузка через
selectinloadилиjoinedload. Самый частый рабочий вариант. Один запрос на пользователя плюс один на все его заказы — нормальная цена за предсказуемость. - Явная подгрузка через
await session.refresh(user, ["orders"]). Удобно, когда ты заранее не знал, что нужно подтянуть. - Отдельный запрос. Иногда это просто чище.
from sqlalchemy import select
from sqlalchemy.orm import selectinload
stmt = (
select(User)
.where(User.id == 1)
.options(selectinload(User.orders))
)
result = await session.execute(stmt)
user = result.scalar_one()
for order in user.orders:
print(order.total)
Если кажется, что отключить lazy совсем нельзя, можно поставить на relationship lazy="raise". Тогда любой случайный доступ к незагруженному атрибуту падает сразу с понятным сообщением, а не где-то в середине рендера ответа. В команде это спасает от того, чтобы ленивый доступ просочился в прод.
Жадная загрузка под цикл — N+1 наоборот
Обратная сторона: новички, обжёгшись на lazy, начинают везде писать joinedload цепочками на четыре уровня вглубь. И дальше один запрос возвращает по 50 тысяч строк декартова произведения. Правило простое: коллекции (one-to-many, many-to-many) — через selectinload, чтобы не раздувать результат. Скаляры (many-to-one, one-to-one) — через joinedload, там JOIN дешёвый.
Один движок, одна сессия на запрос
Видел дважды, как команда заводила create_async_engine внутри функции-эндпоинта. Каждый запрос создавал новый пул соединений, которые жили до конца обработки и не закрывались по-человечески. Через час прод лежал на too many connections.
Engine — это пул. Создавай его один раз на старте приложения и переиспользуй. В FastAPI это выглядит так:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
@asynccontextmanager
async def lifespan(app: FastAPI):
engine = create_async_engine(
"postgresql+asyncpg://app:app@db/app",
pool_size=10,
max_overflow=5,
pool_pre_ping=True,
)
app.state.engine = engine
app.state.sessionmaker = async_sessionmaker(
engine, expire_on_commit=False
)
yield
await engine.dispose()
app = FastAPI(lifespan=lifespan)
Сессия — наоборот, короткоживущая. На запрос — своя сессия. Через Depends:
from typing import AsyncIterator
from fastapi import Depends, Request
from sqlalchemy.ext.asyncio import AsyncSession
async def get_session(request: Request) -> AsyncIterator[AsyncSession]:
sessionmaker = request.app.state.sessionmaker
async with sessionmaker() as session:
yield session
Никаких глобальных session, никаких session в module-level. Сессия не потокобезопасна и не таскобезопасна, и попытки переиспользовать её между запросами заканчиваются гонками вокруг identity map.
expire_on_commit=False — без вариантов
По умолчанию после commit() SQLAlchemy инвалидирует все объекты в сессии. В синхронном мире это безопасно: при следующем доступе атрибут просто перечитается из базы. В async это тот самый MissingGreenlet: ты уже отдал управление, сессия закрыта, а в обработчике ты ещё пытаешься прочитать user.email для ответа.
Правило: при создании async-сессии ставь expire_on_commit=False. Иначе после каждого await session.commit() ты будешь обнимать refresh или ловить ошибку. Я в новых проектах ставлю это первым делом.
Транзакции: один уровень, без сюрпризов
В SQLAlchemy 2.0 рекомендованный паттерн — session.begin() вокруг логической операции, а не россыпь commit/rollback по коду:
async def create_order(session: AsyncSession, payload: OrderIn) -> Order:
async with session.begin():
order = Order(user_id=payload.user_id, total=payload.total)
session.add(order)
# тут же добавляем позиции
for item in payload.items:
session.add(OrderItem(order_id=order.id, sku=item.sku))
return order
Этот блок открывает транзакцию, при выходе делает commit, при исключении — rollback. Не надо ловить ошибки и руками откатывать. Если у тебя глубоко в коде уже есть begin(), и ты вызываешь begin() ещё раз — получишь InvalidRequestError. Используй session.begin_nested() для логической вложенности (это savepoint), либо проектируй так, чтобы транзакция стартовала на верхнем уровне.
Connection pool: pool_size, max_overflow и pool_pre_ping
Дефолты у SQLAlchemy довольно скромные: pool_size=5, max_overflow=10. Под FastAPI с десятком воркеров uvicorn это значит, что на инстанс приходится 5 постоянных соединений и до 15 пиковых. Умножь на число подов — и Postgres скажет «спасибо, не надо».
Что делать:
- Считай бюджет соединений к Postgres явно:
max_connectionsв постгресе делишь на (число подов × воркеров на под). Оставляй 20% запас на админ-сессии и репликацию. - Ставь
pool_pre_ping=True. Это лёгкийSELECT 1перед выдачей соединения из пула. Спасает от мёртвых соединений после рестарта базы или сетевых обрывов. - Если у тебя много инстансов и хочется централизованного управления — поставь PgBouncer в режиме
transaction, а у самой SQLAlchemy уменьши пул до 2–3 на инстанс. Но помни про prepared statements:asyncpgих использует по умолчанию, и в transaction-pooling они ломаются. Лекарство —statement_cache_size=0в connect_args.
engine = create_async_engine(
"postgresql+asyncpg://app:app@pgbouncer/app",
pool_size=3,
max_overflow=2,
pool_pre_ping=True,
connect_args={"statement_cache_size": 0},
)
Смешивание sync и async кода
Бывает, надо позвать что-то синхронное (старый кусок логики, плагин, что угодно), которое внутри тянется в базу. Не пытайся обернуть синхронную сессию в asyncio.to_thread и работать ей из async-кода — гарантированно поймаешь странные блокировки.
Правильный путь — функция run_sync у async-сессии:
def legacy_recalc(sync_session, user_id: int) -> None:
user = sync_session.get(User, user_id)
user.score = recompute(user)
await session.run_sync(legacy_recalc, user_id)
Внутри run_sync SQLAlchemy подсунет тебе синхронный фасад над тем же соединением. Снаружи всё остаётся честно асинхронным.
Тесты: одна транзакция на тест, откат в конце
Стандартный pytest-фикстура для async-сессии в проде у меня выглядит так:
import pytest_asyncio
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
@pytest_asyncio.fixture(scope="session")
async def engine():
engine = create_async_engine(TEST_DSN)
yield engine
await engine.dispose()
@pytest_asyncio.fixture
async def session(engine):
async with engine.connect() as conn:
trans = await conn.begin()
SessionLocal = async_sessionmaker(bind=conn, expire_on_commit=False)
async with SessionLocal() as session:
yield session
await trans.rollback()
На каждый тест — отдельное соединение, вокруг — транзакция, в конце откат. База между тестами не загрязняется, фикстуры быстрые, миграции прогоняются один раз. Если внутри кода тоже есть begin() — он автоматически становится savepoint, ничего ломать не надо.
Чего не хватает по сравнению с sync
Честный момент: async-режим всё ещё немного беднее. Часть legacy-методов помечена как deprecated, кое-где приходится использовать session.execute(select(...)) вместо короткого session.query(...). Это не страшно, но в первый месяц непривычно.
Был и пункт про события (events). Раньше before_flush и компания писались синхронно и под async ругались. В 2.0 завезли async-варианты через @event.listens_for(... "do_orm_execute") и обёртки в session.run_sync. Если у тебя на событиях много логики — проверь, что хук действительно отрабатывает в async-сессии, я ловил случай, когда хук был зарегистрирован на синхронный Session и в async просто молча игнорировался.
Что запомнить
SQLAlchemy 2.0 в async хороша, но требует дисциплины. Eager-загрузка по умолчанию, один engine на приложение, короткоживущие сессии, expire_on_commit=False, осознанный pool_size, транзакции через session.begin(). Если эти шесть пунктов соблюдены, проект годами едет без сюрпризов. Я бы посоветовал заодно прочитать раздел Asyncio Integration в официальной доке: там подробно расписано, что именно нельзя делать в async-сессии и почему.
Дальше копать имеет смысл в сторону Alembic с async и в тонкости asyncpg: prepared statements, типы массивов, обработка NOTIFY. Это уже отдельная история, и я к ней ещё вернусь.