lenec ru

← все посты

Микросервисная архитектура: паттерны коммуникации (sync, async, event-driven)

14K

Микросервисная архитектура решает проблемы монолита, но создаёт новую — распределённую коммуникацию. Выбор паттерна коммуникации между сервисами определяет надёжность, производительность и сложность системы. Разберём синхронные и асинхронные подходы, event-driven архитектуру и паттерны для распределённых транзакций.

Синхронная коммуникация: REST и gRPC

REST API

Классический подход — HTTP/JSON. Сервис A делает HTTP-запрос к сервису B, ждёт ответа. Просто, понятно, но создаёт tight coupling и проблемы с доступностью.

// Order Service вызывает Payment Service
const axios = require('axios');

async function createOrder(userId, items) {
  // 1. Создаём заказ
  const order = await db.orders.create({ userId, items, status: 'pending' });
  
  // 2. Синхронный вызов Payment Service
  try {
    const payment = await axios.post('http://payment-service/api/payments', {
      orderId: order.id,
      amount: calculateTotal(items)
    });
    
    // 3. Обновляем статус заказа
    await db.orders.update(order.id, { status: 'paid', paymentId: payment.data.id });
    
    return order;
  } catch (err) {
    // Payment Service недоступен — откатываем заказ
    await db.orders.update(order.id, { status: 'failed' });
    throw new Error('Payment failed');
  }
}

Проблемы:

  • Cascading failures — если Payment Service упал, Order Service тоже не работает
  • Latency — время ответа = сумма времён всех вызовов в цепочке
  • Tight coupling — Order Service знает URL и контракт Payment Service

Когда использовать: для read-операций (получение данных), когда нужен немедленный ответ и допустима зависимость от доступности других сервисов.

gRPC

Бинарный протокол на базе HTTP/2 с Protocol Buffers. Быстрее REST (меньше overhead на сериализацию), поддерживает streaming и bidirectional communication.

// payment.proto
syntax = "proto3";

service PaymentService {
  rpc CreatePayment(PaymentRequest) returns (PaymentResponse);
  rpc StreamPayments(Empty) returns (stream Payment);
}

message PaymentRequest {
  string order_id = 1;
  double amount = 2;
}

message PaymentResponse {
  string payment_id = 1;
  string status = 2;
}
// Order Service (gRPC client)
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');

const packageDefinition = protoLoader.loadSync('payment.proto');
const paymentProto = grpc.loadPackageDefinition(packageDefinition).PaymentService;

const client = new paymentProto('payment-service:50051', grpc.credentials.createInsecure());

async function createOrder(userId, items) {
  const order = await db.orders.create({ userId, items, status: 'pending' });
  
  return new Promise((resolve, reject) => {
    client.CreatePayment({ order_id: order.id, amount: calculateTotal(items) }, (err, response) => {
      if (err) {
        db.orders.update(order.id, { status: 'failed' });
        reject(err);
      } else {
        db.orders.update(order.id, { status: 'paid', paymentId: response.payment_id });
        resolve(order);
      }
    });
  });
}

Преимущества gRPC: производительность (в 5-10 раз быстрее REST для больших объёмов), строгий контракт (protobuf), streaming. Недостатки: сложнее отладка (бинарный протокол), меньше tooling, не работает в браузерах без grpc-web.

Когда использовать: для internal коммуникации между backend-сервисами, когда критична производительность (high-throughput системы).

Асинхронная коммуникация: message queues

Вместо прямого вызова сервис отправляет сообщение в очередь. Получатель обрабатывает его когда готов. Decoupling — отправитель не знает, кто и когда обработает сообщение.

RabbitMQ (message broker)

Классическая очередь сообщений. Поддерживает routing, exchanges, dead letter queues.

// Order Service (publisher)
const amqp = require('amqplib');

async function createOrder(userId, items) {
  const order = await db.orders.create({ userId, items, status: 'pending' });
  
  // Отправляем событие в RabbitMQ
  const connection = await amqp.connect('amqp://rabbitmq:5672');
  const channel = await connection.createChannel();
  
  await channel.assertQueue('order.created');
  channel.sendToQueue('order.created', Buffer.from(JSON.stringify({
    orderId: order.id,
    userId,
    items,
    total: calculateTotal(items)
  })));
  
  await channel.close();
  await connection.close();
  
  return order;
}
// Payment Service (consumer)
const amqp = require('amqplib');

async function startConsumer() {
  const connection = await amqp.connect('amqp://rabbitmq:5672');
  const channel = await connection.createChannel();
  
  await channel.assertQueue('order.created');
  
  channel.consume('order.created', async (msg) => {
    const { orderId, total } = JSON.parse(msg.content.toString());
    
    try {
      const payment = await processPayment(orderId, total);
      console.log(`Payment processed: ${payment.id}`);
      
      // Подтверждаем обработку
      channel.ack(msg);
      
      // Публикуем событие payment.completed
      channel.sendToQueue('payment.completed', Buffer.from(JSON.stringify({
        orderId,
        paymentId: payment.id
      })));
    } catch (err) {
      console.error('Payment failed:', err);
      // Отклоняем сообщение, оно вернётся в очередь или пойдёт в DLQ
      channel.nack(msg, false, false);
    }
  });
}

