Simple-Kafka_Adapter icon indicating copy to clipboard operation
Simple-Kafka_Adapter copied to clipboard

Краш 1С при остановке слушателя

Open skyq opened this issue 2 years ago • 7 comments

Платформа 8.3.20.1674, обычные формы. Создаю слушателя, читаю сообщения - все работает. Иногда при выполнении

Компонента.ОстановитьКонсьюмера();

Происходит краш 1С. Какие сведения от меня могут быть полезны?

skyq avatar Jul 12 '23 15:07 skyq

Рекомендуется использовать платформу не ниже 8.3.21. С данной версии появилась возможность запускать внешние компоненты в отдельном потоке.

Так же рекомендуется подобрать оптимальные параметры для работы слушателя https://kafka.apache.org/documentation/#connectconfigs_session.timeout.ms https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms

NuclearAPK avatar Jul 13 '23 12:07 NuclearAPK

Пробовал с 8.3.22 - та же проблема

skyq avatar Jul 21 '23 08:07 skyq

Пробовал с 8.3.22 - та же проблема

Какие параметры устанавливаете для слушателя? Какой таймаут уставлен для ожидания появления сообщений в топике?

// установка таймаута для ожидания сообщений
Компонента.УстановитьТаймаутОжидания(Таймаут);

Пример кода слушателя

	// получаем параметры задания
	ВремяНачалаЗадания    = Дата(1, 1, 1);
	ВремяОкончанияЗадания = Дата(1, 1, 1);
	Задания = РегламентныеЗадания.ПолучитьРегламентныеЗадания(Новый Структура("УникальныйИдентификатор", Новый УникальныйИдентификатор("770c7460-3082-49af-8b51-bd59561893e2")));
	Если Задания.Количество() Тогда  
		ВремяНачалаЗадания    = Задания[0].Расписание.ВремяНачала;
		ВремяОкончанияЗадания = Задания[0].Расписание.ВремяКонца;
	КонецЕсли;
	
	ВремяНачала = ТекущаяДатаСеанса();	
	ЗаписьЖурналаРегистрации("Интеграция Kafka. Consumer", УровеньЖурналаРегистрации.Информация,,, СтрШаблон("Старт подписки на топик %1", Топик));
	
    Попытка
		Компонента = Новый(СтрШаблон("AddIn.%1.simpleKafka1C", "Integration"));   
	Исключение
		Подключено = ПодключитьВнешнююКомпоненту("ОбщийМакет.ВИ_КомпонентаSimpleKafka", "Integration", ТипВнешнейКомпоненты.Native, ТипПодключенияВнешнейКомпоненты.Изолированно);
		Если Подключено Тогда
			Компонента = Новый(СтрШаблон("AddIn.%1.simpleKafka1C", "Integration"));  	
		КонецЕсли;
	КонецПопытки;
	
	Если Компонента = Неопределено Тогда
		Возврат;	
	КонецЕсли;
	
	Для каждого Параметр_ Из Параметры Цикл
		Компонента.УстановитьПараметр(Строка(Параметр_.Ключ), Строка(Параметр_.Значение));	
	КонецЦикла;   	
	
	КаталогЛогов = ВИ_ОбщегоНазначения.ЗначениеПоАналитикам(Справочники.ВИ_ПроизвольнаяАналитика.ВИ_КаталогЛоговKafka, "");
	Если ЗначениеЗаполнено(КаталогЛогов) Тогда
		Компонента.УстановитьФайлЛогирования(СтрШаблон("%1%2%3.log", КаталогЛогов , ПолучитьРазделительПути(), Новый УникальныйИдентификатор));
	КонецЕсли;
   
	Результат = Компонента.ИнициализироватьКонсьюмера(Брокер, Топик);      
	
	// установка таймаута для ожидания сообщений
	Компонента.УстановитьТаймаутОжидания(Таймаут);	

	Если Не Результат Тогда  
		ТекстОшибки = СтрШаблон("Не удалось инициализировать консьюмера для топика %1", Топик);  
		ЗаписьЖурналаРегистрации("Интеграция Kafka. Consumer", УровеньЖурналаРегистрации.Ошибка,,, ТекстОшибки);
		Возврат;
	КонецЕсли;                 
		
	Пока РазрешеноСлушать Цикл
		
		РазрешеноСлушать = ВИ_ОбщегоНазначения.ЗначениеПоАналитикам(Аналитики, Ложь);		
		Если Не РазрешеноСлушать Тогда
			Прервать;	
		КонецЕсли;			      
		
		Попытка
			Сообщение = Компонента.Слушать(); 
		Исключение                	
			Компонента = Неопределено;                                                         
			ВызватьИсключение ОписаниеОшибки();
		КонецПопытки;	
		
		Если НЕ ЗначениеЗаполнено(Сообщение) Тогда  
			Если (ТекущаяДатаСеанса() > ВремяНачала + ВремяРаботы) ИЛИ 
				(ЗначениеЗаполнено(ВремяОкончанияЗадания) И ЗначениеЗаполнено(ВремяНачалаЗадания) И 
					ТекущаяДатаСеанса() > НачалоДня(ТекущаяДатаСеанса()) + (ВремяОкончанияЗадания - Дата(1,1,1)) И 
					ТекущаяДатаСеанса() < НачалоДня(ТекущаяДатаСеанса()) + (ВремяНачалаЗадания - Дата(1,1,1))) Тогда
					
				РазрешеноСлушать = Ложь;   
				
			КонецЕсли;
			Продолжить;	    	
		КонецЕсли;	
		
		КлючСообщения = ПолучитьХешКлючСообщения(Сообщение);
		Если Не СообщениеПрочитано(КлючСообщения) Тогда		     
			Статус = Перечисления.ВИ_СтатусыПолученныхСообщений_Из_Kafka.Обрабатывается;
			ЗарегистрироватьСостояниеСообщения(Сообщение, КлючСообщения, Источник, Статус);		
		КонецЕсли;

	КонецЦикла;  
	
	Компонента.ОстановитьКонсьюмера();	
	Компонента = Неопределено;
	
	ЗаписьЖурналаРегистрации("Интеграция Kafka. Consumer", УровеньЖурналаРегистрации.Информация,,, СтрШаблон("Остановка подписки на топик %1", Топик));

Используемые параметры: Таймаут = 5 000, enable.partition.eof = "false" fetch.wait.max.ms = 500 fetch.min.bytes = 4 096 max.poll.interval.ms = 14 500 000 log.thread.name = "true"

NuclearAPK avatar Jul 22 '23 09:07 NuclearAPK

так же рекомендуется скачать последний релиз 1.0.2

NuclearAPK avatar Jul 23 '23 19:07 NuclearAPK

Можно использовать последнюю редакцию 1.1.0. В ней появилось логирование. Достаточно указать настройку "debug" в значениях настройки должно быть указано "broker,topic,msg"

NuclearAPK avatar Sep 01 '23 11:09 NuclearAPK

Креши могут быть из-за изолированного режима. Он в платформе пока что не очень рабочий

1CMedicine avatar Sep 21 '23 15:09 1CMedicine

Пробовал с 8.3.22 - та же проблема

Попробуйте использовать релиз 1.4.0 компоненты https://github.com/NuclearAPK/Simple-Kafka_Adapter/releases/download/v1.4.0/SimpleKafka1C64_1_4_0.zip

NuclearAPK avatar Dec 19 '23 20:12 NuclearAPK