crossSvg

CDC и логическая репликация для баз данных, реализованных на стеке open source-решений

Публикации в СМИ
16.11.2022

Источник: Хабр: "CDC и логическая репликация для баз данных, реализованных на стеке open source-решений"

Привет, Хабр! На связи СберТех — мы создаём Platform V, цифровую платформу Сбера для разработки бизнес-приложений.

В платформу входит более 60 продуктов на базе собственных сборок open source, доработанных до уровня enterprise по функциональности, безопасности, производительности и отказоустойчивости. 

В этой статье расскажем про реализацию паттерна Change Data Capture и межкластерной репликации данных в продукте Platform V DataGrid, распределённой in-memory базе данных для высокопроизводительных вычислений. А также об особенностях внедрения функции и вариантах репликации. Написать материал помог наш коллега Николай Ижиков из команды по развитию баз данных на стеке open source.

4e6469f3ce96dc8aeea45c1369897040.jpg

Что такое Change Data Capture

Представим, что у вас есть база данных с критичными для бизнеса данными и жёсткими SLA по чтению и записи.  

В то же время есть компоненты системы, которым необходимо реагировать на изменения в данных, например на уведомления о поступлении нового заказа, изменение данных пользователя и т. д. Таких компонентов много, и со временем их количество только растёт. 

Если база уже нагруженная, навешивать на микросервис, создающий заказ, синхронную обработку или писать хранимую процедуру нерационально — увеличим тайминги по ответам, вероятен риск нарушения SLA.

И тут на помощь приходит паттерн Change Data Capture или CDC.

6c08bc0b10fe58dfe81618548b6200f5.png

Когда в базу данных вносятся изменения, для ускорения записи и оптимизации операций она пишет журнал изменений (UNDO, REDO логи). В него база данных последовательно записывает дельту, которую вы изменяете. 

CDC — это приложение, которое умеет обрабатывать логи изменений, выделять из них события об изменении данных и уведомлять об этом потребителя изменений, реализующего бизнес-логику.

В результате получаем линейные, упорядоченные по времени события изменений данных: что было, что есть, какая операция, над какой таблицей или кэшем была выполнена. Обработка асинхронная, не триггер и не хранимая процедура. Данные закоммитились, приложение, которое их отправляет, продолжило работать дальше, а потребитель через какое-то время получает уведомление о произошедшем изменении. 

Как и когда использовать CDC

Стриминг изменений в хранилище данных. У вас есть DWH. Обычно в режиме реального времени данные в неё поступать не должны. Для перекладывания данных можно написать процедуры, которые будут раз в час или в сутки определять дельту и отправлять её в хранилище. С помощью CDC эти же данные можно перекладывать с меньшими задержками. 

  • Постобработка событий. В системе произошло событие — пользователь зарегистрировался, создал заказ, загрузил новый файл, — и, согласно бизнес-процессу, по новой записи нужно инициировать модерацию или другие действия. 
  • Аналитика. По поступающим в CDC событиям можно считать аналитику в режиме, близком к реальному времени.
  • Логическая репликация. В CDC у нас на руках есть ВСЕ изменения, которые происходят в базе. Для реализации репликации нужно всего лишь надёжно исполнить их на реплике.

CDC в open source database

Дизайн

При любой доработке сложной системы, к которой, очевидно, относится распределённая СУБД, всегда есть риск что-то сломать. Лучший выход — делать новую фичу, вообще не трогая существующие.

Поэтому, проектируя CDC на базе Ignite, команда решила, что ignite-cdc должен выступать как отдельный java-процесс, не влияющий на ноду Ignite.

Ignite в persistence-режиме, как и любая классическая СУБД, записывает изменения в WAL (Write-Ahead Log). WAL — бинарный файл, содержащий изменения, дельты, которые мы периодически пишем в основную память (page memory).

