Взаимодействие с Apache Kafka

В для взаимодействия с брокером сообщений Apache Kafka предназначены узлы процесса интеграции вида KafkaИсточник и KafkaНазначение. Эти узлы позволяют настроить асинхронную интеграцию с Kafka. Внешняя информационная система может подключиться к топикам брокера и отправлять в них сообщения либо забирать сообщения из указанных топиков.

Индентификатор группы потребителей (consumer group id)

Для объединения потребителей в группы используется общий идентификатор. В рамках группы один потребитель подключается к одному разделу топика. Для каждого раздела брокер хранит последнее смещение (offset), при повторном подключении потребитель группы продолжает чтение сообщений с этого смещения. Если подключается новый потребитель (например, при увеличении значения параллельных потребителей в узле), то чтение сообщений начинается в зависимости от значения параметра АвтоУстановкаСмещения (с самого раннего или позднего значения смещения).

Одна группа потребителей соответствует одному узлу KafkaИсточник. Если узел связан с группой участников, то одна группа потребителей соответствует одному участнику из группы.

Значение идентификатора группы для узла KafkaИсточник генерируется автоматически. Значение идентификатора не является человекочитаемым, поэтому для удобства анализа логов рекомендуется заполнять идентификатор клиента (свойство ИдКлиента).

Использование заголовков

Сериализация и десериализация значений заголовков сообщений

В Kafka значения заголовков сообщения передаются как массив байтов, для работы с которым в предусмотрены операции явной и неявной сериализации значений различных типов.

Для значений заголовков входящих сообщений в узле KafkaИсточник выполняется десериализация массива байтов по следующим правилам (массив байтов приводится к ожидаемому типу):
  • Значение заголовка RecipientCode > тип встроенного языка Строка
  • Значения остальных заголовков > тип встроенного языка Байты (конвертация не выполняется, передается как исходный массив байтов).

После получения сообщения обработка заголовков возможна в узлах, поддерживающих обработчики встроенного языка, например, в узле Транслятор, где возможно выполнить чтение или запись заголовка.

  • Чтение значения заголовков с типом Байты и его приведение к типу встроенного языка Байты возможно с помощью методов стандартного типа Стд::ВводВывод::ЧтениеДанных.
  • Запись значений заголовков возможна с использованием следующих типов: Булево, Байты, Длительность, Момент, Число, Строка, Неопределено, ЧитаемыйМассив, ЧитаемоеСоответствие.

Для исходящих сообщений в узле KafkaНазначение выполняется сериализация значения заголовка в массив байтов.

Исходный тип встроенного языка Правило конвертации
Булево True (1) или False (0) как массив байтов
Байты Массив байтов
Длительность Длительность в миллисекундах как массив байтов
Момент, ДатаВремя и другие типы из пространства имен Стд::Время Строковое представление даты-времени в формате ISO и кодировке UTF-8 как массив байтов
Строка Строка в кодировке UTF-8 как массив байтов
Число
  • Обработчик встроенного языка — строковое представление числа в кодировке UTF-8 как массив байтов
  • Узлы-источники — число как массив байтов, в зависимости от размера типа числа
Неопределено Не поддерживается, заголовок не передается в Kafka
Массив Строковое представление массива в формате JSON и кодировке UTF-8 как массив байтов
Соответствие Строковое представление соответствия в формате JSON и кодировке UTF-8 как массив байтов
Служебные заголовки Apache Kafka

Для получения заголовков Kafka в обработчике встроенного языка нужно использовать следующие имена параметров сообщения:

Имя параметра для доступа из встроенного языка Тип значения заголовка Описание Имя свойства сообщения процесса интеграции
Kafka.Topic Строка Имя топика, из которого получено сообщение ТопикKafka
Kafka.Key Строка или Байты Ключ входящего сообщения КлючKafka
Kafka.Offset Число Номер смещения в разделе топика СмещениеKafka
Kafka.Partition Число Раздел, из которого было получено сообщение РазделKafka
Kafka.Timestamp Число Отметка времени отправки входящего сообщения в формате Unix ОтметкаВремениОтправкиKafka