Обработчик консьюмера¶
Процедура, которая обрабатывает тело сообщения, полученного из Kafka.
Имя модуля и метода указываются в табличной части консьюмера — в полях Имя модуля десериализации и Имя метода десериализации.
Сигнатура¶
Параметры¶
| Параметр | Тип | Описание |
|---|---|---|
ТелоСообщения | Строка, ДвоичныеДанные | Тело сообщения из Kafka. Тип зависит от настройки Двоичные данные в консьюмере |
Свойства | Структура | Дополнительные свойства сообщения — см. Поля структуры Свойства |
ТекстЖурнала | Строка | Текст, который будет записан в журнал регистрации |
Отказ | Булево | Установите в Истина для отмены обработки сообщения |
Поля структуры Свойства¶
| Поле | Тип | Описание |
|---|---|---|
Консьюмер | СправочникСсылка.кфкКонсьюмеры | Консьюмер, получивший сообщение |
Топик | Строка | Имя топика |
Ключ | Строка | Ключ (идентификатор) сообщения, используемый для партиционирования |
Раздел | Строка | Номер раздела (partition) |
Смещение | Строка | Смещение (offset) сообщения в разделе |
Заголовки | Соответствие Из Строка | Заголовки сообщения |
ДатаЗагрузки | Дата | Дата получения сообщения системой |
ДатаОбработки | Дата | Дата последней обработки сообщения |
Минимальный пример¶
Процедура Десериализация(ТелоСообщения, Свойства, ТекстЖурнала, Отказ) Экспорт
Данные = ОбщегоНазначения.JSONВЗначение(ТелоСообщения, "dateTime", Ложь);
Элемент = Справочники.Номенклатура.НайтиПоКоду(Данные.code);
Если Элемент.Пустая() Тогда
Объект = Справочники.Номенклатура.СоздатьЭлемент();
Иначе
Объект = Элемент.ПолучитьОбъект();
КонецЕсли;
Объект.Наименование = Данные.name;
Объект.Код = Данные.code;
// ОБЯЗАТЕЛЬНО — иначе запись снова попадёт в очередь исходящих
кфкИнтеграция.Отключить(Объект);
Объект.ОбменДанными.Загрузка = Истина;
Объект.Записать();
КонецПроцедуры
Ключевые правила¶
Идемпотентность обязательна¶
Повторный вызов
Обработчик может быть вызван повторно — при перезапуске, ошибке или ручном возврате сообщения в очередь. Проектируйте обработчик так, чтобы повторный вызов с тем же сообщением не создавал дубликаты и не вызывал побочных эффектов.
Рекомендации:
- используйте ключ сообщения (
Свойства.Ключ) или поле из тела для идентификации уже обработанных записей; - проверяйте существование объекта по ссылке/ключу перед созданием нового;
- разделяйте обработчики по типам сообщений или топикам — проще гарантировать идемпотентность.
Отключение и ОбменДанными.Загрузка¶
Перед Записать() обязательно:
Отключить()— блокирует подписку адаптера на событие записи, иначе запись снова попадёт в очередь исходящих.ОбменДанными.Загрузка = Истина— стандартный механизм 1С для подавления бизнес-логики при загрузке данных из внешней системы.
Отмена обработки¶
Если Не ЗначениеЗаполнено(Данные.ref) Тогда
ТекстЖурнала = СтрШаблон(
"Отсутствует поле 'ref'. Топик: %1, смещение: %2",
Свойства.Топик,
Свойства.Смещение
);
Отказ = Истина;
Возврат;
КонецЕсли;
При Отказ = Истина сообщение получает статус «Ошибка обработки». Текст из ТекстЖурнала сохраняется и доступен через Kafka / Входящие сообщения.
Автоповтор не выполняется
При статусе ОшибкаОбработки автоматический повтор не запускается. Повторную обработку инициирует администратор из РС «Входящие сообщения» — кнопка Вернуть в очередь.