Разделение табличных данных на отдельные сообщения¶
Сценарий. Преобразовать табличную модель данных (табличная часть справочника, набор записей регистра) в объектную — одна строка = одно сообщение. Получатель работает с потоком независимых событий вместо одного большого документа.
Обратный паттерн
Если нужно наоборот — упаковать все строки в одно сообщение — см. Документ с табличной частью или Регистр по регистратору.
Общая идея¶
- В продюсере настраиваются два ключа регистрации (
Тип объект):- полное имя метаданных (
Справочник.…,РегистрНакопления.…) — срабатывает подписка при записи объекта, - произвольный ключ (
Справочник.….ТабличнаяЧасть,РегистрНакопления.….Запись) — используется только из API.
- полное имя метаданных (
- Обработчик «верхнего» ключа (по подписке) не формирует общее сообщение сам, а ставит строки в очередь под произвольным ключом.
- Дальше есть выбор:
- передать в очередь массив структур — адаптер вызовет обработчик строки на каждую,
- передать массив готовых сообщений (
ПараметрыСообщения) — сериализация пропускается, второй обработчик не нужен.
Произвольный ключ — намеренно
Ключ вида Справочник.…ТабличнаяЧасть не совпадает с именем метаданных 1С, поэтому подписка на запись по нему не сработает. Строки попадут в топик только через явный вызов ПоместитьВОчередьИсходящих(..., "…ТабличнаяЧасть"). См. Тип объект — ключ регистрации.
Сценарий 1. Строки ТЧ справочника — каждая отдельным сообщением¶
Настройка продюсера¶
В табличной части «Объекты метаданных»:
| Тип объект | Имя топика | Тип сериализации | Имя метода |
|---|---|---|---|
Справочник.кфк_т_ТестовыйСправочник | — (не указывается) | Произвольный обработчик | СериализацияШапки |
Справочник.кфк_т_ТестовыйСправочник.ТабличнаяЧасть | catalog.rows | Произвольный обработчик | СериализацияСтроки |
Почему у шапки нет топика
Обработчик шапки вызывает Отказ = Истина — сама запись справочника в Kafka не уходит, она только триггерит регистрацию строк под произвольным ключом. Значит, топик для этого типа объекта не требуется.
Если позже решите отправлять и шапку (см. ниже), укажите для неё отдельный топик.
Обработчик шапки — ставит строки в очередь¶
Функция СериализацияШапки(Объект, Свойства, ТекстЖурнала, Отказ) Экспорт
// Вариант 1. Шапку не отправляем, только строки.
Отказ = Истина;
// Преобразуем ТЧ в массив структур — по одной структуре на строку.
Строки = Новый Массив;
Для Каждого СтрокаТЧ Из Объект.Товары Цикл
Запись = Новый Структура;
Запись.Вставить("ref", Объект.Ссылка);
Запись.Вставить("lineNo", СтрокаТЧ.НомерСтроки);
Запись.Вставить("item", СтрокаТЧ.Номенклатура);
Запись.Вставить("qty", СтрокаТЧ.Количество);
Строки.Добавить(Запись);
КонецЦикла;
// Регистрируем массив под произвольным ключом — сработает СериализацияСтроки.
кфкИнтеграция.ПоместитьВОчередьИсходящих(
Строки,
"Справочник.кфк_т_ТестовыйСправочник.ТабличнаяЧасть",
Свойства.Продюсер);
Возврат Неопределено;
КонецФункции
Обработчик строки — итоговое сообщение¶
Функция СериализацияСтроки(Объект, Свойства, ТекстЖурнала, Отказ) Экспорт
Результат = кфкИнтеграция.ПараметрыСообщения();
Результат.Данные = Объект; // структура, подготовленная в СериализацияШапки
// Ключ партиционирования — по владельцу строки, чтобы строки одного
// справочника попадали в одну партицию и сохраняли порядок.
Результат.Ключ = Строка(Объект.ref.УникальныйИдентификатор());
Возврат Результат;
КонецФункции
Вариант без второго обработчика¶
Если сериализация строки тривиальна, можно сразу формировать готовые сообщения в обработчике шапки — тогда второй обработчик не нужен:
Функция СериализацияШапки(Объект, Свойства, ТекстЖурнала, Отказ) Экспорт
Отказ = Истина;
Сообщения = Новый Массив;
Для Каждого СтрокаТЧ Из Объект.Товары Цикл
Данные = Новый Структура;
Данные.Вставить("ref", Объект.Ссылка);
Данные.Вставить("lineNo", СтрокаТЧ.НомерСтроки);
Данные.Вставить("item", СтрокаТЧ.Номенклатура);
Данные.Вставить("qty", СтрокаТЧ.Количество);
// Готовое сообщение — этап сериализации пропускается.
Сообщение = кфкИнтеграция.ПараметрыСообщения(
Данные,
Строка(Объект.Ссылка.УникальныйИдентификатор()) + ":" + СтрокаТЧ.НомерСтроки);
Сообщения.Добавить(Сообщение);
КонецЦикла;
кфкИнтеграция.ПоместитьВОчередьИсходящих(
Сообщения,
"Справочник.кфк_т_ТестовыйСправочник.ТабличнаяЧасть",
Свойства.Продюсер);
Возврат Неопределено;
КонецФункции
Когда отправлять и шапку, и строки
Если принимающая сторона хочет получать и шапку справочника, и отдельные строки — не устанавливайте Отказ = Истина, а верните заполненное ПараметрыСообщения для шапки. Строки всё равно будут поставлены в очередь вызовом ПоместитьВОчередьИсходящих и уйдут в catalog.rows, а шапка — в catalog.header.
Сценарий 2. Движения регистра по регистратору — по одной записи в сообщении¶
По умолчанию для регистров с подчинением регистратору адаптер отдаёт все движения одним сообщением (см. Регистр по регистратору). Если нужен режим «одна запись = одно сообщение», применяется тот же приём.
Настройка продюсера¶
| Тип объект | Имя топика | Тип сериализации | Имя метода |
|---|---|---|---|
РегистрНакопления.ТоварыНаСкладах | — (не указывается) | Произвольный обработчик | СериализацияДвижений |
РегистрНакопления.ТоварыНаСкладах.Запись | stock.movements | Произвольный обработчик | СериализацияЗаписи |
Как и в первом сценарии, у «регистраторного» типа топик не нужен: обработчик ставит Отказ = Истина и лишь порождает сообщения для ключа …Запись.
Обработчик регистратора — разбивает набор на записи¶
Функция СериализацияДвижений(Объект, Свойства, ТекстЖурнала, Отказ) Экспорт
Отказ = Истина; // пакетное сообщение не формируем
Запрос = Новый Запрос;
Запрос.Текст =
"ВЫБРАТЬ
| Т.Период КАК period,
| Т.Регистратор КАК recorder,
| Т.Номенклатура КАК item,
| Т.Количество КАК qty
|ИЗ
| РегистрНакопления.ТоварыНаСкладах КАК Т
|ГДЕ
| Т.Регистратор = &Регистратор";
Запрос.УстановитьПараметр("Регистратор", Объект.Регистратор);
Выборка = Запрос.Выполнить().Выбрать();
Сообщения = Новый Массив;
Пока Выборка.Следующий() Цикл
Данные = Новый Структура("period, recorder, item, qty");
ЗаполнитьЗначенияСвойств(Данные, Выборка);
Сообщение = кфкИнтеграция.ПараметрыСообщения(
Данные,
Строка(Объект.Регистратор.УникальныйИдентификатор()) + ":" + Выборка.item.УникальныйИдентификатор());
Сообщения.Добавить(Сообщение);
КонецЦикла;
кфкИнтеграция.ПоместитьВОчередьИсходящих(
Сообщения,
"РегистрНакопления.ТоварыНаСкладах.Запись",
Свойства.Продюсер);
Возврат Неопределено;
КонецФункции
Зачем ключ с регистратором
Ключ = регистратор:item держит все движения одного документа и одной номенклатуры в одной партиции — получатель увидит их в порядке записи. Если порядок не важен, ключ можно опустить.
Обработчик записи¶
Нужен, только если хотите дополнительно обогатить запись или изменить формат. В простейшем случае (сообщения уже готовы в первом обработчике) этот обработчик не вызывается.
Ограничения¶
Объёмы
- Массив, передаваемый в
ПоместитьВОчередьИсходящих, обрабатывается порционно — можно без опасения передавать большие коллекции (десятки и сотни тысяч элементов). - Каждое отдельное сообщение — до ~10 МБ.
Смотрите также¶
- Документ с табличной частью — обратный паттерн: всё в одном сообщении.
- Регистр по регистратору — пакетная модель для движений.
- Принудительная регистрация — другие способы постановки в очередь из кода.
- Произвольные события — детальнее про
ТипОбъекти ключи регистрации.