startConsumer();

Преимущества: Order Service не падает, если Payment Service недоступен. Сообщение ждёт в очереди. Retry и dead letter queues из коробки.

Apache Kafka (event streaming)

Не просто очередь — распределённый лог событий. Сообщения хранятся долго (дни/недели), можно перечитывать. Подходит для event sourcing и real-time analytics.

// Order Service (producer)
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ brokers: ['kafka:9092'] });
const producer = kafka.producer();

async function createOrder(userId, items) {
  const order = await db.orders.create({ userId, items, status: 'pending' });
  
  await producer.connect();
  await producer.send({
    topic: 'orders',
    messages: [
      {
        key: order.id,
        value: JSON.stringify({
          type: 'OrderCreated',
          orderId: order.id,
          userId,
          items,
          total: calculateTotal(items),
          timestamp: Date.now()
        })
      }
    ]
  });
  
  return order;
}
// Payment Service (consumer)
const consumer = kafka.consumer({ groupId: 'payment-service' });

async function startConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders', fromBeginning: false });
  
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value.toString());
      
      if (event.type === 'OrderCreated') {
        try {
          const payment = await processPayment(event.orderId, event.total);
          
          // Публикуем событие PaymentCompleted
          await producer.send({
            topic: 'payments',
            messages: [
              {
                key: event.orderId,
                value: JSON.stringify({
                  type: 'PaymentCompleted',
                  orderId: event.orderId,
                  paymentId: payment.id,
                  timestamp: Date.now()
                })
              }
            ]
          });
        } catch (err) {
          console.error('Payment failed:', err);
          // Логируем ошибку, можем отправить в DLQ-топик
        }
      }
    }
  });
}

startConsumer();

Kafka vs RabbitMQ: Kafka — для high-throughput (миллионы событий/сек), долгое хранение, event sourcing. RabbitMQ — для task queues, когда нужны сложные routing rules и гарантии доставки.

Event-driven architecture и event sourcing

Event-driven — сервисы реагируют на события, а не вызывают друг друга напрямую. Каждое изменение состояния — это событие.

Архитектура:

Order Service --> [OrderCreated] --> Kafka
                                       |
                                       +--> Payment Service --> [PaymentCompleted]
                                       |
                                       +--> Inventory Service --> [ItemsReserved]
                                       |
                                       +--> Notification Service --> Email/SMS

Каждый сервис подписан на нужные события и публикует свои. Полное decoupling — Order Service не знает о существовании Notification Service.

Event sourcing

Вместо хранения текущего состояния храним все события. Состояние восстанавливается replay событий.

// Event store
const events = [
  { type: 'OrderCreated', orderId: '123', items: [...], timestamp: 1000 },
  { type: 'PaymentCompleted', orderId: '123', paymentId: 'p456', timestamp: 2000 },
  { type: 'OrderShipped', orderId: '123', trackingId: 't789', timestamp: 3000 }
];

// Восстановление состояния
function rebuildOrderState(orderId) {
  const orderEvents = events.filter(e => e.orderId === orderId);
  
  const state = { id: orderId, status: 'unknown', items: [], paymentId: null, trackingId: null };
  
  for (const event of orderEvents) {
    switch (event.type) {
      case 'OrderCreated':
        state.status = 'created';
        state.items = event.items;
        break;
      case 'PaymentCompleted':
        state.status = 'paid';
        state.paymentId = event.paymentId;
        break;
      case 'OrderShipped':
        state.status = 'shipped';
        state.trackingId = event.trackingId;
        break;
    }
  }
  
  return state;
}

Преимущества: полная история изменений (audit log), можно восстановить состояние на любой момент времени, легко добавлять новые read-модели (CQRS). Недостатки: сложность, eventual consistency.

Saga pattern для distributed transactions

В микросервисах нет распределённых транзакций (2PC слишком медленный и хрупкий). Saga — это последовательность локальных транзакций с компенсирующими действиями при ошибке.

Choreography-based saga

Каждый сервис слушает события и публикует свои. Нет центрального координатора.

// Order Service
async function createOrder(userId, items) {
  const order = await db.orders.create({ userId, items, status: 'pending' });
  await publishEvent('OrderCreated', { orderId: order.id, userId, items, total: calculateTotal(items) });
  return order;
}

// Слушаем события
consumer.on('PaymentCompleted', async ({ orderId, paymentId }) => {
  await db.orders.update(orderId, { status: 'paid', paymentId });
  await publishEvent('OrderPaid', { orderId });
});

