Перейти к содержанию

Обработчик консьюмера

Процедура, которая обрабатывает тело сообщения, полученного из Kafka.

Имя модуля и метода указываются в табличной части консьюмера — в полях Имя модуля десериализации и Имя метода десериализации.

Сигнатура

Процедура Десериализация(ТелоСообщения, Свойства, ТекстЖурнала, Отказ) Экспорт

Параметры

Параметр Тип Описание
ТелоСообщения Строка, ДвоичныеДанные Тело сообщения из Kafka. Тип зависит от настройки Двоичные данные в консьюмере
Свойства Структура Дополнительные свойства сообщения — см. Поля структуры Свойства
ТекстЖурнала Строка Текст, который будет записан в журнал регистрации
Отказ Булево Установите в Истина для отмены обработки сообщения

Поля структуры Свойства

Поле Тип Описание
Консьюмер СправочникСсылка.кфкКонсьюмеры Консьюмер, получивший сообщение
Топик Строка Имя топика
Ключ Строка Ключ (идентификатор) сообщения, используемый для партиционирования
Раздел Строка Номер раздела (partition)
Смещение Строка Смещение (offset) сообщения в разделе
Заголовки Соответствие Из Строка Заголовки сообщения
ДатаЗагрузки Дата Дата получения сообщения системой
ДатаОбработки Дата Дата последней обработки сообщения

Минимальный пример

Процедура Десериализация(ТелоСообщения, Свойства, ТекстЖурнала, Отказ) Экспорт

    Данные = ОбщегоНазначения.JSONВЗначение(ТелоСообщения, "dateTime", Ложь);

    Элемент = Справочники.Номенклатура.НайтиПоКоду(Данные.code);
    Если Элемент.Пустая() Тогда
        Объект = Справочники.Номенклатура.СоздатьЭлемент();
    Иначе
        Объект = Элемент.ПолучитьОбъект();
    КонецЕсли;

    Объект.Наименование = Данные.name;
    Объект.Код          = Данные.code;

    // ОБЯЗАТЕЛЬНО — иначе запись снова попадёт в очередь исходящих
    кфкИнтеграция.Отключить(Объект);

    Объект.ОбменДанными.Загрузка = Истина;
    Объект.Записать();

КонецПроцедуры

Ключевые правила

Идемпотентность обязательна

Повторный вызов

Обработчик может быть вызван повторно — при перезапуске, ошибке или ручном возврате сообщения в очередь. Проектируйте обработчик так, чтобы повторный вызов с тем же сообщением не создавал дубликаты и не вызывал побочных эффектов.

Рекомендации:

  • используйте ключ сообщения (Свойства.Ключ) или поле из тела для идентификации уже обработанных записей;
  • проверяйте существование объекта по ссылке/ключу перед созданием нового;
  • разделяйте обработчики по типам сообщений или топикам — проще гарантировать идемпотентность.

Отключение и ОбменДанными.Загрузка

Перед Записать() обязательно:

кфкИнтеграция.Отключить(Объект);
Объект.ОбменДанными.Загрузка = Истина;
Объект.Записать();
  • Отключить() — блокирует подписку адаптера на событие записи, иначе запись снова попадёт в очередь исходящих.
  • ОбменДанными.Загрузка = Истина — стандартный механизм 1С для подавления бизнес-логики при загрузке данных из внешней системы.

Отмена обработки

Если Не ЗначениеЗаполнено(Данные.ref) Тогда
    ТекстЖурнала = СтрШаблон(
        "Отсутствует поле 'ref'. Топик: %1, смещение: %2",
        Свойства.Топик,
        Свойства.Смещение
    );
    Отказ = Истина;
    Возврат;
КонецЕсли;

При Отказ = Истина сообщение получает статус «Ошибка обработки». Текст из ТекстЖурнала сохраняется и доступен через Kafka / Входящие сообщения.

Автоповтор не выполняется

При статусе ОшибкаОбработки автоматический повтор не запускается. Повторную обработку инициирует администратор из РС «Входящие сообщения» — кнопка Вернуть в очередь.


Смотрите также