lenec ru

← все посты

Rust tokio: асинхронный runtime — практическое руководство

10K

Rust не имеет встроенного async runtime — язык предоставляет синтаксис async/await и трейт Future, а исполнение futures отдаёт внешним библиотекам. Tokio — де-факто стандартный runtime для асинхронного Rust: event loop, планировщик задач, асинхронный I/O и примитивы синхронизации в одном пакете.

Зачем tokio: Future и runtime

В Rust async fn возвращает impl Future — ленивую структуру, которая ничего не делает, пока её не заполлит runtime. Без runtime futures не исполняются:

// Это просто создаёт Future, но не запускает его
async fn fetch_data() -> String {
    // ...
    "data".to_string()
}

// Нужен runtime для исполнения
#[tokio::main]
async fn main() {
    let data = fetch_data().await;
    println!("{data}");
}

Макрос #[tokio::main] создаёт multi-threaded runtime и блокирует текущий поток до завершения async main.

Runtime: multi-thread vs current_thread

Tokio предлагает два режима:

// Multi-thread: work-stealing scheduler, N потоков (по умолчанию = CPU cores)
#[tokio::main]
async fn main() { }

// Явная настройка
#[tokio::main(worker_threads = 4)]
async fn main() { }

// Single-thread: один поток, меньше overhead
#[tokio::main(flavor = "current_thread")]
async fn main() { }

current_thread подходит для CLI-утилит и лёгких сервисов. Для нагруженных серверов — multi-thread с work-stealing между потоками.

Задачи: spawn, JoinHandle, abort

tokio::spawn запускает future как отдельную задачу в планировщике:

use tokio::task::JoinHandle;

async fn process_requests() {
    let handle: JoinHandle<u32> = tokio::spawn(async {
        // Выполняется конкурентно
        expensive_computation().await
    });

    // Дождаться результата
    let result = handle.await.unwrap();

    // Или отменить задачу
    let handle2 = tokio::spawn(long_running_task());
    handle2.abort(); // Задача будет отменена при следующем .await внутри
}

Важно: future, переданный в spawn, должен быть Send + 'static — он может исполняться на любом потоке runtime.

I/O: TCP, файлы, буферизация

Tokio предоставляет асинхронные аналоги стандартных I/O-примитивов:

use tokio::net::TcpListener;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

async fn run_server() -> anyhow::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("Connection from {addr}");

        tokio::spawn(async move {
            let (reader, mut writer) = socket.into_split();
            let mut buf_reader = BufReader::new(reader);
            let mut line = String::new();

            while buf_reader.read_line(&mut line).await.unwrap() > 0 {
                writer.write_all(line.as_bytes()).await.unwrap();
                line.clear();
            }
        });
    }
}

Файловый I/O:

use tokio::fs;

async fn read_config() -> anyhow::Result<String> {
    let content = fs::read_to_string("config.toml").await?;
    Ok(content)
}

Под капотом tokio::fs использует thread pool для блокирующих syscalls — файловый I/O в Linux не имеет настоящего async API (кроме io_uring).

Синхронизация: Mutex, channels

Tokio предоставляет async-версии примитивов синхронизации:

use tokio::sync::{Mutex, mpsc, oneshot};
use std::sync::Arc;

// Async Mutex — можно держать через .await
let shared_state = Arc::new(Mutex::new(Vec::<String>::new()));

let state = shared_state.clone();
tokio::spawn(async move {
    let mut guard = state.lock().await;
    guard.push("hello".to_string());
});

// mpsc channel — multiple producer, single consumer
let (tx, mut rx) = mpsc::channel::<String>(100);

tokio::spawn(async move {
    tx.send("message".to_string()).await.unwrap();
});

while let Some(msg) = rx.recv().await {
    println!("Got: {msg}");
}

// oneshot — одноразовый канал для ответа
let (resp_tx, resp_rx) = oneshot::channel::<u32>();
tokio::spawn(async move {
    let result = compute().await;
    resp_tx.send(result).unwrap();
});
let answer = resp_rx.await.unwrap();

Когда использовать tokio::sync::Mutex vs std::sync::Mutex: если критическая секция содержит .await — только tokio Mutex. Если нет — std::sync::Mutex быстрее (не требует async overhead).

Подводные камни

  • Blocking в async — вызов блокирующего кода (CPU-heavy вычисления, синхронный I/O) внутри async задачи блокирует весь поток runtime. Решение: tokio::task::spawn_blocking
let hash = tokio::task::spawn_blocking(move || {
    argon2::hash_password(password.as_bytes(), &salt)
}).await.unwrap();
  • Send + 'static — future для spawn не может содержать не-Send типы (Rc, Cell) и ссылки с ограниченным lifetime. Это частый источник ошибок компиляции
  • Cancellation safety — когда future отменяется (drop), он может остановиться на любом .await. Если между двумя await вы частично изменили состояние — оно останется в неконсистентном виде. Используйте tokio::select! с осторожностью
  • Размер future — каждый .await увеличивает размер state machine. Глубоко вложенные async fn могут создавать огромные futures. Решение: Box::pin для рекурсивных futures
// select! — cancellation safety важна
tokio::select! {
    msg = rx.recv() => { /* обработка */ }
    _ = tokio::time::sleep(Duration::from_secs(5)) => {
        println!("timeout");
    }
}

Итог

Tokio — мощный runtime, но требует понимания ownership в async-контексте. Используйте channels вместо shared state где возможно, помните про blocking и cancellation safety. Для сетевых сервисов на Rust — единственный разумный выбор.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.