Kravchenko

Web Lab

АудитБлогКонтакты

Kravchenko

Web Lab

Разрабатываем сайты и автоматизацию на современных фреймворках под ключ

Услуги
ЛендингМногостраничныйВизитка
E-commerceБронированиеПортфолио
Навигация
БлогКонтактыАудит
Обратная связь
+7 921 567-11-16
info@kravlab.ru
с 09:00 до 18:00

© 2026 Все права защищены

•

ИП Кравченко Никита Владимирович

•

ОГРНИП: 324784700339743

Политика конфиденциальности

Outbox‑паттерн и транзакционная отправка событий: как не терять данные между БД и брокером и снизить число инцидентов

Разработка и технологии16 января 2026 г.
Когда сервис пишет в базу и отправляет событие в Kafka или очередь, между этими действиями легко потерять данные или получить дубликаты. Outbox‑паттерн решает это: сначала мы атомарно сохраняем событие в таблицу вместе с бизнес‑изменением, затем отдельный воркер безопасно публикует событие в брокер. В итоге — меньше инцидентов, больше доверия к данным и быстрее интеграции.
Outbox‑паттерн и транзакционная отправка событий: как не терять данные между БД и брокером и снизить число инцидентов

  • Оглавление
    • Зачем нужен Outbox и в чём бизнес‑выгода
    • Архитектура: как работает Outbox‑паттерн
    • Схема таблицы и индексы в PostgreSQL
    • Пример транзакции: создаём заказ и пишем событие
    • Воркер‑ретранслятор: безопасная публикация и повторные попытки
    • Защита от дубликатов на стороне потребителя
    • Производительность, очистка и партиционирование Outbox
    • Альтернативы и дополнения: триггеры, LISTEN/NOTIFY, Debezium
    • Наблюдаемость, метрики и алерты
    • Типичные ошибки и как их избежать
    • Чек‑лист внедрения

Зачем нужен Outbox и в чём бизнес‑выгода

Простой пример: пользователь оформляет заказ. Сервис создаёт запись в БД и должен отправить событие «order.created» в Kafka, чтобы склады, биллинг и аналитика среагировали. Если отправлять событие сразу после вставки в БД, возникают две проблемы:

  • Потеря событий: база закоммитилась, а отправка в брокер упала — интеграции не узнают о заказе.
  • Дубликаты: отправка прошла, но подтверждение не дошло — мы повторим отправку и получим два события.

Оба сценария приводят к инцидентам: недопоставка, неверные начисления, каскадные ошибки, ручные разборы. Outbox‑паттерн решает это предсказуемо и дёшево:

  • Гарантирует, что каждое бизнес‑изменение сопровождается событием, либо не произойдёт вообще.
  • Позволяет безопасно повторять отправку без риска испортить данные.
  • Даёт чёткую точку аудита: можно увидеть все события и их статус.

В итоге: меньше инцидентов, меньше ручных «распутываний», быстрее онбординг новых интеграций.

Архитектура: как работает Outbox‑паттерн

Идея проста:

  1. В одной транзакции с бизнес‑операцией (например, INSERT в orders) мы пишем событие в таблицу outbox.
  2. Отдельный процесс (воркер‑ретранслятор) периодически читает неподтверждённые записи из outbox и публикует их в брокер сообщений (Kafka, RabbitMQ, NATS, SQS — не важно).
  3. После успешной публикации воркер помечает запись как опубликованную. При ошибке — увеличивает счётчик попыток и попробует ещё раз с увеличением паузы.

Ключ: запись в outbox и изменение бизнес‑состояния происходят атомарно в одной транзакции. Значит, либо мы имеем и заказ, и событие, либо ничего; «рассинхрона» нет.

Схема таблицы и индексы в PostgreSQL

Минимальный набор полей: тип агрегата, идентификатор, тип события, полезная нагрузка, ключ дедупликации, время создания, отметка публикации и счётчик попыток.

-- Таблица Outbox для PostgreSQL
CREATE TABLE IF NOT EXISTS outbox_events (
  id UUID PRIMARY KEY,
  aggregate_type TEXT NOT NULL,             -- например, 'order'
  aggregate_id TEXT NOT NULL,               -- например, ID заказа
  event_type TEXT NOT NULL,                 -- например, 'order.created'
  payload JSONB NOT NULL,                   -- данные события
  headers JSONB NOT NULL DEFAULT '{}',      -- метаданные: трассировка, источник
  dedup_key TEXT NOT NULL,                  -- ключ для снятия дублей на потребителе
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  published_at TIMESTAMPTZ NULL,            -- момент успешной публикации
  attempt SMALLINT NOT NULL DEFAULT 0
);

