Simple-Kafka_Adapter
Simple-Kafka_Adapter copied to clipboard
Краш 1С при остановке слушателя
Платформа 8.3.20.1674, обычные формы. Создаю слушателя, читаю сообщения - все работает. Иногда при выполнении
Компонента.ОстановитьКонсьюмера();
Происходит краш 1С. Какие сведения от меня могут быть полезны?
Рекомендуется использовать платформу не ниже 8.3.21. С данной версии появилась возможность запускать внешние компоненты в отдельном потоке.
Так же рекомендуется подобрать оптимальные параметры для работы слушателя https://kafka.apache.org/documentation/#connectconfigs_session.timeout.ms https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms
Пробовал с 8.3.22 - та же проблема
Пробовал с 8.3.22 - та же проблема
Какие параметры устанавливаете для слушателя? Какой таймаут уставлен для ожидания появления сообщений в топике?
// установка таймаута для ожидания сообщений
Компонента.УстановитьТаймаутОжидания(Таймаут);
Пример кода слушателя
// получаем параметры задания
ВремяНачалаЗадания = Дата(1, 1, 1);
ВремяОкончанияЗадания = Дата(1, 1, 1);
Задания = РегламентныеЗадания.ПолучитьРегламентныеЗадания(Новый Структура("УникальныйИдентификатор", Новый УникальныйИдентификатор("770c7460-3082-49af-8b51-bd59561893e2")));
Если Задания.Количество() Тогда
ВремяНачалаЗадания = Задания[0].Расписание.ВремяНачала;
ВремяОкончанияЗадания = Задания[0].Расписание.ВремяКонца;
КонецЕсли;
ВремяНачала = ТекущаяДатаСеанса();
ЗаписьЖурналаРегистрации("Интеграция Kafka. Consumer", УровеньЖурналаРегистрации.Информация,,, СтрШаблон("Старт подписки на топик %1", Топик));
Попытка
Компонента = Новый(СтрШаблон("AddIn.%1.simpleKafka1C", "Integration"));
Исключение
Подключено = ПодключитьВнешнююКомпоненту("ОбщийМакет.ВИ_КомпонентаSimpleKafka", "Integration", ТипВнешнейКомпоненты.Native, ТипПодключенияВнешнейКомпоненты.Изолированно);
Если Подключено Тогда
Компонента = Новый(СтрШаблон("AddIn.%1.simpleKafka1C", "Integration"));
КонецЕсли;
КонецПопытки;
Если Компонента = Неопределено Тогда
Возврат;
КонецЕсли;
Для каждого Параметр_ Из Параметры Цикл
Компонента.УстановитьПараметр(Строка(Параметр_.Ключ), Строка(Параметр_.Значение));
КонецЦикла;
КаталогЛогов = ВИ_ОбщегоНазначения.ЗначениеПоАналитикам(Справочники.ВИ_ПроизвольнаяАналитика.ВИ_КаталогЛоговKafka, "");
Если ЗначениеЗаполнено(КаталогЛогов) Тогда
Компонента.УстановитьФайлЛогирования(СтрШаблон("%1%2%3.log", КаталогЛогов , ПолучитьРазделительПути(), Новый УникальныйИдентификатор));
КонецЕсли;
Результат = Компонента.ИнициализироватьКонсьюмера(Брокер, Топик);
// установка таймаута для ожидания сообщений
Компонента.УстановитьТаймаутОжидания(Таймаут);
Если Не Результат Тогда
ТекстОшибки = СтрШаблон("Не удалось инициализировать консьюмера для топика %1", Топик);
ЗаписьЖурналаРегистрации("Интеграция Kafka. Consumer", УровеньЖурналаРегистрации.Ошибка,,, ТекстОшибки);
Возврат;
КонецЕсли;
Пока РазрешеноСлушать Цикл
РазрешеноСлушать = ВИ_ОбщегоНазначения.ЗначениеПоАналитикам(Аналитики, Ложь);
Если Не РазрешеноСлушать Тогда
Прервать;
КонецЕсли;
Попытка
Сообщение = Компонента.Слушать();
Исключение
Компонента = Неопределено;
ВызватьИсключение ОписаниеОшибки();
КонецПопытки;
Если НЕ ЗначениеЗаполнено(Сообщение) Тогда
Если (ТекущаяДатаСеанса() > ВремяНачала + ВремяРаботы) ИЛИ
(ЗначениеЗаполнено(ВремяОкончанияЗадания) И ЗначениеЗаполнено(ВремяНачалаЗадания) И
ТекущаяДатаСеанса() > НачалоДня(ТекущаяДатаСеанса()) + (ВремяОкончанияЗадания - Дата(1,1,1)) И
ТекущаяДатаСеанса() < НачалоДня(ТекущаяДатаСеанса()) + (ВремяНачалаЗадания - Дата(1,1,1))) Тогда
РазрешеноСлушать = Ложь;
КонецЕсли;
Продолжить;
КонецЕсли;
КлючСообщения = ПолучитьХешКлючСообщения(Сообщение);
Если Не СообщениеПрочитано(КлючСообщения) Тогда
Статус = Перечисления.ВИ_СтатусыПолученныхСообщений_Из_Kafka.Обрабатывается;
ЗарегистрироватьСостояниеСообщения(Сообщение, КлючСообщения, Источник, Статус);
КонецЕсли;
КонецЦикла;
Компонента.ОстановитьКонсьюмера();
Компонента = Неопределено;
ЗаписьЖурналаРегистрации("Интеграция Kafka. Consumer", УровеньЖурналаРегистрации.Информация,,, СтрШаблон("Остановка подписки на топик %1", Топик));
Используемые параметры: Таймаут = 5 000, enable.partition.eof = "false" fetch.wait.max.ms = 500 fetch.min.bytes = 4 096 max.poll.interval.ms = 14 500 000 log.thread.name = "true"
так же рекомендуется скачать последний релиз 1.0.2
Можно использовать последнюю редакцию 1.1.0. В ней появилось логирование. Достаточно указать настройку "debug" в значениях настройки должно быть указано "broker,topic,msg"
Креши могут быть из-за изолированного режима. Он в платформе пока что не очень рабочий
Пробовал с 8.3.22 - та же проблема
Попробуйте использовать релиз 1.4.0 компоненты https://github.com/NuclearAPK/Simple-Kafka_Adapter/releases/download/v1.4.0/SimpleKafka1C64_1_4_0.zip