Время от времени WAL-сегмент переходит в архив. Ignite-cdc видит, что появился архивный WAL-сегмент, и обрабатывает его.

Обработка — уведомление потребителя об изменениях. Есть public API для потребителя, но можно написать и свой.  

Важно, что при этом нет перерасхода места на диске: WAL-архив — это существующая функциональность, которая нужна для восстановления после сбоев. Ignite-cdc обрабатывает ровно те же сегменты, никаких новых данных на диске не появляется.

Следующий важный момент — возможность сохранять состояние чтения. Ignite-cdc — отдельный процесс, который может падать. Нужно реализовать возможность фиксации состояния потребителя каждый раз, когда он решил, что данные обработаны и можно сохраниться. При падении обработка будет продолжена с места последнего commit-а. К счастью, поддерживать это довольно просто: нужно всего лишь сохранять указатель на том месте в сегменте, на котором чтение остановилось.

Из возможности сохранять состояние следует возможность сделать fail-fast-приложение. При любых проблемах Ignite-cdc падает. Предполагается, что поднимать его будут с помощью ОС-механизмов.

На уровне ноды всё выглядит вот так:

b05e8735058f21c8050631a4afa1a271.png

Есть небольшая тонкость: WAL-архив не бесконечный, Ignite складывает в архив столько сегментов, сколько было указано в настройках. При архивации n+1 сегмента самый старый удаляется.

Чтобы избежать ситуаций, когда CDC затормозил и не обработал уже удалённый сегмент, архивный сегмент hard-link’ом переносится в папку, с которой работает только Ignite-cdc. 

Если удалим данные из архива, файл останется в папке для СDC, и данные будут доступны. 

Если Ignite-cdc обработал сегмент, его можно будет сразу же удалить. Данные исчезнут с диска, когда оба hard-link’а будут удалены. 

Приложению понадобятся метрики. API уже есть в Ignite, и его нужно переиспользовать.

API и настройки

Для настройки CDC есть три параметра, которые нужно настроить на уровне ноды.

public class DataStorageConfiguration { long walForceArchiveTimeout; String cdcWalPath; } public class DataRegionConfiguration implements Serializable { boolean cdcEnabled; }

Здесь:

  • cdcWalPath — путь к папке, где складываются WAL-сегменты для CDC;
  • cdcEnabled — включает CDC для DataRegion’а;
  • walForceArchiveTimeout — таймаут принудительной архивации сегмента: даже если сегмент заполнен не полностью, по таймауту он будет архивирован и станет доступным для CDC.

С walForceArchiveTimeout есть тонкость. WAL-архив работает быстро за счёт того, что он является memory-mapped file. Это позволяет фактически писать не на диск, а в память для того, чтобы операционная система сбросила файл или мы могли сделать это вручную, когда сегмент будет заполнен.

Запись на диск — дорогая операция, в момент которой производительность ноды снижается, поэтому, с одной стороны, запись нужно делать как можно реже. С другой — CDC узнаёт об изменениях после архивации сегмента, поэтому запись нужно делать как можно чаще. Противоречие :)

Решить его можно, выбирая таймаут согласно требованиям приложения. 

Теперь самое интересное — сonsumer, слушатель, который позволяет узнать и обработать изменения:

public interface CdcConsumer { public void start(MetricRegistry mreg); public boolean onEvents(Iterator<CdcEvent> events); public void onTypes(Iterator<BinaryType> types); public void onMappings(Iterator<TypeMapping> mappings); public void stop(); }
  • start, stop — для инициализации и остановки;
  • onEvents — callback для обработки изменений: вернули true — состояние коммитнулось;
  • onTypes, onMappings — callback’и для обработки изменений метаинформации о типах.  

Что доступно в событии:

