Перейти к содержанию

Разделение табличных данных на отдельные сообщения

Сценарий. Преобразовать табличную модель данных (табличная часть справочника, набор записей регистра) в объектную — одна строка = одно сообщение. Получатель работает с потоком независимых событий вместо одного большого документа.

Обратный паттерн

Если нужно наоборот — упаковать все строки в одно сообщение — см. Документ с табличной частью или Регистр по регистратору.

Общая идея

  1. В продюсере настраиваются два ключа регистрации (Тип объект):
    • полное имя метаданных (Справочник.…, РегистрНакопления.…) — срабатывает подписка при записи объекта,
    • произвольный ключ (Справочник.….ТабличнаяЧасть, РегистрНакопления.….Запись) — используется только из API.
  2. Обработчик «верхнего» ключа (по подписке) не формирует общее сообщение сам, а ставит строки в очередь под произвольным ключом.
  3. Дальше есть выбор:
    • передать в очередь массив структур — адаптер вызовет обработчик строки на каждую,
    • передать массив готовых сообщений (ПараметрыСообщения) — сериализация пропускается, второй обработчик не нужен.

Произвольный ключ — намеренно

Ключ вида Справочник.…ТабличнаяЧасть не совпадает с именем метаданных 1С, поэтому подписка на запись по нему не сработает. Строки попадут в топик только через явный вызов ПоместитьВОчередьИсходящих(..., "…ТабличнаяЧасть"). См. Тип объект — ключ регистрации.


Сценарий 1. Строки ТЧ справочника — каждая отдельным сообщением

Настройка продюсера

В табличной части «Объекты метаданных»:

Тип объект Имя топика Тип сериализации Имя метода
Справочник.кфк_т_ТестовыйСправочник (не указывается) Произвольный обработчик СериализацияШапки
Справочник.кфк_т_ТестовыйСправочник.ТабличнаяЧасть catalog.rows Произвольный обработчик СериализацияСтроки

Почему у шапки нет топика

Обработчик шапки вызывает Отказ = Истинасама запись справочника в Kafka не уходит, она только триггерит регистрацию строк под произвольным ключом. Значит, топик для этого типа объекта не требуется.

Если позже решите отправлять и шапку (см. ниже), укажите для неё отдельный топик.

Обработчик шапки — ставит строки в очередь

Функция СериализацияШапки(Объект, Свойства, ТекстЖурнала, Отказ) Экспорт

    // Вариант 1. Шапку не отправляем, только строки.
    Отказ = Истина;

    // Преобразуем ТЧ в массив структур — по одной структуре на строку.
    Строки = Новый Массив;
    Для Каждого СтрокаТЧ Из Объект.Товары Цикл
        Запись = Новый Структура;
        Запись.Вставить("ref",    Объект.Ссылка);
        Запись.Вставить("lineNo", СтрокаТЧ.НомерСтроки);
        Запись.Вставить("item",   СтрокаТЧ.Номенклатура);
        Запись.Вставить("qty",    СтрокаТЧ.Количество);
        Строки.Добавить(Запись);
    КонецЦикла;

    // Регистрируем массив под произвольным ключом — сработает СериализацияСтроки.
    кфкИнтеграция.ПоместитьВОчередьИсходящих(
        Строки,
        "Справочник.кфк_т_ТестовыйСправочник.ТабличнаяЧасть",
        Свойства.Продюсер);

    Возврат Неопределено;

КонецФункции

Обработчик строки — итоговое сообщение

Функция СериализацияСтроки(Объект, Свойства, ТекстЖурнала, Отказ) Экспорт

    Результат = кфкИнтеграция.ПараметрыСообщения();
    Результат.Данные = Объект; // структура, подготовленная в СериализацияШапки

    // Ключ партиционирования — по владельцу строки, чтобы строки одного
    // справочника попадали в одну партицию и сохраняли порядок.
    Результат.Ключ = Строка(Объект.ref.УникальныйИдентификатор());

    Возврат Результат;

КонецФункции

Вариант без второго обработчика

Если сериализация строки тривиальна, можно сразу формировать готовые сообщения в обработчике шапки — тогда второй обработчик не нужен:

