lenec ru

← все посты

SQLAlchemy 2.0 async: типичные ошибки и как их обойти

10K

За последние два года я переписал три проекта на SQLAlchemy 2.0 в async-режиме и собрал почти полный набор граблей. Самое неприятное — поведение, которое в синхронной версии работало по умолчанию, а в async ломается без явного сообщения. Запрос просто висит, или сессия молча возвращает старые данные, или внезапно падает MissingGreenlet. Ниже — список реальных ошибок, которые я и мои коллеги ловили в проде, и что с ними делать.

Все примеры на Python 3.12 и SQLAlchemy 2.0.x. База — Postgres через asyncpg. Если у тебя psycopg async, поведение очень похожее, но пара мест отличается, я отмечу.

Lazy loading в async — главный источник боли

В синхронной SQLAlchemy ты привык писать так:

user = session.get(User, 1)
for order in user.orders:
    print(order.total)

В async это превращается в MissingGreenlet или StatementError: greenlet_spawn has not been called. Причина простая: атрибут orders — это relationship, и при первом обращении SQLAlchemy хочет сходить в базу. В async-сессии любой ввод-вывод должен быть явным await, а ты его не написал.

Решений три, по убыванию популярности:

  • Eager-загрузка через selectinload или joinedload. Самый частый рабочий вариант. Один запрос на пользователя плюс один на все его заказы — нормальная цена за предсказуемость.
  • Явная подгрузка через await session.refresh(user, ["orders"]). Удобно, когда ты заранее не знал, что нужно подтянуть.
  • Отдельный запрос. Иногда это просто чище.
from sqlalchemy import select
from sqlalchemy.orm import selectinload

stmt = (
    select(User)
    .where(User.id == 1)
    .options(selectinload(User.orders))
)
result = await session.execute(stmt)
user = result.scalar_one()
for order in user.orders:
    print(order.total)

Если кажется, что отключить lazy совсем нельзя, можно поставить на relationship lazy="raise". Тогда любой случайный доступ к незагруженному атрибуту падает сразу с понятным сообщением, а не где-то в середине рендера ответа. В команде это спасает от того, чтобы ленивый доступ просочился в прод.

Жадная загрузка под цикл — N+1 наоборот

Обратная сторона: новички, обжёгшись на lazy, начинают везде писать joinedload цепочками на четыре уровня вглубь. И дальше один запрос возвращает по 50 тысяч строк декартова произведения. Правило простое: коллекции (one-to-many, many-to-many) — через selectinload, чтобы не раздувать результат. Скаляры (many-to-one, one-to-one) — через joinedload, там JOIN дешёвый.

Один движок, одна сессия на запрос

Видел дважды, как команда заводила create_async_engine внутри функции-эндпоинта. Каждый запрос создавал новый пул соединений, которые жили до конца обработки и не закрывались по-человечески. Через час прод лежал на too many connections.

Engine — это пул. Создавай его один раз на старте приложения и переиспользуй. В FastAPI это выглядит так:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

@asynccontextmanager
async def lifespan(app: FastAPI):
    engine = create_async_engine(
        "postgresql+asyncpg://app:app@db/app",
        pool_size=10,
        max_overflow=5,
        pool_pre_ping=True,
    )
    app.state.engine = engine
    app.state.sessionmaker = async_sessionmaker(
        engine, expire_on_commit=False
    )
    yield
    await engine.dispose()

app = FastAPI(lifespan=lifespan)

Сессия — наоборот, короткоживущая. На запрос — своя сессия. Через Depends:

from typing import AsyncIterator
from fastapi import Depends, Request
from sqlalchemy.ext.asyncio import AsyncSession

async def get_session(request: Request) -> AsyncIterator[AsyncSession]:
    sessionmaker = request.app.state.sessionmaker
    async with sessionmaker() as session:
        yield session

Никаких глобальных session, никаких session в module-level. Сессия не потокобезопасна и не таскобезопасна, и попытки переиспользовать её между запросами заканчиваются гонками вокруг identity map.

expire_on_commit=False — без вариантов

По умолчанию после commit() SQLAlchemy инвалидирует все объекты в сессии. В синхронном мире это безопасно: при следующем доступе атрибут просто перечитается из базы. В async это тот самый MissingGreenlet: ты уже отдал управление, сессия закрыта, а в обработчике ты ещё пытаешься прочитать user.email для ответа.

Правило: при создании async-сессии ставь expire_on_commit=False. Иначе после каждого await session.commit() ты будешь обнимать refresh или ловить ошибку. Я в новых проектах ставлю это первым делом.

Транзакции: один уровень, без сюрпризов

В SQLAlchemy 2.0 рекомендованный паттерн — session.begin() вокруг логической операции, а не россыпь commit/rollback по коду:

