Перейти к основному содержимому

Свойства узла процесса интеграции вида «KafkaИсточник»

Узел этого вида является клиентом-потребителем, предназначенным для чтения входящих в «1С:Шина» сообщений из брокера Apache Kafka.

Узел может быть как связан с группой участников, так и использоваться самостоятельно.

Свойства узла

АвтоУстановкаСмещения

Параметр Kafka: auto.offset.reset. Режим установки смещения при изменениях в группе потребителей. Возможные значения:

  • Ранний (earliest) — при добавлении нового потребителя будет браться самое раннее значение офсета для чтения раздела.
  • Последний (latest) — при добавлении нового потребителя будет браться последнее подтвержденное значение офсета для чтения раздела. Используется по умолчанию.
  • Отсутствует (none) — при добавлении нового потребителя будет выброшена ошибка.

ДесериализаторЗначения

Параметр Kafka: value.deserializer. Десериализатор значения сообщения. Возможные значения:

  • Байты — значение по умолчанию (org.apache.kafka.common.serialization.ByteArrayDeserializer)
  • Строка (org.apache.kafka.common.serialization.StringDeserializer с параметром кодировки UTF-8)

ДесериализаторКлюча

Параметр Kafka: key.deserializer. Десериализатор ключа сообщения. Возможные значения:

  • Байты — значение по умолчанию (org.apache.kafka.common.serialization.ByteArrayDeserializer)
  • Строка (org.apache.kafka.common.serialization.StringDeserializer с параметром кодировки UTF-8)

ИдКлиента

Параметр Kafka: client.id. Логический идентификатор клиента (узла или участника группы участников), который подключается к серверу Kafka.

Имя

Имя узла, уникальное в рамках процесса интеграции. Используется для обращения к узлу из встроенного языка (УзелСхемыИнтеграции.Имя). Должно начинаться с буквы, за которой следуют буквы, цифры или символы '_' (нижнее подчеркивание). Минимальная длина — 2 символа

ИмяПользователя

Параметр Kafka: sasl.jaas.config. Имя пользователя. Значение параметра username для модуля механизма SASL (задается в свойстве МеханизмSasl).

ИнтервалАвтоФиксации

Параметр Kafka: auto.commit.interval.ms. Автофиксация — это режим автоматического подтверждения успешного получения сообщения, в результате которого происходит изменение смещения для потребителя в разделе топика. Данное свойство позволяет задать интервал в миллисекундах, с которым будет происходить автоматическая фиксация. По умолчанию — 5000.

КоличествоПотребителей

Количество параллельных потребителей в группе. Группа потребителей — это механизм параллелизации чтения сообщений из топика. Если узел используется самостоятельно, то одна группа потребителей будет соответствовать одному узлу KafkaИсточник. Если узел связан с группой участников, то одна группа потребителей будет соответствовать одному участнику из группы.

Выбор значения зависит от количества разделов в топике, рекомендуется настраивать количество потребителей равным количеству разделов в топике. Максимальное значение — 5 потребителей. По умолчанию — 1.

МаксимальныйРазмерВыборки

Параметр Kafka: fetch.max.bytes. Максимальный размер данных в байтах, который сервер Kafka должен вернуть в ответ на запрос. Записи извлекаются потребителем партиями, и если первая партия записей в первом непустом разделе выборки больше этого значения, то эта партия записей все равно будет возвращена, чтобы потребитель мог продвигаться вперед. Таким образом, данное значение не является абсолютным максимумом. Данный параметр позволяет регулировать частоту запросов к серверу Kafka — чем меньше значение, тем большее количество запросов будет необходимо с увеличением количества данных. По умолчанию — 52428800.

МаксимальныйРазмерВыборкиИзРаздела

Параметр Kafka: max.partition.fetch.bytes. Максимальный размер данных в байтах, который ожидается в выборке из раздела. Запрос будет обработан даже в случае, если фактический размер ответа окажется больше указанного в свойстве значения. По умолчанию — 1048576.

МеханизмSasl

Параметр Kafka: sasl.mechanism. Механизм SASL. Возможные значения:

  • Plain — значение по умолчанию (будет использоваться модуль org.apache.kafka.common.security.plain.PlainLoginModule)
  • ScramSha256 (будет использоваться модуль org.apache.kafka.common.security.plain.ScramLoginModule)
  • ScramSha512 (будет использоваться модуль org.apache.kafka.common.security.plain.ScramLoginModule)

ОбновлениеМетрик

Обработчик, внутри которого можно обновлять метрики, добавленные в проект разработчиком. Пример:

// Разработчик добавил в проект метрику «СчетчикВУзле». Тогда обновить метрику в узлах
// можно следующим образом:
метод ОбновлениеМетрик(Контекст: МойПроцессИнтеграции.КонтекстВызова, Сообщение: МойПроцессИнтеграции.Сообщение)
пер СчетчикВУзле = МойПроцессИнтеграции.Метрики["СчетчикВУзле"]
СчетчикВУзле.Обновить(Сообщение.УзлыПути.Текущий.Узел.Имя, 1)
;

Описание

Произвольное описание узла для разработчика. Используется при редактировании схемы процесса интеграции. Недоступно из встроенного языка

Пароль

Параметр Kafka: sasl.jaas.config. Пароль пользователя. Значение параметра password для модуля механизма SASL (задается в свойстве МеханизмSasl).

ПарольХранилищаДоверенныхСертификатов

Параметр Kafka: ssl.truststore.password. Пароль для хранилища доверенных сертификатов. Обязательный для протокола безопасности SaslSsl.

Порт

Параметр Kafka: bootstrap.servers. Номер порта bootstrap-сервера Kafka для подключения. По умолчанию — 9092.

ПротоколБезопасности

Параметр Kafka: security.protocol. Протокол безопасности. Возможные значения:

  • SaslPlainText (по умолчанию)
  • SaslSsl

ПутьХранилищаДоверенныхСертификатов

Параметр Kafka: ssl.truststore.location. Полный путь к файлу-хранилищу доверенных сертификатов в формате jks. Примеры:

  • C:\\cert\\trustore.jks (Windows)
  • C:/cert/trustore.jks (Windows)
  • /home/user/cert/trustore.jks (Linux)

Обязательный для протокола безопасности SaslSsl.

ТаймаутЗапроса

Параметр Kafka: request.timeout.ms. Таймаут запроса, миллисекунды. По умолчанию — 30000.

ТаймаутСессии

Параметр Kafka: session.timeout.ms. Таймаут сессии, миллисекунды. По умолчанию — 10000.

Клиент периодически посылает брокеру сигналы о своей активности. Если до истечения таймаута сессии брокер не получит ни одного сигнала, то брокер удалит этого клиента из группы и инициирует ребалансировку.

Хост

Параметр Kafka: bootstrap.servers. Имя хоста bootstrap-сервера Kafka. Используется для первичного подключения и получения всех доступных серверов кластера.

Пример использования

Контекст: Осуществляется отправка сообщения из Apache Kafka в систему на платформе «1С».

Задача: Забрать сообщение из топика Apache Kafka и передать его дальше по схеме процесса интеграции.

Решение: Используем узел «KafkaИсточник», чтобы подключиться к топику Kafka. Когда в топике появляется новое сообщение, «1С:Шина» считывает его и передает дальше по схеме в узел Канал1СНазначение («ВОфис»). Таким образом, сообщение сохраняется в очередь, которая создается в «1С:Шине». Из этой очереди сообщение доставляется получателю, который входит в группу участников «Офис».