Перейти к основному содержимому

Взаимодействие с Apache Kafka

В «1С:Шине» для взаимодействия с брокером сообщений Apache Kafka предназначены узлы процесса интеграции вида KafkaИсточник и KafkaНазначение. Эти узлы позволяют настроить асинхронную интеграцию с Kafka. Внешняя информационная система может подключиться к топикам брокера и отправлять в них сообщения либо забирать сообщения из указанных топиков.

Идентификатор группы потребителей (consumer group id)

Для объединения потребителей в группы используется общий идентификатор. В рамках группы один потребитель подключается к одному разделу топика. Для каждого раздела брокер хранит последнее смещение (offset), при повторном подключении потребитель группы продолжает чтение сообщений с этого смещения. Если подключается новый потребитель (например, при увеличении значения параллельных потребителей в узле), то чтение сообщений начинается в зависимости от значения параметра АвтоУстановкаСмещения (с самого раннего или позднего значения смещения).

Одна группа потребителей соответствует одному узлу KafkaИсточник. Если узел связан с группой участников, то одна группа потребителей соответствует одному участнику из группы.

Значение идентификатора группы для узла KafkaИсточник генерируется автоматически. Значение идентификатора не является человекочитаемым, поэтому для удобства анализа логов рекомендуется заполнять идентификатор клиента (свойство ИдКлиента).

Использование заголовков

Сериализация и десериализация значений заголовков сообщений

В Kafka значения заголовков сообщения передаются как массив байтов, для работы с которым в «1С:Шине» предусмотрены операции явной и неявной сериализации значений различных типов.

Для значений заголовков входящих сообщений в узле KafkaИсточник выполняется десериализация массива байтов по следующим правилам (массив байтов приводится к ожидаемому типу):

  • значение заголовка RecipientCodeтип встроенного языка Строка;
  • значения остальных заголовковтип встроенного языка Байты (конвертация не выполняется, передается как исходный массив байтов).

После получения сообщения обработка заголовков возможна в узлах, поддерживающих обработчики встроенного языка, например, в узле Транслятор, где возможно выполнить чтение или запись заголовка.

  • Чтение значения заголовков с типом Байты и его приведение к типу встроенного языка Байты возможно с помощью методов стандартного типа Стд::ВводВывод::ЧтениеДанных.
  • Запись значений заголовков возможна с использованием следующих типов: Булево, Байты, Длительность, Момент, Число, Строка, Неопределено, ЧитаемыйМассив, ЧитаемоеСоответствие.

Для исходящих сообщений в узле KafkaНазначение выполняется сериализация значения заголовка в массив байтов.

Исходный тип встроенного языка

Правило конвертации

Булево

True (1) или False (0) как массив байтов

Байты

Массив байтов

Длительность

Длительность в миллисекундах как массив байтов

Момент, ДатаВремя и другие типы из пространства имен Стд::Время

Строковое представление даты-времени в формате ISO и кодировке UTF-8 как массив байтов

Строка

Строка в кодировке UTF-8 как массив байтов

Число

  • Обработчик встроенного языка — строковое представление числа в кодировке UTF-8 как массив байтов
  • Узлы-источники — число как массив байтов, в зависимости от размера типа числа

Неопределено

Не поддерживается, заголовок не передается в Kafka

Массив

Строковое представление массива в формате JSON и кодировке UTF-8 как массив байтов

Соответствие

Строковое представление соответствия в формате JSON и кодировке UTF-8 как массив байтов

Служебные заголовки Apache Kafka

Для получения заголовков Kafka в обработчике встроенного языка нужно использовать следующие имена параметров сообщения:

Имя параметра для доступа из встроенного языкаТип значения заголовкаОписаниеИмя свойства сообщения процесса интеграции
Kafka.TopicСтрокаИмя топика, из которого получено сообщениеТопикKafka
Kafka.KeyСтрока или БайтыКлюч входящего сообщенияКлючKafka
Kafka.OffsetЧислоНомер смещения в разделе топикаСмещениеKafka
Kafka.PartitionЧислоРаздел, из которого было получено сообщениеРазделKafka
Kafka.TimestampЧислоОтметка времени отправки входящего сообщения в формате UnixОтметкаВремениОтправкиKafka