Перейти к содержанию

Потоки данных

Детальное описание того, как данные движутся через адаптер — от записи объекта в 1С до попадания в Kafka и обратно.

Исходящие сообщения (1С → Kafka)

Обработка исходящих сообщений выполняется асинхронно и разделена на три независимых этапа, управляемых отдельными диспетчерами: регистрация, сериализация, выгрузка. Дополнительный поток — контроль дублей.

1. Регистрация

Изменения данных в 1С автоматически или вручную регистрируются в регистре «Исходящие сообщения».

Поддерживаются:

  • ссылочные объекты;
  • наборы записей регистров сведений (режим записи Независимый);
  • регистры, подчинённые регистратору (РегистрНакопления, РегистрБухгалтерии, РегистрРасчёта, РегистрСведений с режимом ПодчинениеРегистратору) — единицей регистрации является регистратор (документ), а не набор записей.

Регистрация выполняется:

  • автоматически — через подписки на события записи;
  • вручную — через программный API адаптера.

Перед постановкой в очередь выполняется фильтрация по настройкам продюсеров. Объекты, не подлежащие интеграции, отбрасываются на этапе фильтрации.

Ошибки при автоматической регистрации перехватываются: подробности записываются в журнал регистрации 1С (событие «Адаптер Kafka. Регистрация при записи»), а операция записи объекта не прерывается — пользователь продолжает работу.

Автоматическая регистрация через подписки

flowchart LR
    subgraph OneC["Прикладное решение 1С"]
        DATA_REF["Ссылочный объект"]
        DATA_TAB["Набор записей РС"]
    end
    subgraph Adapter["Адаптер Kafka"]
        EVENT_REF["Подписка - при записи объекта"]
        EVENT_TAB["Подписка - при записи набора"]
        FILTER{{Фильтр по продюсерам}}
        STORAG[/"РС Исходящие сообщения"\]
        DROP["Не подлежит интеграции"]
        ERR["Журнал регистрации 1С"]
    end
    DATA_REF --> EVENT_REF --> FILTER
    DATA_TAB --> EVENT_TAB --> FILTER
    FILTER -->|OK| STORAG
    FILTER -->|Нет| DROP
    FILTER -.->|Исключение| ERR
    STORAG -.->|Исключение| ERR

Ручная регистрация через API

flowchart LR
    subgraph OneC["Прикладное решение 1С"]
        DATA["Подготовка данных"]
        CALL["Вызов API"]
    end
    subgraph Adapter["Адаптер Kafka"]
        API["Интеграция.ПоместитьВОчередьИсходящих"]
        FILTER{{Фильтр по настройкам}}
        QUEUE[/"РС Исходящие сообщения"\]
        DROP["Регистрация отклонена"]
    end

    DATA --> CALL --> API
    API --> FILTER
    FILTER -->|OK| QUEUE
    FILTER -->|Нет| DROP

При ручной регистрации возможны два варианта:

  • если передано готовое тело сообщения, этап сериализации пропускается;
  • если переданы исходные данные, сообщение проходит этап сериализации.

2. Сериализация

Диспетчер сериализации отбирает сообщения, готовые к обработке, и распределяет их между потоками сериализации.

Потоки сериализации:

  • преобразуют данные в формат, заданный настройками продюсера;
  • формируют тело сообщения;
  • обновляют состояние сообщения.

Сериализация может выполняться:

Регистры по регистратору

При использовании КД 3.1 адаптер выполняет запрос всех записей по регистратору и применяет правила к каждой из них. Результат — одно сообщение, тело которого содержит массив объектов (пакетная модель). При использовании произвольного обработчика разработчик получает ссылку на регистратор и формирует тело сообщения самостоятельно.

3. Выгрузка в Kafka

Диспетчер выгрузки отбирает сериализованные сообщения и распределяет их между потоками выгрузки.

flowchart LR
    subgraph Adapter["Адаптер Kafka"]
        QUEUE[/"РС Исходящие сообщения"\]
        DISPATCH[[Диспетчер ФЗ]]
        subgraph WORKERS["Фоновые задания"]
            WORKER1[[Поток 1]]
            WORKER2[[Поток 2]]
            WORKERN[[Поток N]]
            PROCESS["..."]
            DLL["Simple Kafka Connector 1C (DLL)"]
            RESULT[/"РС Исходящие сообщения"\]
        end
    end
    subgraph Kafka["Apache Kafka"]
        TOPIC[(Topic)]
    end
    QUEUE --> DISPATCH
    DISPATCH --> WORKER1 --> PROCESS
    DISPATCH --> WORKER2 --> PROCESS
    DISPATCH --> WORKERN --> PROCESS
    PROCESS <--> DLL -->|отправка| TOPIC
    PROCESS --> |обновление| RESULT

Потоки выгрузки:

  • выполняют отправку тела сообщения в Kafka через внешний компонент;
  • обновляют состояние сообщения по результатам отправки.

Автоматический повтор

При статусе «Ошибка выгрузки» выполняется автоматический повтор отправки: не более 3 попыток, с интервалом равным расписанию регламентного задания. Счётчик попыток хранится в поле «Количество выгрузок» записи РС. После исчерпания попыток (а также при статусе «Ошибка обработки») повторная обработка запускается вручную администратором из РС «Исходящие сообщения».