-- Индексы для быстрого чтения непубликованных событий и для чистки
CREATE INDEX IF NOT EXISTS idx_outbox_pending
  ON outbox_events (published_at NULLS FIRST, created_at);

CREATE INDEX IF NOT EXISTS idx_outbox_aggregate
  ON outbox_events (aggregate_type, aggregate_id);

-- Ограничение уникальности по dedup_key (опционально, если генерируете устойчивый ключ)
CREATE UNIQUE INDEX IF NOT EXISTS ux_outbox_dedup
  ON outbox_events (dedup_key);

Комментарии:

  • dedup_key — строка, по которой потребители могут гарантированно убрать дубликаты. Это может быть UUID операции или комбинация (aggregate_id + версия).
  • Индекс idx_outbox_pending позволяет быстро находить «висящие» записи.

Пример транзакции: создаём заказ и пишем событие

Ниже — короткий пример на Go: создаём заказ и атомарно пишем событие в outbox. Код полностью рабочий: он создаёт таблицы (если их нет), добавляет заказ и событие, затем запускает воркер, который «публикует» событие (в примере — в лог). Вместо лога подключите клиент вашего брокера.

package main

import (
	context "context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"os"
	"os/signal"
	"syscall"
	"time"

	_ "github.com/lib/pq"
	"github.com/google/uuid"
)

// Сообщение для публикации
type OutboxMessage struct {
	ID            string          `json:"id"`
	AggregateType string          `json:"aggregate_type"`
	AggregateID   string          `json:"aggregate_id"`
	EventType     string          `json:"event_type"`
	Payload       json.RawMessage `json:"payload"`
	Headers       json.RawMessage `json:"headers"`
	DedupKey      string          `json:"dedup_key"`
	CreatedAt     time.Time       `json:"created_at"`
}

func main() {
	// Подключение к PostgreSQL
	dsn := envOr("PG_DSN", "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable")
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := initSchema(db); err != nil {
		log.Fatal(err)
	}

	// Вставим тестовый заказ и событие
	orderID := uuid.New().String()
	if err := createOrderWithEvent(db, orderID); err != nil {
		log.Fatal(err)
	}
	log.Println("Заказ создан и событие записано в outbox:", orderID)

	// Запустим воркер публикации
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	pub := func(ctx context.Context, m OutboxMessage) error {
		// Здесь используйте клиент Kafka/Rabbit/NATS. В примере — лог.
		log.Printf("Публикация события: id=%s type=%s agg=%s/%s\n", m.ID, m.EventType, m.AggregateType, m.AggregateID)
		return nil
	}

	cfg := RelayConfig{BatchSize: 100, BaseBackoff: time.Second, MaxBackoff: 30 * time.Second}
	go startRelay(ctx, db, pub, cfg)

	<-ctx.Done()
	log.Println("Остановка…")
}

func envOr(k, def string) string {
	if v := os.Getenv(k); v != "" {
		return v
	}
	return def
}

func initSchema(db *sql.DB) error {
	sqlStmt := `
CREATE TABLE IF NOT EXISTS orders (
  id TEXT PRIMARY KEY,
  status TEXT NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE TABLE IF NOT EXISTS outbox_events (
  id UUID PRIMARY KEY,
  aggregate_type TEXT NOT NULL,
  aggregate_id TEXT NOT NULL,
  event_type TEXT NOT NULL,
  payload JSONB NOT NULL,
  headers JSONB NOT NULL DEFAULT '{}',
  dedup_key TEXT NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  published_at TIMESTAMPTZ NULL,
  attempt SMALLINT NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_outbox_pending
  ON outbox_events (published_at NULLS FIRST, created_at);
CREATE UNIQUE INDEX IF NOT EXISTS ux_outbox_dedup
  ON outbox_events (dedup_key);
`
	_, err := db.Exec(sqlStmt)
	return err
}

func createOrderWithEvent(db *sql.DB, orderID string) error {
	tx, err := db.Begin()
	if err != nil { return err }
	defer func() { _ = tx.Rollback() }()

	if _, err := tx.Exec(`INSERT INTO orders(id, status) VALUES($1, 'created')`, orderID); err != nil {
		return err
	}

	payload := map[string]any{"order_id": orderID, "status": "created"}
	payloadBytes, _ := json.Marshal(payload)
	headers := map[string]any{"source": "orders-service"}
	headersBytes, _ := json.Marshal(headers)

	outboxID := uuid.New()
	dedup := orderID + ":v1" // стабильный ключ для снятия дублей

	_, err = tx.Exec(`
INSERT INTO outbox_events(id, aggregate_type, aggregate_id, event_type, payload, headers, dedup_key)
VALUES ($1, 'order', $2, 'order.created', $3, $4, $5)
ON CONFLICT (dedup_key) DO NOTHING
`, outboxID, orderID, payloadBytes, headersBytes, dedup)
	if err != nil {
		return err
	}

	return tx.Commit()
}

