inlong icon indicating copy to clipboard operation
inlong copied to clipboard

[Bug][Manager] Failed to use Elasticsearch sink

Open haibo-duan opened this issue 2 years ago • 1 comments

What happened

When I used Elasticsearch as the data node, the configuration process encountered the following error:

[ ] 2023-11-07 08:19:08.132 - INFO [inlong-mq-process-0] a.i.m.s.r.s.e.ElasticsearchApi:166 - create test_pulsar_table_es:true [ ] 2023-11-07 08:19:08.135 - INFO [inlong-mq-process-0] .i.m.s.s.StreamSinkServiceImpl:518 - success to update sink status=130 for id=1 with log: success to create es resource [ ] 2023-11-07 08:19:08.135 - INFO [inlong-mq-process-0] .ElasticsearchResourceOperator:111 - success to create es resource for sinkInfo=SinkInfo(id=1, inlongGroupId=test_pulsar_group, inlongStreamId=test_pulsar_stream, sinkType=ELASTICSEARCH, inlongClusterName=null, sinkName=test_pulsar_stream_sink, dataNodeName=a8a3d27a-50b7-4264-9b4c-ea88c0a04f8d, description=null, enableCreateResource=1, extParams={"hosts":"http://elasticsearch:9200","username":"admin","password":"******","indexName":"test_pulsar_table_es","flushInterval":null,"flushRecord":1000,"retryTimes":null,"keyFieldNames":null,"documentType":"test_type","primaryKey":"name","esVersion":7,"encryptVersion":1,"properties":{}}, status=100, creator=admin, mqResource=test_pulsar_stream, dataType=CSV, sourceSeparator=124, dataEscapeChar=null) [ ] 2023-11-07 08:19:08.139 -ERROR [inlong-mq-process-0] a.i.m.w.e.LogableEventListener:88 - execute listener WorkflowEventLogEntity(id=null, processId=3, processName=CREATE_STREAM_RESOURCE, processDisplayName=Create Stream, inlongGroupId=test_pulsar_group, taskId=4, elementName=InitSink, elementDisplayName=Stream-InitSink, eventType=TaskEvent, event=COMPLETE, listener=StreamSinkResourceListener, startTime=Tue Nov 07 08:19:07 UTC 2023, endTime=null, status=-1, async=0, ip=172.18.0.7, remark=null, exception=find no proper cluster assign to group=test_pulsar_group, stream=test_pulsar_stream, sink type=ELASTICSEARCH, data node=a8a3d27a-50b7-4264-9b4c-ea88c0a04f8d ) error:
java.lang.IllegalArgumentException: find no proper cluster assign to group=test_pulsar_group, stream=test_pulsar_stream, sink type=ELASTICSEARCH, data node=a8a3d27a-50b7-4264-9b4c-ea88c0a04f8d at org.apache.inlong.manager.common.util.Preconditions.expectNotBlank(Preconditions.java:143) ~[manager-common-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator.assignCluster(AbstractStandaloneSinkResourceOperator.java:58) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.resource.sink.es.ElasticsearchResourceOperator.createSinkResource(ElasticsearchResourceOperator.java:79) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.listener.sink.StreamSinkResourceListener.listen(StreamSinkResourceListener.java:95) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.lambda$createQueueForStreams$0(QueueResourceListener.java:159) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342] [ ] 2023-11-07 08:19:08.154 - INFO [inlong-mq-process-0] .m.s.s.InlongStreamServiceImpl:725 - success to update stream after approve for groupId=test_pulsar_group, streamId=test_pulsar_stream [ ] 2023-11-07 08:19:08.160 -ERROR [inlong-mq-process-0] .m.s.l.q.QueueResourceListener:168 - failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream [ ] 2023-11-07 08:19:08.160 -ERROR [inlong-workflow-0] .m.s.l.q.QueueResourceListener:179 - failed to execute stream process in asynchronously
java.util.concurrent.ExecutionException: org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_342] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[?:1.8.0_342] at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.createQueueForStreams(QueueResourceListener.java:176) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.listen(QueueResourceListener.java:134) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.listener.group.apply.ApproveApplyProcessListener.lambda$listen$0(ApproveApplyProcessListener.java:79) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342] Caused by: org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.lambda$createQueueForStreams$1(QueueResourceListener.java:169) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_342] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_342] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_342] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) ~[?:1.8.0_342] ... 3 more [ ] 2023-11-07 08:19:08.161 -ERROR [inlong-workflow-0] a.i.m.w.e.LogableEventListener:88 - execute listener WorkflowEventLogEntity(id=null, processId=2, processName=CREATE_GROUP_RESOURCE, processDisplayName=Create Group, inlongGroupId=test_pulsar_group, taskId=2, elementName=InitMQ, elementDisplayName=Group-InitMQ, eventType=TaskEvent, event=COMPLETE, listener=QueueResourceListener, startTime=Tue Nov 07 08:19:07 UTC 2023, endTime=null, status=-1, async=0, ip=172.18.0.7, remark=null, exception=failed to execute stream process in asynchronously : org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream) error:
org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to execute stream process in asynchronously : org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.createQueueForStreams(QueueResourceListener.java:180) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.listen(QueueResourceListener.java:134) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.service.listener.group.apply.ApproveApplyProcessListener.lambda$listen$0(ApproveApplyProcessListener.java:79) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342] [ ] 2023-11-07 08:19:08.169 - INFO [inlong-workflow-0] .s.l.g.InitGroupFailedListener:62 - begin to execute InitGroupFailedListener for groupId=test_pulsar_group [ ] 2023-11-07 08:19:08.170 - INFO [inlong-workflow-0] i.m.s.g.InlongGroupServiceImpl:446 - begin to update group status to [120] for groupId=test_pulsar_group by user=admin

image

image

image

What you expected to happen

Use ES sink

How to reproduce

Use ES sink

Environment

Ubuntu 22.04.3 LTS

InLong version

master

InLong Component

InLong Manager

Are you willing to submit PR?

  • [X] Yes, I am willing to submit a PR!

Code of Conduct

haibo-duan avatar Nov 07 '23 08:11 haibo-duan

This issue is stale because it has been open for 60 days with no activity.

github-actions[bot] avatar Feb 04 '24 01:02 github-actions[bot]