4. Контроль дублей

Отдельный поток контроля дублей:

  • для группы связанных сообщений определяет последнее актуальное сообщение;
  • все предыдущие сообщения, которые не были выгружены в Kafka, помечаются состоянием «Дубль»;
  • сообщения, помеченные как «Дубль», исключаются из дальнейшей сериализации и выгрузки.

Это позволяет:

  • предотвращать отправку устаревших данных;
  • снижать нагрузку на Kafka и внешние системы;
  • обеспечивать доставку только актуального состояния данных.

Входящие сообщения (Kafka → 1С)

Обработка входящих сообщений выполняется асинхронно и разделена на два независимых этапа: загрузка и десериализация.

1. Загрузка из Kafka

Диспетчер загрузки запускает один или несколько потоков загрузки.

flowchart LR
    subgraph Adapter["Адаптер Kafka"]
        DISPATCH[[Диспетчер ФЗ]]
        subgraph WORKERS["Фоновые задания"]
            WORKER1[[Поток 1]]
            WORKER2[[Поток 2]]
            WORKERN[[Поток N]]
            PROCESS["..."]
            DLL["Simple Kafka Connector 1C (DLL)"]
            RESULT[\"РС Входящие сообщения"/]
        end
    end
    subgraph Kafka["Apache Kafka"]
        CONSUMER[("Kafka Consumer")]
    end
    DISPATCH --> WORKER1 --> PROCESS
    DISPATCH --> WORKER2 --> PROCESS
    DISPATCH --> WORKERN --> PROCESS
    PROCESS <--> DLL
    DLL <--> CONSUMER
    PROCESS -->|запись сообщений| RESULT

Потоки загрузки:

  • в непрерывном режиме ожидают поступления сообщений из Kafka;
  • получают сообщения через внешний компонент;
  • сохраняют полученные сообщения в регистр «Входящие сообщения»;
  • фиксируют начальное состояние сообщений для последующей обработки.

2. Десериализация и прикладная обработка

Диспетчер десериализации отбирает сообщения, готовые к обработке, и распределяет их между потоками десериализации.

Потоки десериализации:

  • преобразуют тело сообщения в формат, ожидаемый прикладным решением;
  • выполняют прикладную обработку сообщений;
  • обновляют состояние сообщений по результатам обработки.

Обработка может выполняться:

  • произвольным обработчиком прикладного решения;
  • через механизм 1С:Конвертация данных 3.1.

Автоповтор не выполняется

Повторная десериализация выполняется вручную — по решению администратора из РС «Входящие сообщения».


Сериализация и десериализация

Единая схема выбора способа преобразования данных:

flowchart LR
    subgraph Adapter["Адаптер Kafka"]
        subgraph OUT["Выгрузка (1С → Kafka)"]
            QUEUE_OUT[/"РС Исходящие сообщения"\]
            DISPATCH_OUT[[Диспетчер ФЗ]]
            subgraph WORKERS_OUT["Фоновые задания"]
                W1O[[Поток 1]]
                W2O[[Поток 2]]
                WNO[[Поток N]]
                FILTER_OUT{{Выбор способа сериализации}}
                RESULT_OUT[/"РС Исходящие сообщения"\]
            end
        end
        subgraph IN["Загрузка (Kafka → 1С)"]
            QUEUE_IN[\"РС Входящие сообщения"/]
            DISPATCH_IN[[Диспетчер ФЗ]]
            subgraph WORKERS_IN["Фоновые задания"]
                W1I[[Поток 1]]
                W2I[[Поток 2]]
                WNI[[Поток N]]
                FILTER_IN{{Выбор способа десериализации}}
                RESULT_IN[\"РС Входящие сообщения"/]
            end
        end
    end
    subgraph OneC["Прикладное решение 1С"]
        CUSTOM["Произвольный обработчик"]
        KD["КД 3.1"]
        XDTO["XDTO + менеджер обмена"]
    end
    QUEUE_OUT --> DISPATCH_OUT
    DISPATCH_OUT --> W1O --> FILTER_OUT
    DISPATCH_OUT --> W2O --> FILTER_OUT
    DISPATCH_OUT --> WNO --> FILTER_OUT
    FILTER_OUT -->|Произвольный| CUSTOM
    FILTER_OUT -->|КД 3.1| KD --> XDTO
    CUSTOM -->|Обновление записи| RESULT_OUT
    XDTO -->|Обновление записи| RESULT_OUT
    QUEUE_IN --> DISPATCH_IN
    DISPATCH_IN --> W1I --> FILTER_IN
    DISPATCH_IN --> W2I --> FILTER_IN
    DISPATCH_IN --> WNI --> FILTER_IN
    FILTER_IN -->|Произвольный| CUSTOM
    FILTER_IN -->|КД 3.1| KD --> XDTO
    CUSTOM -->|Обновление записи| RESULT_IN
    XDTO -->|Обновление записи| RESULT_IN

Управление потоками

Максимальное количество потоков для этапов сериализации, выгрузки, загрузки и десериализации определяется в настройках диспетчера задач.

  • минимальное количество потоков — 1;
  • при отсутствии нагрузки поток автоматически завершается;
  • при появлении нагрузки диспетчер запускает необходимое количество потоков в пределах заданного максимума.

Настройка — см. Продюсеры и Консьюмеры.