// Конфиг воркера
 type RelayConfig struct {
	BatchSize   int
	BaseBackoff time.Duration
	MaxBackoff  time.Duration
}

// startRelay — ретранслятор: читает неподтверждённые записи, публикует и помечает как опубликованные
func startRelay(ctx context.Context, db *sql.DB, publish func(context.Context, OutboxMessage) error, cfg RelayConfig) {
	backoff := cfg.BaseBackoff
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		msgs, err := lockAndLoadBatch(ctx, db, cfg.BatchSize)
		if err != nil {
			log.Println("ошибка загрузки батча:", err)
			time.Sleep(backoff)
			backoff = minDuration(cfg.MaxBackoff, time.Duration(float64(backoff)*1.5))
			continue
		}
		backoff = cfg.BaseBackoff

		if len(msgs) == 0 {
			time.Sleep(300 * time.Millisecond)
			continue
		}

		for _, m := range msgs {
			if err := publish(ctx, m); err != nil {
				log.Printf("публикация не удалась (id=%s): %v\n", m.ID, err)
				markAttempt(ctx, db, m.ID)
				continue
			}
			markPublished(ctx, db, m.ID)
		}
	}
}

func lockAndLoadBatch(ctx context.Context, db *sql.DB, n int) ([]OutboxMessage, error) {
	tx, err := db.BeginTx(ctx, &sql.TxOptions{})
	if err != nil { return nil, err }
	defer func() { _ = tx.Rollback() }()

	rows, err := tx.QueryContext(ctx, `
SELECT id, aggregate_type, aggregate_id, event_type, payload, headers, dedup_key, created_at
FROM outbox_events
WHERE published_at IS NULL
ORDER BY created_at
FOR UPDATE SKIP LOCKED
LIMIT $1
`, n)
	if err != nil { return nil, err }
	defer rows.Close()

	var res []OutboxMessage
	for rows.Next() {
		var m OutboxMessage
		if err := rows.Scan(&m.ID, &m.AggregateType, &m.AggregateID, &m.EventType, &m.Payload, &m.Headers, &m.DedupKey, &m.CreatedAt); err != nil {
			return nil, err
		}
		res = append(res, m)
	}
	if err := rows.Err(); err != nil { return nil, err }

	if err := tx.Commit(); err != nil { return nil, err }
	return res, nil
}

func markPublished(ctx context.Context, db *sql.DB, id string) {
	_, err := db.ExecContext(ctx, `UPDATE outbox_events SET published_at = now() WHERE id = $1`, id)
	if err != nil { log.Println("ошибка отметки published:", err) }
}

func markAttempt(ctx context.Context, db *sql.DB, id string) {
	_, err := db.ExecContext(ctx, `UPDATE outbox_events SET attempt = attempt + 1 WHERE id = $1`, id)
	if err != nil { log.Println("ошибка увеличения attempt:", err) }
}

func minDuration(a, b time.Duration) time.Duration { if a < b { return a }; return b }

Что важно в этом примере:

  • FOR UPDATE SKIP LOCKED позволяет безопасно запускать несколько воркеров параллельно — они не мешают друг другу.
  • Отдельные функции markPublished/markAttempt сохраняют результат попытки.
  • dedup_key создаётся стабильно — потребители могут убирать дубликаты.

Воркер‑ретранслятор: безопасная публикация и повторные попытки

Рекомендации к продакшн‑воркеру:

  • Повторные попытки с экспоненциальной паузой и джиттером (случайным разбросом), чтобы не устроить пики.
  • Ограничение числа попыток и перевод «безнадёжных» событий в отдельный статус (dead letter). Лучше хранить их в отдельной таблице outbox_dead для последующего ручного анализа.
  • Параллельная обработка батча с контролем одновременных публикаций (ограничение пула горутин/потоков).
  • Трассировка: прокидывайте trace_id в headers, чтобы связать событие с исходным запросом в системе наблюдаемости.

Защита от дубликатов на стороне потребителя

Событие может быть опубликовано больше одного раза (например, воркер не получил подтверждение от брокера). Потребителю нужна идемпотентная обработка. Три простых способа:

  1. Уникальный ключ в таблице потребителя по dedup_key, чтобы «вставить‑или‑пропустить»:
