
Дубли запросов — это деньги и репутация. Клиент нажал кнопку «Оплатить» дважды, браузер повторил запрос после обрыва, интеграция сделала ретрай. Без идемпотентности вы получите:
Идемпотентность устраняет класс проблем на корню: повтор одного и того же намерения даёт один и тот же результат. Это спокойные ретраи, меньше конфликтов в интеграциях и предсказуемая работа фоновых задач.
Вы не контролируете сеть и перезапуски, но контролируете свою архитектуру и хранение фактов. Это и есть опорная точка для идемпотентности.
Базовая идея проста: клиент присылает уникальный ключ намерения. Сервер:
Как формировать ключ:
Срок жизни ключа зависит от домена: для заказов обычно от 24 часов до 7 дней достаточно.
Ниже рабочая схема для REST‑эндпоинта создания заказа. Принципы легко переносятся на любой стек.
-- Ключ идемпотентности и кэш ответа
create table if not exists idempotency_keys (
key text primary key,
body_hash text not null,
response_status int,
response_body jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create index if not exists idempotency_keys_created_at_idx on idempotency_keys (created_at);
create or replace function set_updated_at()
returns trigger as $$
begin
new.updated_at = now();
return new;
end;
$$ language plpgsql;
create trigger trg_idem_updated
before update on idempotency_keys
for each row execute function set_updated_at();
# requirements: fastapi, uvicorn, psycopg[binary]
import hashlib
import json
from typing import Any, Dict
import psycopg
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse
app = FastAPI()
# Подключение к БД (pool)
pool = psycopg.ConnectionPool(
conninfo='postgresql://app:app@localhost:5432/app',
min_size=1,
max_size=10,
)
def sha256_hex(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
async def create_order(tx: psycopg.Connection, payload: Dict[str, Any]) -> Dict[str, Any]:
# Пример бизнес‑логики: создаём заказ и возвращаем его состояние
cur = tx.execute(
'insert into orders(customer_id, amount, status) values (%s, %s, %s) returning id, customer_id, amount, status',
(payload['customer_id'], payload['amount'], 'new')
)
row = cur.fetchone()
return {
'id': row[0],
'customer_id': row[1],
'amount': float(row[2]),
'status': row[3],
}
@app.post('/orders')
async def create_order_endpoint(request: Request, idempotency_key: str = Header(None, convert_underscores=False)):
if not idempotency_key:
raise HTTPException(status_code=400, detail='Отсутствует заголовок Idempotency-Key')
raw = await request.body()
body_hash = sha256_hex(raw)
payload = await request.json()
with pool.connection() as conn:
conn.execute('begin')
try:
# Пытаемся вставить ключ: если вставился — мы «владельцы» выполнения
inserted = conn.execute(
'insert into idempotency_keys(key, body_hash) values (%s, %s) on conflict do nothing',
(idempotency_key, body_hash)
)
if inserted.rowcount == 1:
# Новый ключ — выполняем бизнес‑операцию в той же транзакции
result = await create_order(conn, payload)
status = 201
# Кэшируем ответ
conn.execute(
'update idempotency_keys set response_status = %s, response_body = %s where key = %s',
(status, json.dumps(result), idempotency_key)
)
conn.execute('commit')
return JSONResponse(content=result, status_code=status)
# Ключ уже существует: ждём завершения первого запроса и проверяем хеш тела
row = conn.execute(
'select body_hash, response_status, response_body from idempotency_keys where key = %s for update',
(idempotency_key,)
).fetchone()
if not row:
conn.execute('rollback')
raise HTTPException(status_code=500, detail='Сбой хранения ключа идемпотентности')
stored_hash, resp_status, resp_body = row
if stored_hash != body_hash:
conn.execute('rollback')
raise HTTPException(status_code=409, detail='Idempotency-Key уже использован с другим телом запроса')
if resp_status is not None:
# Ответ уже готов — возвращаем кэш
conn.execute('commit')
return JSONResponse(content=json.loads(resp_body), status_code=resp_status)
# Ответ ещё не записан: редко, но возможно. Подождём чуть‑чуть и перечитаем.
# В бою лучше применить нотификации LISTEN/NOTIFY или короткий поллинг с таймаутом.
conn.execute('commit')
raise HTTPException(status_code=425, detail='Запрос обрабатывается, повторите чуть позже')
except Exception:
conn.execute('rollback')
raise
Пример вызова:
curl -X POST \
-H 'Content-Type: application/json' \
-H 'Idempotency-Key: 4f2b4a1a-7f0e-4d1b-9b2e-9a0fb02a1d77' \
-d '{"customer_id":123,"amount":1500}' \
http://localhost:8000/orders
Повтор того же запроса с тем же ключом вернёт ровно тот же ответ. Если тело другое — 409, чтобы не было «магии» и скрытых конфликтов данных.
Повторная доставка сообщения — норма для надёжных очередей. Нужно, чтобы задача исполнялась один раз логически, даже если код запускается несколько раз.
Есть два слоя защиты:
create table if not exists tasks (
id bigserial primary key,
task_key text not null, -- бизнес‑идентификатор для дедупликации (например, payment:123)
payload jsonb not null,
status text not null default 'pending', -- pending | running | done | failed
run_at timestamptz not null default now(),
attempts int not null default 0,
last_error text,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
unique (task_key)
);
create index if not exists tasks_ready_idx on tasks (status, run_at);
Исполнитель выбирает задачку без гонок через блокировку строк:
import time
import json
import psycopg
pool = psycopg.ConnectionPool(
conninfo='postgresql://app:app@localhost:5432/app',
min_size=1,
max_size=10,
)
def handle_task(tx: psycopg.Connection, task):
# Здесь вызывается бизнес‑операция, которая сама по себе идемпотентна
data = task['payload']
# ... делаем работу ...
return {'ok': True}
def worker_loop():
while True:
with pool.connection() as conn:
conn.execute('begin')
row = conn.execute(
"""
select id, task_key, payload, attempts
from tasks
where status = 'pending' and run_at <= now()
for update skip locked
limit 1
"""
).fetchone()
if not row:
conn.execute('commit')
time.sleep(0.5)
continue
task = {
'id': row[0],
'task_key': row[1],
'payload': row[2],
'attempts': row[3],
}
conn.execute('update tasks set status = %s where id = %s', ('running', task['id']))
conn.execute('commit')
# Выполняем вне транзакции, чтобы не держать блокировки
ok = True
err = None
try:
with pool.connection() as c2:
c2.execute('begin')
handle_task(c2, task)
c2.execute('commit')
except Exception as e:
ok = False
err = str(e)
with pool.connection() as conn3:
conn3.execute('begin')
if ok:
conn3.execute('update tasks set status = %s where id = %s', ('done', task['id']))
else:
delay_sec = min(60, 2 ** min(6, task['attempts']))
conn3.execute(
'update tasks set status = %s, attempts = attempts + 1, last_error = %s, run_at = now() + (%s || '' seconds'')::interval where id = %s',
('pending', err, str(delay_sec), task['id'])
)
conn3.execute('commit')
За счёт for update skip locked одновременно работающие процессы не возьмут одну и ту же запись. Уникальный task_key не даст создать дубликат семантической задачи.
Даже с ключами и блокировками у вас останутся повторные попытки. Чтобы деньги и учёт были в порядке, делаем эффекты идемпотентными на уровне БД.
Идея: перед выполнением эффекта фиксируем «факт применения» с уникальным бизнес‑идентификатором. Если такой факт уже есть — пропускаем эффект. Делать это нужно в той же транзакции, где меняются связанные данные.
-- Факты применённых эффектов (например, списаний)
create table if not exists applied_effects (
id bigserial primary key,
effect_key text not null unique, -- например, charge:payment_id:123
created_at timestamptz not null default now()
);
Использование в транзакции:
-- Псевдокод на SQL для атомарности
begin;
-- Пытаемся зафиксировать факт эффекта
insert into applied_effects(effect_key) values ('charge:payment:123')
on conflict do nothing;
-- Проверяем, действительно ли мы первые
with ins as (
select 1 from applied_effects where effect_key = 'charge:payment:123'
)
-- Выполняем эффект только если запись появилась сейчас
-- На практике можно смотреть количество затронутых строк при insert
update payments set charged = true where id = 123 and not charged;
commit;
В прикладном коде проще: делаете insert и анализируете rowcount. Если 0 — эффект уже применён, выходим без действий.
-- Чистим ключи старше 7 дней
delete from idempotency_keys where created_at < now() - interval '7 days';
Важно: удаление старых ключей не должно ломать активные ретраи. Поэтому выбирайте TTL больше возможного окна повторов клиента/интеграции.
Метрики и логи помогут быстро ловить неверное использование ключей и регрессии:
Алерты:
Спринт 1:
Спринт 2:
Если договориться, что повтор вернёт только итоговый ресурс (например, объект заказа по его идентификатору), то да. Но хранение готового ответа упрощает жизнь клиентам и снижает нагрузку на бэкенд.
Транзакция защищает от частично применённых изменений внутри одной попытки. Идемпотентность гарантирует одинаковый результат при повторах разных попыток во времени.
Можно хранить ключи в Redis с TTL для очень нагруженных систем. Но критичные эффекты всё равно защищайте уникальными ключами в постоянном хранилище (БД), чтобы переживать перезагрузки.
При повторном запросе лучше ждать готовый ответ с коротким таймаутом и затем вернуться к клиенту с 425 или 202. Для UX — показывайте «ожидаем завершения операции», а не спиннер без конца.
Идемпотентность — это не про красивый паттерн, а про спокойные деньги и предсказуемые процессы. Внедрив её в API и фоновые задачи, вы сокращаете инциденты и стоимость поддержки, а команда релизит смелее.