consumer.on('PaymentFailed', async ({ orderId, reason }) => {
  await db.orders.update(orderId, { status: 'cancelled' });
  await publishEvent('OrderCancelled', { orderId, reason });
});
// Payment Service
consumer.on('OrderCreated', async ({ orderId, total }) => {
  try {
    const payment = await processPayment(orderId, total);
    await publishEvent('PaymentCompleted', { orderId, paymentId: payment.id });
  } catch (err) {
    await publishEvent('PaymentFailed', { orderId, reason: err.message });
  }
});
// Inventory Service
consumer.on('OrderPaid', async ({ orderId }) => {
  const order = await getOrderDetails(orderId);
  
  try {
    await reserveItems(order.items);
    await publishEvent('ItemsReserved', { orderId });
  } catch (err) {
    // Компенсация: возвращаем деньги
    await publishEvent('ItemsReservationFailed', { orderId, reason: err.message });
  }
});

consumer.on('OrderCancelled', async ({ orderId }) => {
  // Компенсация: освобождаем зарезервированные товары
  await releaseItems(orderId);
});

Orchestration-based saga

Центральный orchestrator управляет saga. Проще отслеживать состояние, но создаёт single point of failure.

// Order Saga Orchestrator
class OrderSaga {
  async execute(userId, items) {
    const sagaId = generateId();
    const state = { sagaId, step: 0, orderId: null, paymentId: null };
    
    try {
      // Шаг 1: создать заказ
      state.orderId = await this.createOrder(userId, items);
      state.step = 1;
      
      // Шаг 2: обработать платёж
      state.paymentId = await this.processPayment(state.orderId, calculateTotal(items));
      state.step = 2;
      
      // Шаг 3: зарезервировать товары
      await this.reserveItems(state.orderId, items);
      state.step = 3;
      
      // Успех
      await this.completeOrder(state.orderId);
      return { success: true, orderId: state.orderId };
      
    } catch (err) {
      // Откат (compensating transactions)
      await this.compensate(state);
      return { success: false, error: err.message };
    }
  }
  
  async compensate(state) {
    // Откатываем в обратном порядке
    if (state.step >= 3) {
      await this.releaseItems(state.orderId);
    }
    if (state.step >= 2) {
      await this.refundPayment(state.paymentId);
    }
    if (state.step >= 1) {
      await this.cancelOrder(state.orderId);
    }
  }
  
  async createOrder(userId, items) {
    const response = await axios.post('http://order-service/orders', { userId, items });
    return response.data.id;
  }
  
  async processPayment(orderId, amount) {
    const response = await axios.post('http://payment-service/payments', { orderId, amount });
    return response.data.id;
  }
  
  async reserveItems(orderId, items) {
    await axios.post('http://inventory-service/reserve', { orderId, items });
  }
  
  async releaseItems(orderId) {
    await axios.post('http://inventory-service/release', { orderId });
  }
  
  async refundPayment(paymentId) {
    await axios.post('http://payment-service/refund', { paymentId });
  }
  
  async cancelOrder(orderId) {
    await axios.patch(`http://order-service/orders/${orderId}`, { status: 'cancelled' });
  }
  
  async completeOrder(orderId) {
    await axios.patch(`http://order-service/orders/${orderId}`, { status: 'completed' });
  }
}

Orchestration проще для сложных saga с множеством шагов и условной логикой. Choreography — для простых flow, где важна автономность сервисов.

Практический пример: заказ в e-commerce через события

Полный flow создания заказа с event-driven подходом:

1. User --> Order Service: POST /orders
2. Order Service --> Kafka: OrderCreated
3. Payment Service <-- Kafka: OrderCreated
4. Payment Service --> Kafka: PaymentCompleted
5. Inventory Service <-- Kafka: PaymentCompleted
6. Inventory Service --> Kafka: ItemsReserved
7. Shipping Service <-- Kafka: ItemsReserved
8. Shipping Service --> Kafka: OrderShipped
9. Notification Service <-- Kafka: OrderShipped
10. Notification Service --> Email/SMS

Каждый сервис автономен, может масштабироваться независимо, падение одного не ломает остальные. Eventual consistency — заказ не сразу переходит в статус "shipped", но это приемлемо для большинства бизнес-процессов.

Вывод

Выбор паттерна коммуникации в микросервисах зависит от требований. Синхронная коммуникация (REST, gRPC) — для read-операций и когда нужен немедленный ответ. Асинхронная (RabbitMQ, Kafka) — для decoupling и устойчивости к сбоям. Event-driven архитектура даёт максимальную автономность сервисов, но требует eventual consistency. Saga pattern решает проблему распределённых транзакций через компенсирующие действия. В реальных системах используется комбинация подходов — синхронные вызовы для критичных операций, асинхронные события для фоновых задач, saga для сложных бизнес-процессов.

Комментарии 0

  • Будьте первым, кто оставит комментарий.

Войдите, чтобы оставить комментарий.