-- У потребителя
CREATE TABLE IF NOT EXISTS processed_events (
  dedup_key TEXT PRIMARY KEY,
  processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- При обработке события
INSERT INTO processed_events(dedup_key) VALUES ($1) ON CONFLICT DO NOTHING;
-- Далее делать бизнес‑логику только если новая строка действительно вставлена
  1. Версионирование и оптимистические блокировки: храните версию агрегата и применяйте событие только если версия больше текущей.

  2. Временной кэш ключей: храните dedup_key в Redis с TTL, если обработка чисто реактивная и вы не изменяете хранение состояния.

Производительность, очистка и партиционирование Outbox

  • Очистка: published записи не должны копиться годами. Держите их 7–30 дней для расследований, затем переносите в архив и удаляйте партиями (например, по created_at). Делайте это в фоне, не блокируя горячие запросы.
  • Партиционирование: если у вас высокий поток событий, сделайте партиции по дате (например, по месяцам). Это ускорит сканирование и упрощает чистку.
  • Индексы: держите узкий «горячий» индекс по (published_at, created_at). Остальные — по потребности аналитики.
  • Batch‑обработка: читайте события порциями, публикуйте параллельно с ограничением конкурентности.

Пример чистки партиями:

-- Удаляем старые опубликованные записи пачками по 10k
DELETE FROM outbox_events
WHERE published_at IS NOT NULL AND published_at < now() - INTERVAL '30 days'
LIMIT 10000;

В PostgreSQL до версии 13 LIMIT в DELETE не поддерживался, используйте CTE с ORDER BY и PK, либо партиции. В новых версиях можно добавить USING и фильтры по партициям.

Альтернативы и дополнения: триггеры, LISTEN/NOTIFY, Debezium

  • Триггеры: можно писать в outbox из триггера на таблицу заказов. Это уменьшает код в приложении, но усложняет отладку и версионирование логики.
  • LISTEN/NOTIFY: после вставки в outbox делайте NOTIFY — воркер проснётся и заберёт свежие записи. Это снижает задержку по сравнению с простым polling каждые N миллисекунд.
  • CDC‑подход (Debezium и др.): специализированные коннекторы читают журнал транзакций БД и публикуют события в Kafka. Часто их комбинируют с Outbox‑таблицей (outbox‑router), чтобы формировать корректные бизнес‑события и не «подслушивать» внутренние служебные изменения.

Наблюдаемость, метрики и алерты

Какие метрики держать под рукой:

  • outbox.pending.count — число неподтверждённых событий.
  • outbox.publish.latency.p50/p95/p99 — задержка между created_at и published_at.
  • outbox.retry.count — число повторных попыток по типу события.
  • outbox.dead.count — число «мертвых» событий.

Алерты:

  • Резкий рост pending выше порога N минут.
  • Нет успешных публикаций X минут.
  • Ошибки публикации > Y% в течение 5 минут.

Логи и трассировка:

  • Логируйте id, dedup_key, aggregate_id, event_type, attempt и ошибку брокера.
  • Прокидывайте trace_id в headers и связывайте его с исходным HTTP‑запросом.

Типичные ошибки и как их избежать

  • Писать в outbox и бизнес‑таблицу в разных транзакциях. Итог — рассинхрон. Решение: всегда одна транзакция.
  • Отсутствие dedup_key или нестабильный ключ. Итог — сложно убирать дубли. Решение: стабильный ключ на основе сущности и версии.
  • Один воркер без SKIP LOCKED. Итог — гонки и блокировки. Решение: FOR UPDATE SKIP LOCKED и несколько воркеров.
  • Нет чистки outbox. Итог — медленные запросы, раздутые индексы. Решение: регулярная чистка/партиционирование.
  • «Слишком умные» события (слишком много данных). Итог — тяжёлые публикации, сложные изменения схемы. Решение: держите payload компактным, добавляйте только необходимые поля.

Чек‑лист внедрения

  • Таблица outbox с индексами и стабильным dedup_key.
  • Транзакция: бизнес‑запись + вставка в outbox.
  • Воркер с FOR UPDATE SKIP LOCKED, батчами и ретраями.
  • Идемпотентные потребители с защитой от дублей.
  • Метрики, алерты, трассировка и логи с ключами событий.
  • Очистка/архивация и, при необходимости, партиционирование.
  • Тесты на сбои: падение брокера, обрыв сети, повтор запуска воркера, конфликт dedup_key.

Заключение: Outbox‑паттерн — это небольшой слой поверх вашей БД, который радикально снижает риск потерь и дублей при интеграциях через события. Он прост, предсказуем, хорошо масштабируется и окупается уже на первом предотвращённом инциденте с платежами или логистикой.


outboxнадёжностьмикросервисы