Взаимодействие Apache Kafka с
В для взаимодействия с брокером сообщений Apache Kafka предназначены узлы процесса интеграции вида KafkaИсточник и KafkaНазначение. Эти узлы позволяют настроить асинхронную интеграцию с Kafka. Внешняя информационная система может подключиться к топикам брокера и отправлять в них сообщения либо забирать сообщения из указанных топиков.
Индентификатор группы потребителей (consumer group id)
Для объединения потребителей в группы используется общий идентификатор. В рамках группы один потребитель подключается к одному разделу топика. Для каждого раздела брокер хранит последнее смещение (offset), при повторном подключении потребитель группы продолжает чтение сообщений с этого смещения. Если подключается новый потребитель (например, при увеличении значения параллельных потребителей в узле), то чтение сообщений начинается в зависимости от значения параметра АвтоУстановкаСмещения (с самого раннего или позднего значения смещения).
Одна группа потребителей соответствует одному узлу KafkaИсточник. Если узел связан с группой участников, то одна группа потребителей соответствует одному участнику из группы.
Значение идентификатора группы для узла KafkaИсточник генерируется автоматически. Значение идентификатора не является человекочитаемым, поэтому для удобства анализа логов рекомендуется заполнять идентификатор клиента (свойство ИдКлиента).
Использование заголовков
Сериализация и десериализация значений заголовков сообщенийВ Kafka значения заголовков сообщения передаются как массив байтов, для работы с которым в предусмотрены операции явной и неявной сериализации значений различных типов.
- (конвертация не выполняется, передается как исходный массив байтов).
После получения сообщения обработка заголовков возможна в узлах, поддерживающих обработчики встроенного языка, например, в узле Транслятор, где возможно выполнить чтение или запись заголовка.
- Чтение значения заголовков с типом Байты и его приведение к типу встроенного языка Байты возможно с помощью методов стандартного типа Стд::ВводВывод::ЧтениеДанных.
- Запись значений заголовков возможна с использованием следующих типов: Булево, Байты, Длительность, Момент, Число, Строка, Неопределено, ЧитаемыйМассив, ЧитаемоеСоответствие.
Для исходящих сообщений в узле KafkaНазначение выполняется сериализация значения заголовка в массив байтов.
Исходный тип встроенного языка | Правило конвертации |
---|---|
Булево | True (1) или False (0) как массив байтов |
Байты | Массив байтов |
Длительность | Длительность в миллисекундах как массив байтов |
Момент, ДатаВремя и другие типы из пространства имен Стд::Время | Строковое представление даты-времени в формате ISO и кодировке UTF-8 как массив байтов |
Строка | Строка в кодировке UTF-8 как массив байтов |
Число |
|
Неопределено | Не поддерживается, заголовок не передается в Kafka |
Массив | Строковое представление массива в формате JSON и кодировке UTF-8 как массив байтов |
Соответствие | Строковое представление соответствия в формате JSON и кодировке UTF-8 как массив байтов |
Для получения заголовков Kafka в обработчике встроенного языка нужно использовать следующие имена параметров сообщения:
Имя параметра для доступа из встроенного языка | Тип значения заголовка | Описание | Имя свойства сообщения процесса интеграции |
---|---|---|---|
Kafka.Topic |
Строка | Имя топика, из которого получено сообщение | ТопикKafka |
Kafka.Key |
Строка или Байты | Ключ входящего сообщения | КлючKafka |
Kafka.Offset |
Число | Номер смещения в разделе топика | СмещениеKafka |
Kafka.Partition |
Число | Раздел, из которого было получено сообщение | РазделKafka |
Kafka.Timestamp |
Число | Отметка времени отправки входящего сообщения в формате Unix | ОтметкаВремениОтправкиKafka |