Rust tokio: асинхронный runtime — практическое руководство
Rust не имеет встроенного async runtime — язык предоставляет синтаксис async/await и трейт Future, а исполнение futures отдаёт внешним библиотекам. Tokio — де-факто стандартный runtime для асинхронного Rust: event loop, планировщик задач, асинхронный I/O и примитивы синхронизации в одном пакете.
Зачем tokio: Future и runtime
В Rust async fn возвращает impl Future — ленивую структуру, которая ничего не делает, пока её не заполлит runtime. Без runtime futures не исполняются:
// Это просто создаёт Future, но не запускает его
async fn fetch_data() -> String {
// ...
"data".to_string()
}
// Нужен runtime для исполнения
#[tokio::main]
async fn main() {
let data = fetch_data().await;
println!("{data}");
}
Макрос #[tokio::main] создаёт multi-threaded runtime и блокирует текущий поток до завершения async main.
Runtime: multi-thread vs current_thread
Tokio предлагает два режима:
// Multi-thread: work-stealing scheduler, N потоков (по умолчанию = CPU cores)
#[tokio::main]
async fn main() { }
// Явная настройка
#[tokio::main(worker_threads = 4)]
async fn main() { }
// Single-thread: один поток, меньше overhead
#[tokio::main(flavor = "current_thread")]
async fn main() { }
current_thread подходит для CLI-утилит и лёгких сервисов. Для нагруженных серверов — multi-thread с work-stealing между потоками.
Задачи: spawn, JoinHandle, abort
tokio::spawn запускает future как отдельную задачу в планировщике:
use tokio::task::JoinHandle;
async fn process_requests() {
let handle: JoinHandle<u32> = tokio::spawn(async {
// Выполняется конкурентно
expensive_computation().await
});
// Дождаться результата
let result = handle.await.unwrap();
// Или отменить задачу
let handle2 = tokio::spawn(long_running_task());
handle2.abort(); // Задача будет отменена при следующем .await внутри
}
Важно: future, переданный в spawn, должен быть Send + 'static — он может исполняться на любом потоке runtime.
I/O: TCP, файлы, буферизация
Tokio предоставляет асинхронные аналоги стандартных I/O-примитивов:
use tokio::net::TcpListener;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
async fn run_server() -> anyhow::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (socket, addr) = listener.accept().await?;
println!("Connection from {addr}");
tokio::spawn(async move {
let (reader, mut writer) = socket.into_split();
let mut buf_reader = BufReader::new(reader);
let mut line = String::new();
while buf_reader.read_line(&mut line).await.unwrap() > 0 {
writer.write_all(line.as_bytes()).await.unwrap();
line.clear();
}
});
}
}
Файловый I/O:
use tokio::fs;
async fn read_config() -> anyhow::Result<String> {
let content = fs::read_to_string("config.toml").await?;
Ok(content)
}
Под капотом tokio::fs использует thread pool для блокирующих syscalls — файловый I/O в Linux не имеет настоящего async API (кроме io_uring).
Синхронизация: Mutex, channels
Tokio предоставляет async-версии примитивов синхронизации:
use tokio::sync::{Mutex, mpsc, oneshot};
use std::sync::Arc;
// Async Mutex — можно держать через .await
let shared_state = Arc::new(Mutex::new(Vec::<String>::new()));
let state = shared_state.clone();
tokio::spawn(async move {
let mut guard = state.lock().await;
guard.push("hello".to_string());
});
// mpsc channel — multiple producer, single consumer
let (tx, mut rx) = mpsc::channel::<String>(100);
tokio::spawn(async move {
tx.send("message".to_string()).await.unwrap();
});
while let Some(msg) = rx.recv().await {
println!("Got: {msg}");
}
// oneshot — одноразовый канал для ответа
let (resp_tx, resp_rx) = oneshot::channel::<u32>();
tokio::spawn(async move {
let result = compute().await;
resp_tx.send(result).unwrap();
});
let answer = resp_rx.await.unwrap();
Когда использовать tokio::sync::Mutex vs std::sync::Mutex: если критическая секция содержит .await — только tokio Mutex. Если нет — std::sync::Mutex быстрее (не требует async overhead).
Подводные камни
- Blocking в async — вызов блокирующего кода (CPU-heavy вычисления, синхронный I/O) внутри async задачи блокирует весь поток runtime. Решение:
tokio::task::spawn_blocking
let hash = tokio::task::spawn_blocking(move || {
argon2::hash_password(password.as_bytes(), &salt)
}).await.unwrap();
- Send + 'static — future для spawn не может содержать не-Send типы (Rc, Cell) и ссылки с ограниченным lifetime. Это частый источник ошибок компиляции
- Cancellation safety — когда future отменяется (drop), он может остановиться на любом .await. Если между двумя await вы частично изменили состояние — оно останется в неконсистентном виде. Используйте
tokio::select!с осторожностью - Размер future — каждый .await увеличивает размер state machine. Глубоко вложенные async fn могут создавать огромные futures. Решение:
Box::pinдля рекурсивных futures
// select! — cancellation safety важна
tokio::select! {
msg = rx.recv() => { /* обработка */ }
_ = tokio::time::sleep(Duration::from_secs(5)) => {
println!("timeout");
}
}
Итог
Tokio — мощный runtime, но требует понимания ownership в async-контексте. Используйте channels вместо shared state где возможно, помните про blocking и cancellation safety. Для сетевых сервисов на Rust — единственный разумный выбор.