Потоки данных¶
Детальное описание того, как данные движутся через адаптер — от записи объекта в 1С до попадания в Kafka и обратно.
Исходящие сообщения (1С → Kafka)¶
Обработка исходящих сообщений выполняется асинхронно и разделена на три независимых этапа, управляемых отдельными диспетчерами: регистрация, сериализация, выгрузка. Дополнительный поток — контроль дублей.
1. Регистрация¶
Изменения данных в 1С автоматически или вручную регистрируются в регистре «Исходящие сообщения».
Поддерживаются:
- ссылочные объекты;
- наборы записей регистров сведений (режим записи Независимый);
- регистры, подчинённые регистратору (
РегистрНакопления,РегистрБухгалтерии,РегистрРасчёта,РегистрСведенийс режимом ПодчинениеРегистратору) — единицей регистрации является регистратор (документ), а не набор записей.
Регистрация выполняется:
- автоматически — через подписки на события записи;
- вручную — через программный API адаптера.
Перед постановкой в очередь выполняется фильтрация по настройкам продюсеров. Объекты, не подлежащие интеграции, отбрасываются на этапе фильтрации.
Ошибки при автоматической регистрации перехватываются: подробности записываются в журнал регистрации 1С (событие «Адаптер Kafka. Регистрация при записи»), а операция записи объекта не прерывается — пользователь продолжает работу.
Автоматическая регистрация через подписки¶
flowchart LR
subgraph OneC["Прикладное решение 1С"]
DATA_REF["Ссылочный объект"]
DATA_TAB["Набор записей РС"]
end
subgraph Adapter["Адаптер Kafka"]
EVENT_REF["Подписка - при записи объекта"]
EVENT_TAB["Подписка - при записи набора"]
FILTER{{Фильтр по продюсерам}}
STORAG[/"РС Исходящие сообщения"\]
DROP["Не подлежит интеграции"]
ERR["Журнал регистрации 1С"]
end
DATA_REF --> EVENT_REF --> FILTER
DATA_TAB --> EVENT_TAB --> FILTER
FILTER -->|OK| STORAG
FILTER -->|Нет| DROP
FILTER -.->|Исключение| ERR
STORAG -.->|Исключение| ERR Ручная регистрация через API¶
flowchart LR
subgraph OneC["Прикладное решение 1С"]
DATA["Подготовка данных"]
CALL["Вызов API"]
end
subgraph Adapter["Адаптер Kafka"]
API["Интеграция.ПоместитьВОчередьИсходящих"]
FILTER{{Фильтр по настройкам}}
QUEUE[/"РС Исходящие сообщения"\]
DROP["Регистрация отклонена"]
end
DATA --> CALL --> API
API --> FILTER
FILTER -->|OK| QUEUE
FILTER -->|Нет| DROP При ручной регистрации возможны два варианта:
- если передано готовое тело сообщения, этап сериализации пропускается;
- если переданы исходные данные, сообщение проходит этап сериализации.
2. Сериализация¶
Диспетчер сериализации отбирает сообщения, готовые к обработке, и распределяет их между потоками сериализации.
Потоки сериализации:
- преобразуют данные в формат, заданный настройками продюсера;
- формируют тело сообщения;
- обновляют состояние сообщения.
Сериализация может выполняться:
- произвольным обработчиком прикладного решения;
- через механизм 1С:Конвертация данных 3.1.
Регистры по регистратору
При использовании КД 3.1 адаптер выполняет запрос всех записей по регистратору и применяет правила к каждой из них. Результат — одно сообщение, тело которого содержит массив объектов (пакетная модель). При использовании произвольного обработчика разработчик получает ссылку на регистратор и формирует тело сообщения самостоятельно.
3. Выгрузка в Kafka¶
Диспетчер выгрузки отбирает сериализованные сообщения и распределяет их между потоками выгрузки.
flowchart LR
subgraph Adapter["Адаптер Kafka"]
QUEUE[/"РС Исходящие сообщения"\]
DISPATCH[[Диспетчер ФЗ]]
subgraph WORKERS["Фоновые задания"]
WORKER1[[Поток 1]]
WORKER2[[Поток 2]]
WORKERN[[Поток N]]
PROCESS["..."]
DLL["Simple Kafka Connector 1C (DLL)"]
RESULT[/"РС Исходящие сообщения"\]
end
end
subgraph Kafka["Apache Kafka"]
TOPIC[(Topic)]
end
QUEUE --> DISPATCH
DISPATCH --> WORKER1 --> PROCESS
DISPATCH --> WORKER2 --> PROCESS
DISPATCH --> WORKERN --> PROCESS
PROCESS <--> DLL -->|отправка| TOPIC
PROCESS --> |обновление| RESULT Потоки выгрузки:
- выполняют отправку тела сообщения в Kafka через внешний компонент;
- обновляют состояние сообщения по результатам отправки.
Автоматический повтор
При статусе «Ошибка выгрузки» выполняется автоматический повтор отправки: не более 3 попыток, с интервалом равным расписанию регламентного задания. Счётчик попыток хранится в поле «Количество выгрузок» записи РС. После исчерпания попыток (а также при статусе «Ошибка обработки») повторная обработка запускается вручную администратором из РС «Исходящие сообщения».
4. Контроль дублей¶
Отдельный поток контроля дублей:
- для группы связанных сообщений определяет последнее актуальное сообщение;
- все предыдущие сообщения, которые не были выгружены в Kafka, помечаются состоянием «Дубль»;
- сообщения, помеченные как «Дубль», исключаются из дальнейшей сериализации и выгрузки.
Это позволяет:
- предотвращать отправку устаревших данных;
- снижать нагрузку на Kafka и внешние системы;
- обеспечивать доставку только актуального состояния данных.
Входящие сообщения (Kafka → 1С)¶
Обработка входящих сообщений выполняется асинхронно и разделена на два независимых этапа: загрузка и десериализация.
1. Загрузка из Kafka¶
Диспетчер загрузки запускает один или несколько потоков загрузки.
flowchart LR
subgraph Adapter["Адаптер Kafka"]
DISPATCH[[Диспетчер ФЗ]]
subgraph WORKERS["Фоновые задания"]
WORKER1[[Поток 1]]
WORKER2[[Поток 2]]
WORKERN[[Поток N]]
PROCESS["..."]
DLL["Simple Kafka Connector 1C (DLL)"]
RESULT[\"РС Входящие сообщения"/]
end
end
subgraph Kafka["Apache Kafka"]
CONSUMER[("Kafka Consumer")]
end
DISPATCH --> WORKER1 --> PROCESS
DISPATCH --> WORKER2 --> PROCESS
DISPATCH --> WORKERN --> PROCESS
PROCESS <--> DLL
DLL <--> CONSUMER
PROCESS -->|запись сообщений| RESULT Потоки загрузки:
- в непрерывном режиме ожидают поступления сообщений из Kafka;
- получают сообщения через внешний компонент;
- сохраняют полученные сообщения в регистр «Входящие сообщения»;
- фиксируют начальное состояние сообщений для последующей обработки.
2. Десериализация и прикладная обработка¶
Диспетчер десериализации отбирает сообщения, готовые к обработке, и распределяет их между потоками десериализации.
Потоки десериализации:
- преобразуют тело сообщения в формат, ожидаемый прикладным решением;
- выполняют прикладную обработку сообщений;
- обновляют состояние сообщений по результатам обработки.
Обработка может выполняться:
- произвольным обработчиком прикладного решения;
- через механизм 1С:Конвертация данных 3.1.
Автоповтор не выполняется
Повторная десериализация выполняется вручную — по решению администратора из РС «Входящие сообщения».
Сериализация и десериализация¶
Единая схема выбора способа преобразования данных:
flowchart LR
subgraph Adapter["Адаптер Kafka"]
subgraph OUT["Выгрузка (1С → Kafka)"]
QUEUE_OUT[/"РС Исходящие сообщения"\]
DISPATCH_OUT[[Диспетчер ФЗ]]
subgraph WORKERS_OUT["Фоновые задания"]
W1O[[Поток 1]]
W2O[[Поток 2]]
WNO[[Поток N]]
FILTER_OUT{{Выбор способа сериализации}}
RESULT_OUT[/"РС Исходящие сообщения"\]
end
end
subgraph IN["Загрузка (Kafka → 1С)"]
QUEUE_IN[\"РС Входящие сообщения"/]
DISPATCH_IN[[Диспетчер ФЗ]]
subgraph WORKERS_IN["Фоновые задания"]
W1I[[Поток 1]]
W2I[[Поток 2]]
WNI[[Поток N]]
FILTER_IN{{Выбор способа десериализации}}
RESULT_IN[\"РС Входящие сообщения"/]
end
end
end
subgraph OneC["Прикладное решение 1С"]
CUSTOM["Произвольный обработчик"]
KD["КД 3.1"]
XDTO["XDTO + менеджер обмена"]
end
QUEUE_OUT --> DISPATCH_OUT
DISPATCH_OUT --> W1O --> FILTER_OUT
DISPATCH_OUT --> W2O --> FILTER_OUT
DISPATCH_OUT --> WNO --> FILTER_OUT
FILTER_OUT -->|Произвольный| CUSTOM
FILTER_OUT -->|КД 3.1| KD --> XDTO
CUSTOM -->|Обновление записи| RESULT_OUT
XDTO -->|Обновление записи| RESULT_OUT
QUEUE_IN --> DISPATCH_IN
DISPATCH_IN --> W1I --> FILTER_IN
DISPATCH_IN --> W2I --> FILTER_IN
DISPATCH_IN --> WNI --> FILTER_IN
FILTER_IN -->|Произвольный| CUSTOM
FILTER_IN -->|КД 3.1| KD --> XDTO
CUSTOM -->|Обновление записи| RESULT_IN
XDTO -->|Обновление записи| RESULT_IN Управление потоками¶
Максимальное количество потоков для этапов сериализации, выгрузки, загрузки и десериализации определяется в настройках диспетчера задач.
- минимальное количество потоков — 1;
- при отсутствии нагрузки поток автоматически завершается;
- при появлении нагрузки диспетчер запускает необходимое количество потоков в пределах заданного максимума.
Настройка — см. Продюсеры и Консьюмеры.