Свойства узла процесса интеграции вида «KafkaНазначение»

Узел этого вида является клиентом-поставщиком, предназначенным для записи исходящих из сообщений в брокер Apache Kafka.

Узел может быть как связан с группой участников, так и использоваться самостоятельно. При использовании узла, связанного с группой участников, необходимо учитывать следующие особенности настройки:
  • для свойств ИмяПользователя, Пароль и Топик необходимо указать ссылку на соответствующие свойства участника;
  • при выборе протокола безопасности SaslSsl необходимо указать ссылку на свойства участника для свойств узла ПутьХранилищаДоверенныхСертификатов и ПарольХранилищаДоверенныхСертификатов;
  • для задания значения необязательного свойства ИдКлиента необходимо использовать ссылку на соответствующее свойство участника.

Свойства узла

ОбновлениеМетрик
Обработчик, внутри которого можно обновлять метрики, добавленные в проект разработчиком. Пример:
// Разработчик добавил в проект метрику «СчетчикВУзле». Тогда обновить метрику в узлах
// можно следующим образом:
метод ОбновлениеМетрик(Контекст: МойПроцессИнтеграции.КонтекстВызова, Сообщение: МойПроцессИнтеграции.Сообщение)
    пер СчетчикВУзле = МойПроцессИнтеграции.Метрики["СчетчикВУзле"]
    СчетчикВУзле.Обновить(Сообщение.УзлыПути.Текущий.Узел.Имя, 1)
;
Длительность хранения доставленных сообщений в формате ДД.ЧЧ

Определяет время, в течение которого будут храниться сообщения, доставленные в данный узел. Задается в пользовательском интерфейсе приложения, в карточке процесса интеграции, в свойствах узла. Недоступно в среде разработки.

Длительность задается в формате ДД:ЧЧ (например, 10:02 — 10 дней и 2 часа). Если указать длительность равную 00:00, то доставленные сообщения для данного узла не будут сохраняться в . Длительность хранения должна быть равна 00:00 (по умолчанию) либо больше или равна 00:01.

Хост

Параметр Kafka: bootstrap.servers. Имя хоста bootstrap-сервера Kafka. Используется для первичного подключения и получения всех доступных серверов кластера.

Порт

Параметр Kafka: bootstrap.servers. Номер порта bootstrap-сервера Kafka для подключения. По умолчанию — 9092.

ПротоколБезопасности
Параметр Kafka: security.protocol . Протокол безопасности. Возможные значения:
  • SaslPlainText (по умолчанию)
  • SaslSsl
МеханизмSasl
Параметр Kafka: sasl.mechanism. Механизм SASL. Возможные значения:
  • Plain — значение по умолчанию (будет использоваться модуль org.apache.kafka.common.security.plain.PlainLoginModule)
  • ScramSha256 (будет использоваться модуль org.apache.kafka.common.security.plain.ScramLoginModule)
  • ScramSha512 (будет использоваться модуль org.apache.kafka.common.security.plain.ScramLoginModule)
ИмяПользователя

Параметр Kafka: sasl.jaas.config. Имя пользователя. Значение параметра username для модуля механизма SASL (задается в свойстве МеханизмSasl).

Пароль

Параметр Kafka: sasl.jaas.config. Пароль пользователя. Значение параметра password для модуля механизма SASL (задается в свойстве МеханизмSasl).

ПутьХранилищаДоверенныхСертификатов
Параметр Kafka: ssl.truststore.location. Полный путь к файлу-хранилищу доверенных сертификатов в формате jks. Примеры:
  • C:\\cert\\trustore.jks (Windows)
  • C:/cert/trustore.jks (Windows)
  • /home/user/cert/trustore.jks (Linux)

Обязательный для протокола безопасности SaslSsl.

ПарольХранилищаДоверенныхСертификатов

Параметр Kafka: ssl.truststore.password. Пароль для хранилища доверенных сертификатов. Обязательный для протокола безопасности SaslSsl.

ИдКлиента

Параметр Kafka: client.id. Логический идентификатор клиента (узла или участника группы участников), который подключается к серверу Kafka.

ТаймаутЗапроса

Параметр Kafka: request.timeout.ms. Таймаут запроса, миллисекунды. По умолчанию — 30000.

Топик

Имя топика, в который выполняется передача сообщения. Обязательно для заполнения только в том случае, если не указано имя обработчика ВыборТопика. Если задан обработчик ВыборТопика, значение свойства можно оставить пустым. Если при выполнении обработчика ВыборТопика произойдет ошибка, то значение будет вычислено из свойства Топик. Если оно окажется пустым, то будет выброшено исключение обработчика и сообщение не будет доставлено.

