Взаимодействие с Apache Kafka
В «1С:Шине» для взаимодействия с брокером сообщений Apache Kafka предназначены узлы процесса интеграции вида KafkaИсточник и KafkaНазначение. Эти узлы позволяют настроить асинхронную интеграцию с Kafka. Внешняя информационная система может подключиться к топикам брокера и отправлять в них сообщения либо забирать сообщения из указанных топиков.
Идентификатор группы потребителей (consumer group id)
Дл я объединения потребителей в группы используется общий идентификатор. В рамках группы один потребитель подключается к одному разделу топика. Для каждого раздела брокер хранит последнее смещение (offset), при повторном подключении потребитель группы продолжает чтение сообщений с этого смещения. Если подключается новый потребитель (например, при увеличении значения параллельных потребителей в узле), то чтение сообщений начинается в зависимости от значения параметра АвтоУстановкаСмещения (с самого раннего или позднего значения смещения).
Одна группа потребителей соответствует одному узлу KafkaИсточник. Если узел связан с группой участников, то одна группа потребителей соответствует одному участнику из группы.
Значение идентификатора группы для узла KafkaИсточник генерируется автоматически. Значение идентификатора не является человекочитаемым, поэтому для удобства анализа логов рекомендуется заполнять идентификатор клиента (свойство ИдКлиента).
Использование заг оловков
Сериализация и десериализация значений заголовков сообщений
В Kafka значения заголовков сообщения передаются как массив байтов, для работы с которым в «1С:Шине» предусмотрены операции явной и неявной сериализации значений различных типов.
Для значений заголовков входящих сообщений в узле KafkaИсточник выполняется десериализация массива байтов по следующим правилам (массив байтов приводится к ожидаемому типу):
- значение заголовка RecipientCode ⟶ тип встроенного языка Строка;
- значения осталь ных заголовков ⟶ тип встроенного языка Байты (конвертация не выполняется, передается как исходный массив байтов).
После получения сообщения обработка заголовков возможна в узлах, поддерживающих обработчики встроенного языка, например, в узле Транслятор, где возможно выполнить чтение или запись заголовка.
- Чтение значения заголовков с типом
Байты
и его приведение к типу встроенного языкаБайты
возможно с помощью методов стандартного типаСтд::ВводВывод::ЧтениеДанных
. - Запись значений заголовков возможна с использованием следующих типов:
Булево
,Байты
,Длительность
,Момент
,Число
,Строка
,Неопределено
,ЧитаемыйМассив
,ЧитаемоеСоответствие
.
Для исходящих сообщений в узле KafkaНазначение выполняется сериализация значения заголовка в массив байтов.
Исходный тип встроенного языка | Правило конвертации |
---|---|
| True (1) или False (0) как массив байтов |
| Массив байтов |
| Длительность в миллисекундах как массив байтов |
| Строковое представление даты-времени в формате ISO и кодировке UTF-8 как массив байтов |
|
|
|
|
| Не поддерживается, заголовок не передается в Kafka |
| Строковое представление массива в формате JSON и кодировке UTF-8 как массив байтов |
| Строковое представление соответствия в формате JSON и кодировке UTF-8 как массив байтов |
Служебные заголовки Apache Kafka
Для получения заголовков Kafka в обработчике встроенного языка нужно использовать следующие имена параметров сообщения:
Имя параметра для доступа из встроенного языка | Тип значения заголовка | Описание | Имя свойства сообщения процесса интеграции |
---|---|---|---|
Kafka.Topic | Строка | Имя топика, из которого получено сообщение | ТопикKafka |
Kafka.Key | Строка и ли Байты | Ключ входящего сообщения | КлючKafka |
Kafka.Offset | Число | Номер смещения в разделе топика | СмещениеKafka |
Kafka.Partition | Число | Раздел, из которого было получено сообщение | РазделKafka |
Kafka.Timestamp | Число | Отметка времени отправки входящего сообщения в формате Unix | ОтметкаВремениОтправкиKafka |