Функция СериализацияШапки(Объект, Свойства, ТекстЖурнала, Отказ) Экспорт

    Отказ = Истина;

    Сообщения = Новый Массив;
    Для Каждого СтрокаТЧ Из Объект.Товары Цикл
        Данные = Новый Структура;
        Данные.Вставить("ref",    Объект.Ссылка);
        Данные.Вставить("lineNo", СтрокаТЧ.НомерСтроки);
        Данные.Вставить("item",   СтрокаТЧ.Номенклатура);
        Данные.Вставить("qty",    СтрокаТЧ.Количество);

        // Готовое сообщение — этап сериализации пропускается.
        Сообщение = кфкИнтеграция.ПараметрыСообщения(
            Данные,
            Строка(Объект.Ссылка.УникальныйИдентификатор()) + ":" + СтрокаТЧ.НомерСтроки);
        Сообщения.Добавить(Сообщение);
    КонецЦикла;

    кфкИнтеграция.ПоместитьВОчередьИсходящих(
        Сообщения,
        "Справочник.кфк_т_ТестовыйСправочник.ТабличнаяЧасть",
        Свойства.Продюсер);

    Возврат Неопределено;

КонецФункции

Когда отправлять и шапку, и строки

Если принимающая сторона хочет получать и шапку справочника, и отдельные строки — не устанавливайте Отказ = Истина, а верните заполненное ПараметрыСообщения для шапки. Строки всё равно будут поставлены в очередь вызовом ПоместитьВОчередьИсходящих и уйдут в catalog.rows, а шапка — в catalog.header.


Сценарий 2. Движения регистра по регистратору — по одной записи в сообщении

По умолчанию для регистров с подчинением регистратору адаптер отдаёт все движения одним сообщением (см. Регистр по регистратору). Если нужен режим «одна запись = одно сообщение», применяется тот же приём.

Настройка продюсера

Тип объект Имя топика Тип сериализации Имя метода
РегистрНакопления.ТоварыНаСкладах (не указывается) Произвольный обработчик СериализацияДвижений
РегистрНакопления.ТоварыНаСкладах.Запись stock.movements Произвольный обработчик СериализацияЗаписи

Как и в первом сценарии, у «регистраторного» типа топик не нужен: обработчик ставит Отказ = Истина и лишь порождает сообщения для ключа …Запись.

Обработчик регистратора — разбивает набор на записи

Функция СериализацияДвижений(Объект, Свойства, ТекстЖурнала, Отказ) Экспорт

    Отказ = Истина; // пакетное сообщение не формируем

    Запрос = Новый Запрос;
    Запрос.Текст =
        "ВЫБРАТЬ
        |    Т.Период       КАК period,
        |    Т.Регистратор  КАК recorder,
        |    Т.Номенклатура КАК item,
        |    Т.Количество   КАК qty
        |ИЗ
        |    РегистрНакопления.ТоварыНаСкладах КАК Т
        |ГДЕ
        |    Т.Регистратор = &Регистратор";
    Запрос.УстановитьПараметр("Регистратор", Объект.Регистратор);
    Выборка = Запрос.Выполнить().Выбрать();

    Сообщения = Новый Массив;
    Пока Выборка.Следующий() Цикл
        Данные = Новый Структура("period, recorder, item, qty");
        ЗаполнитьЗначенияСвойств(Данные, Выборка);

        Сообщение = кфкИнтеграция.ПараметрыСообщения(
            Данные,
            Строка(Объект.Регистратор.УникальныйИдентификатор()) + ":" + Выборка.item.УникальныйИдентификатор());
        Сообщения.Добавить(Сообщение);
    КонецЦикла;

    кфкИнтеграция.ПоместитьВОчередьИсходящих(
        Сообщения,
        "РегистрНакопления.ТоварыНаСкладах.Запись",
        Свойства.Продюсер);

    Возврат Неопределено;

КонецФункции

Зачем ключ с регистратором

Ключ = регистратор:item держит все движения одного документа и одной номенклатуры в одной партиции — получатель увидит их в порядке записи. Если порядок не важен, ключ можно опустить.

Обработчик записи

Нужен, только если хотите дополнительно обогатить запись или изменить формат. В простейшем случае (сообщения уже готовы в первом обработчике) этот обработчик не вызывается.


Ограничения

Объёмы

  • Массив, передаваемый в ПоместитьВОчередьИсходящих, обрабатывается порционно — можно без опасения передавать большие коллекции (десятки и сотни тысяч элементов).
  • Каждое отдельное сообщение — до ~10 МБ.

Смотрите также