Свойства узла процесса интеграции вида «KafkaИсточник»

Узел этого вида является клиентом-потребителем, предназначенным для чтения входящих в сообщений из брокера Apache Kafka.

Узел может быть как связан с группой участников, так и использоваться самостоятельно. При использовании узла, связанного с группой участников, необходимо учитывать следующие особенности настройки:
  • для свойств ИмяПользователя, Пароль и Топик необходимо указать ссылку на соответствующие свойства участника;
  • при выборе протокола безопасности SaslSsl необходимо указать ссылку на свойства участника для свойств узла ПутьХранилищаДоверенныхСертификатов и ПарольХранилищаДоверенныхСертификатов;
  • для задания значения необязательного свойства ИдКлиента необходимо использовать ссылку на соответствующее свойство участника.

Свойства узла

ОбновлениеМетрик
Обработчик, внутри которого можно обновлять метрики, добавленные в проект разработчиком. Пример:
// Разработчик добавил в проект метрику «СчетчикВУзле». Тогда обновить метрику в узлах
// можно следующим образом:
метод ОбновлениеМетрик(Контекст: МойПроцессИнтеграции.КонтекстВызова, Сообщение: МойПроцессИнтеграции.Сообщение)
    пер СчетчикВУзле = МойПроцессИнтеграции.Метрики["СчетчикВУзле"]
    СчетчикВУзле.Обновить(Сообщение.УзлыПути.Текущий.Узел.Имя, 1)
;
Хост

Параметр Kafka: bootstrap.servers. Имя хоста bootstrap-сервера Kafka. Используется для первичного подключения и получения всех доступных серверов кластера.

Порт

Параметр Kafka: bootstrap.servers. Номер порта bootstrap-сервера Kafka для подключения. По умолчанию — 9092.

ПротоколБезопасности
Параметр Kafka: security.protocol . Протокол безопасности. Возможные значения:
  • SaslPlainText (по умолчанию)
  • SaslSsl
МеханизмSasl
Параметр Kafka: sasl.mechanism. Механизм SASL. Возможные значения:
  • Plain — значение по умолчанию (будет использоваться модуль org.apache.kafka.common.security.plain.PlainLoginModule)
  • ScramSha256 (будет использоваться модуль org.apache.kafka.common.security.plain.ScramLoginModule)
  • ScramSha512 (будет использоваться модуль org.apache.kafka.common.security.plain.ScramLoginModule)
ИмяПользователя

Параметр Kafka: sasl.jaas.config. Имя пользователя. Значение параметра username для модуля механизма SASL (задается в свойстве МеханизмSasl).

Пароль

Параметр Kafka: sasl.jaas.config. Пароль пользователя. Значение параметра password для модуля механизма SASL (задается в свойстве МеханизмSasl).

ПутьХранилищаДоверенныхСертификатов
Параметр Kafka: ssl.truststore.location. Полный путь к файлу-хранилищу доверенных сертификатов в формате jks. Примеры:
  • C:\\cert\\trustore.jks (Windows)
  • C:/cert/trustore.jks (Windows)
  • /home/user/cert/trustore.jks (Linux)

Обязательный для протокола безопасности SaslSsl.

ПарольХранилищаДоверенныхСертификатов

Параметр Kafka: ssl.truststore.password. Пароль для хранилища доверенных сертификатов. Обязательный для протокола безопасности SaslSsl.

ИдКлиента

Параметр Kafka: client.id. Логический идентификатор клиента (узла или участника группы участников), который подключается к серверу Kafka.

ТаймаутЗапроса

Параметр Kafka: request.timeout.ms. Таймаут запроса, миллисекунды. По умолчанию — 30000.

АвтоУстановкаСмещения
Параметр Kafka: auto.offset.reset. Режим установки смещения при изменениях в группе потребителей. Возможные значения:
  • Ранний (earliest) — при добавлении нового потребителя будет браться самое раннее значение офсета для чтения раздела.
  • Последний (latest) — при добавлении нового потребителя будет браться последнее подтвержденное значение офсета для чтения раздела. Используется по умолчанию.
  • Отсутствует (none) — при добавлении нового потребителя будет выброшена ошибка.
КоличествоПотребителей

Количество параллельных потребителей в группе. Группа потребителей — это механизм параллелизации чтения сообщений из топика. Если узел используется самостоятельно, то одна группа потребителей будет соответствовать одному узлу KafkaИсточник. Если узел связан с группой участников, то одна группа потребителей будет соответствовать одному участнику из группы.

Выбор значения зависит от количества разделов в топике, рекомендуется настраивать количество потребителей равным количеству разделов в топике. Максимальное значение — 5 потребителей. По умолчанию — 1.

ИнтервалАвтоФиксации

Параметр Kafka: auto.commit.interval.ms. Автофиксация — это режим автоматического подтверждения успешного получения сообщения, в результате которого происходит изменение смещения для потребителя в разделе топика. Данное свойство позволяет задать интервал в миллисекундах, с которым будет происходить автоматическая фиксация. По умолчанию — 5000.

ТаймаутСессии

Параметр Kafka: session.timeout.ms. Таймаут сессии, миллисекунды. По умолчанию — 10000.

Клиент периодически посылает брокеру сигналы о своей активности. Если до истечения таймаута сессии брокер не получит ни одного сигнала, то брокер удалит этого клиента из группы и инициирует ребалансировку.

МаксимальныйРазмерВыборки

Параметр Kafka: fetch.max.bytes. Максимальный размер данных в байтах, который сервер Kafka должен вернуть в ответ на запрос. Записи извлекаются потребителем партиями, и если первая партия записей в первом непустом разделе выборки больше этого значения, то эта партия записей все равно будет возвращена, чтобы потребитель мог продвигаться вперед. Таким образом, данное значение не является абсолютным максимумом. Данный параметр позволяет регулировать частоту запросов к серверу Kafka — чем меньше значение, тем большее количество запросов будет необходимо с увеличением количества данных. По умолчанию — 52428800.

МаксимальныйРазмерВыборкиИзРаздела

Параметр Kafka: max.partition.fetch.bytes. Максимальный размер данных в байтах, который ожидается в выборке из раздела. Запрос будет обработан даже в случае, если фактический размер ответа окажется больше указанного в свойстве значения. По умолчанию — 1048576.

ДесериализаторЗначения

Параметр Kafka: value.deserializer. Десериализатор значения сообщения. Возможные значения:

  • Байты — значение по умолчанию (org.apache.kafka.common.serialization.ByteArrayDeserializer)
  • Строка (org.apache.kafka.common.serialization.StringDeserializer с параметром кодировки UTF-8)
ДесериализаторКлюча

Параметр Kafka: key.deserializer. Десериализатор ключа сообщения. Возможные значения:

  • Байты — значение по умолчанию (org.apache.kafka.common.serialization.ByteArrayDeserializer)
  • Строка (org.apache.kafka.common.serialization.StringDeserializer с параметром кодировки UTF-8)

Пример использования

Контекст: Осуществляется отправка сообщения из Apache Kafka в систему на платформе «1С».

Задача: Забрать сообщение из топика Apache Kafka и передать его дальше по схеме процесса интеграции.

Решение: Используем узел «KafkaИсточник», чтобы подключиться к топику Kafka. Когда в топике появляется новое сообщение, считывает его и передает дальше по схеме в узел Канал1СНазначение («ВОфис»). Таким образом, сообщение сохраняется в очередь, которая создается в . Из этой очереди сообщение доставляется получателю, который входит в группу участников «Офис».