public interface CdcEvent extends Serializable { public Object key(); @Nullable public Object value(); public boolean primary(); public int partition(); public CacheEntryVersion version(); public int cacheId(); }
  • key, value — данные: value может быть null, если событие по remove’у;
  • primary — событие произошло на primary или backup;
  • partition — номер партиции, необходим для распределения нагрузки в соответствии с существующими в Ignite партициями;
  • version — версия entry;
  • cacheId — идентификатор кэша.

Таким образом, у нас есть приложение, которое в асинхронном виде получает уведомления обо всех изменениях всех данных внутри кластера Ignite. Теперь на основе этой функциональности мы можем реализовать как необходимые бизнес-функции, так и логическую репликацию.

Логическая репликация с помощью CDC

Под физической репликацией в данной статье я понимаю перенос между экземплярами БД внутреннего представления данных: страниц памяти и т. д. 

Под логической — выделение потока изменений из базы-источника и его воспроизведение в базе-приёмнике.

CDC позволяет реализовать именно логическую репликацию.

В Ignite есть поддержка двух схем: Ignite to Ignite и Ignite to Kafka.

Ignite to Ignite

ee8f89f360fecca9017ef046634b7baa.png

Внутри Ignite-cdc работает IgniteToIgniteCdcStreamer, кстати, доступный из коробки. Это consumer, который внутри себя поднимает клиентскую ноду Ignite, коннектится к кластеру-приёмнику и, получая изменения, отправляет почти обычную операцию put в кластер-приёмник.

Если кластер-источник недоступен, например из-за упавшей ноды, Ignite-cdc будет вечно ждать, пока нода не запустится. Новые данные не поступят, и процесс обработает те, которые были сгенерированы ещё живой нодой.

Если упал Ignite-cdc, то, во-первых, на всех остальных нодах он будет жив. Во-вторых, через некоторое время операционная система его перезапустит, CDC посмотрит, какие изменения он обработал, и продолжит отправлять их в соседний кластер.

Если потерялся соседний кластер или сетевая связность, Ignite-cdc также упадёт, а после перезапуска снова пойдёт в кластер-приёмник. Если кластер недоступен — падение. Если доступен — отлично, CDC начнёт отправлять в него изменения, которые были накоплены в WAL на диске. Диск является буфером изменений, которые будут копиться до тех пор, пока не получится их обработать и отправить в нужную точку.

Ignite to Kafka

3450ced372e1ac806a5a6e3f9ea93347.png

Это вариант репликации для ситуаций, когда кластеры Ignite не видят друг друга, нужно использовать Kafka в качестве транспорта, или если есть несколько читателей событий. 

Схема практически такая же: для обработки событий используется стример IgniteToKafkaCdcStreamer. Он раскладывает данные по партициям Kafka в соответствии с партициями Ignite.

На стороне приёмника есть приложение kafka-to-ignite — оно читает данные из Kafka и кладёт их в принимающий кластер Ignite.

Conflict resolver

Подошли к самому интересному: что произойдёт, если один ключ будет изменён на обоих кластерах? 

Ответ — сработает conflict resolver. Это интерфейс, который определяет, какие именно данные должны попасть в кластер. Он может взять «старое», «новое» значение или выполнить merge.

СDC-extension предоставляет дефолтную имплементацию, но можно реализовать и свою. При этом стоит отметить, что правильного решения при конфликтах изменений не существует. Не зная ничего о данных, невозможно точно определить, какое изменение правильное, а какое нет. 

Ключевые свойства дефолтной имплементации:

  1. Если изменение произошло на «локальном» кластере, оно выигрывает.
  2. Изменения с одного и того же кластера сравниваются по версии. Изменение с большей версией выигрывает. 
  3. Если указано поле для сравнения, записи сравниваются по нему. 
  4. Если всё предыдущее не сработало, новая запись отбрасывается. Данные разъезжаются, в логах warning, а вам нужно думать, что делать дальше.

Заключение

Внедрение паттерна CDC позволило добавить востребованную функциональность для реализации событийных подписок и создания реплик без влияния на производительность ядра самой базы данных.