async def create_order(session: AsyncSession, payload: OrderIn) -> Order:
    async with session.begin():
        order = Order(user_id=payload.user_id, total=payload.total)
        session.add(order)
        # тут же добавляем позиции
        for item in payload.items:
            session.add(OrderItem(order_id=order.id, sku=item.sku))
    return order

Этот блок открывает транзакцию, при выходе делает commit, при исключении — rollback. Не надо ловить ошибки и руками откатывать. Если у тебя глубоко в коде уже есть begin(), и ты вызываешь begin() ещё раз — получишь InvalidRequestError. Используй session.begin_nested() для логической вложенности (это savepoint), либо проектируй так, чтобы транзакция стартовала на верхнем уровне.

Connection pool: pool_size, max_overflow и pool_pre_ping

Дефолты у SQLAlchemy довольно скромные: pool_size=5, max_overflow=10. Под FastAPI с десятком воркеров uvicorn это значит, что на инстанс приходится 5 постоянных соединений и до 15 пиковых. Умножь на число подов — и Postgres скажет «спасибо, не надо».

Что делать:

  • Считай бюджет соединений к Postgres явно: max_connections в постгресе делишь на (число подов × воркеров на под). Оставляй 20% запас на админ-сессии и репликацию.
  • Ставь pool_pre_ping=True. Это лёгкий SELECT 1 перед выдачей соединения из пула. Спасает от мёртвых соединений после рестарта базы или сетевых обрывов.
  • Если у тебя много инстансов и хочется централизованного управления — поставь PgBouncer в режиме transaction, а у самой SQLAlchemy уменьши пул до 2–3 на инстанс. Но помни про prepared statements: asyncpg их использует по умолчанию, и в transaction-pooling они ломаются. Лекарство — statement_cache_size=0 в connect_args.
engine = create_async_engine(
    "postgresql+asyncpg://app:app@pgbouncer/app",
    pool_size=3,
    max_overflow=2,
    pool_pre_ping=True,
    connect_args={"statement_cache_size": 0},
)

Смешивание sync и async кода

Бывает, надо позвать что-то синхронное (старый кусок логики, плагин, что угодно), которое внутри тянется в базу. Не пытайся обернуть синхронную сессию в asyncio.to_thread и работать ей из async-кода — гарантированно поймаешь странные блокировки.

Правильный путь — функция run_sync у async-сессии:

def legacy_recalc(sync_session, user_id: int) -> None:
    user = sync_session.get(User, user_id)
    user.score = recompute(user)

await session.run_sync(legacy_recalc, user_id)

Внутри run_sync SQLAlchemy подсунет тебе синхронный фасад над тем же соединением. Снаружи всё остаётся честно асинхронным.

Тесты: одна транзакция на тест, откат в конце

Стандартный pytest-фикстура для async-сессии в проде у меня выглядит так:

import pytest_asyncio
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

@pytest_asyncio.fixture(scope="session")
async def engine():
    engine = create_async_engine(TEST_DSN)
    yield engine
    await engine.dispose()

@pytest_asyncio.fixture
async def session(engine):
    async with engine.connect() as conn:
        trans = await conn.begin()
        SessionLocal = async_sessionmaker(bind=conn, expire_on_commit=False)
        async with SessionLocal() as session:
            yield session
        await trans.rollback()

На каждый тест — отдельное соединение, вокруг — транзакция, в конце откат. База между тестами не загрязняется, фикстуры быстрые, миграции прогоняются один раз. Если внутри кода тоже есть begin() — он автоматически становится savepoint, ничего ломать не надо.

Чего не хватает по сравнению с sync

Честный момент: async-режим всё ещё немного беднее. Часть legacy-методов помечена как deprecated, кое-где приходится использовать session.execute(select(...)) вместо короткого session.query(...). Это не страшно, но в первый месяц непривычно.

Был и пункт про события (events). Раньше before_flush и компания писались синхронно и под async ругались. В 2.0 завезли async-варианты через @event.listens_for(... "do_orm_execute") и обёртки в session.run_sync. Если у тебя на событиях много логики — проверь, что хук действительно отрабатывает в async-сессии, я ловил случай, когда хук был зарегистрирован на синхронный Session и в async просто молча игнорировался.

Что запомнить

SQLAlchemy 2.0 в async хороша, но требует дисциплины. Eager-загрузка по умолчанию, один engine на приложение, короткоживущие сессии, expire_on_commit=False, осознанный pool_size, транзакции через session.begin(). Если эти шесть пунктов соблюдены, проект годами едет без сюрпризов. Я бы посоветовал заодно прочитать раздел Asyncio Integration в официальной доке: там подробно расписано, что именно нельзя делать в async-сессии и почему.

Дальше копать имеет смысл в сторону Alembic с async и в тонкости asyncpg: prepared statements, типы массивов, обработка NOTIFY. Это уже отдельная история, и я к ней ещё вернусь.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.