РежимКвитирования

Параметр Kafka: acks. Режим подтверждения брокером. Возможные значения:

  • Все — сообщение считается доставленным, если все сервера в кластере отправили подтверждение о доставке. Используется по умолчанию.
  • Лидер — сообщение считается доставленным, если брокер-лидер отправил подтверждение о доставке.
  • Отсутствует не дожидается подтверждения о доставке от сервера Kafka. Сообщение считается доставленным сразу после отправки из узла.
РазмерБуфераПамяти

Параметр Kafka: buffer.memory. Размер буфера памяти, который использует узел-назначения при накоплении сообщений для отправки на сервер. Не должен быть меньше максимального размера отправляемого сообщения. По умолчанию — 33554432.

МаксимальныйРазмерЗапроса

Параметр Kafka: max.request.size. Максимальный размер запроса. По умолчанию — 1048576.

МаксимальноеВремяБлокировки

Параметр Kafka: max.block.ms. Максимальное время блокировки при инициализации запроса, миллисекунды. По умолчанию — 60000.

РазмерПакета

Параметр Kafka: batch.size. Размер пакета сообщений в запросе, байты.

Узел (выступающий в роли производителя) будет пытаться объединять записи в меньшее количество запросов, если несколько записей отправляются в один раздел. Это повышает производительность как на клиенте, так и на сервере. Данная конфигурация управляет размером пакета по умолчанию в байтах. Попытки объединить в пакет записи, превышающие этот размер, предприниматься не будут.

Запросы, отправляемые брокерам, будут содержать несколько пакетов, по одному на каждый раздел с доступными для отправки данными. При малом размере пакета пакетная обработка может снизить пропускную способность (нулевой размер пакета полностью отключает пакетную обработку). При очень большом размере пакета память может расходоваться более расточительно, так как в ожидании дополнительных записей всегда будет выделяться буфер заданного размера.

Эта настройка дает верхнюю границу размера отправляемой партии. Если для данного раздела накоплено меньшее количество байт, то в ожидании появления новых записей будет происходить задержка на время, указанное в свойстве ЗадержкаСборкиПакета. По умолчанию значение свойства ЗадержкаСборкиПакета равно 0. Это означает, что записи будут отправляться немедленно, даже если накопленный размер пакета меньше значения, указанного в свойстве РазмерПакета.

ЗадержкаСборкиПакета
Параметр Kafka: linger.ms. Время ожидания при сборке пакета сообщений, миллисекунды. По умолчанию — 0.
РежимСжатия
Параметр Kafka: compression.type. Алгоритм сжатия сообщения. Возможные значения:
  • Отсутствует (по умолчанию)
  • Gzip
  • Snappy
  • Lz4
  • Zstd
СериализаторЗначения
Параметр Kafka: value.serializer. Сериализатор значения сообщения:
  • Байты — значение по умолчанию (org.apache.kafka.common.serialization.ByteArraySerializer)
  • Строка (org.apache.kafka.common.serialization.StringSerializer с параметром кодировки UTF-8)
СериализаторКлюча

Параметр Kafka: key.serializer. Сериализатор ключа сообщения:

  • Байты — значение по умолчанию (org.apache.kafka.common.serialization.ByteArraySerializer)
  • Строка (org.apache.kafka.common.serialization.StringSerializer с параметром кодировки UTF-8)
ВыборТопика
Обработчик для выбора значения топика, в который передается сообщение. Заменяет значение из свойства Топик. Если при выполнении обработчика ВыборТопика произойдет ошибка вычисления значения, то будет выполнена попытка вычислить значение из свойства Топик. Если это значение будет так же пустым, то будет выброшено исключение обработчика.
ВыборРаздела
Обработчик для выбора значения раздела топика, в который передается сообщение.
ВыборКлюча
Обработчик для выбора значения ключа сообщения. Сообщения с одинаковым ключом передаются в одинаковый раздел топика.
ВыборМоментаОтправки
Обработчик для выбора значения момента отправки сообщения из шины в брокер. Заменяет момент отправки сообщения по умолчанию (системное время ).

Пример использования

Контекст: Осуществляется отправка сообщения из системы «1С» в Apache Kafka.

Задача: Отправить сообщение из в брокер сообщений Apache Kafka.

Решение: Сообщение приходит в от участника из группы «Отправитель» и попадает в узел Канал1СИсточник. В этом узле создается очередь, в которую полученное сообщение сохраняется. Далее используем узел «KafkaНазначение», чтобы подключиться к топику брокера Apache Kafka. На последнем шаге сообщение считывается из очереди и записывается в топик брокера Kafka.