[Bug][Manager] Failed to use Elasticsearch sink
What happened
When I used Elasticsearch as the data node, the configuration process encountered the following error:
[ ] 2023-11-07 08:19:08.132 - INFO [inlong-mq-process-0] a.i.m.s.r.s.e.ElasticsearchApi:166 - create test_pulsar_table_es:true
[ ] 2023-11-07 08:19:08.135 - INFO [inlong-mq-process-0] .i.m.s.s.StreamSinkServiceImpl:518 - success to update sink status=130 for id=1 with log: success to create es resource
[ ] 2023-11-07 08:19:08.135 - INFO [inlong-mq-process-0] .ElasticsearchResourceOperator:111 - success to create es resource for sinkInfo=SinkInfo(id=1, inlongGroupId=test_pulsar_group, inlongStreamId=test_pulsar_stream, sinkType=ELASTICSEARCH, inlongClusterName=null, sinkName=test_pulsar_stream_sink, dataNodeName=a8a3d27a-50b7-4264-9b4c-ea88c0a04f8d, description=null, enableCreateResource=1, extParams={"hosts":"http://elasticsearch:9200","username":"admin","password":"******","indexName":"test_pulsar_table_es","flushInterval":null,"flushRecord":1000,"retryTimes":null,"keyFieldNames":null,"documentType":"test_type","primaryKey":"name","esVersion":7,"encryptVersion":1,"properties":{}}, status=100, creator=admin, mqResource=test_pulsar_stream, dataType=CSV, sourceSeparator=124, dataEscapeChar=null)
[ ] 2023-11-07 08:19:08.139 -ERROR [inlong-mq-process-0] a.i.m.w.e.LogableEventListener:88 - execute listener WorkflowEventLogEntity(id=null, processId=3, processName=CREATE_STREAM_RESOURCE, processDisplayName=Create Stream, inlongGroupId=test_pulsar_group, taskId=4, elementName=InitSink, elementDisplayName=Stream-InitSink, eventType=TaskEvent, event=COMPLETE, listener=StreamSinkResourceListener, startTime=Tue Nov 07 08:19:07 UTC 2023, endTime=null, status=-1, async=0, ip=172.18.0.7, remark=null, exception=find no proper cluster assign to group=test_pulsar_group, stream=test_pulsar_stream, sink type=ELASTICSEARCH, data node=a8a3d27a-50b7-4264-9b4c-ea88c0a04f8d ) error:
java.lang.IllegalArgumentException: find no proper cluster assign to group=test_pulsar_group, stream=test_pulsar_stream, sink type=ELASTICSEARCH, data node=a8a3d27a-50b7-4264-9b4c-ea88c0a04f8d
at org.apache.inlong.manager.common.util.Preconditions.expectNotBlank(Preconditions.java:143) ~[manager-common-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator.assignCluster(AbstractStandaloneSinkResourceOperator.java:58) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.resource.sink.es.ElasticsearchResourceOperator.createSinkResource(ElasticsearchResourceOperator.java:79) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.listener.sink.StreamSinkResourceListener.listen(StreamSinkResourceListener.java:95) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.lambda$createQueueForStreams$0(QueueResourceListener.java:159) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
[ ] 2023-11-07 08:19:08.154 - INFO [inlong-mq-process-0] .m.s.s.InlongStreamServiceImpl:725 - success to update stream after approve for groupId=test_pulsar_group, streamId=test_pulsar_stream
[ ] 2023-11-07 08:19:08.160 -ERROR [inlong-mq-process-0] .m.s.l.q.QueueResourceListener:168 - failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream
[ ] 2023-11-07 08:19:08.160 -ERROR [inlong-workflow-0] .m.s.l.q.QueueResourceListener:179 - failed to execute stream process in asynchronously
java.util.concurrent.ExecutionException: org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_342]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[?:1.8.0_342]
at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.createQueueForStreams(QueueResourceListener.java:176) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.listen(QueueResourceListener.java:134) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.listener.group.apply.ApproveApplyProcessListener.lambda$listen$0(ApproveApplyProcessListener.java:79) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
Caused by: org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream
at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.lambda$createQueueForStreams$1(QueueResourceListener.java:169) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_342]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_342]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_342]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) ~[?:1.8.0_342]
... 3 more
[ ] 2023-11-07 08:19:08.161 -ERROR [inlong-workflow-0] a.i.m.w.e.LogableEventListener:88 - execute listener WorkflowEventLogEntity(id=null, processId=2, processName=CREATE_GROUP_RESOURCE, processDisplayName=Create Group, inlongGroupId=test_pulsar_group, taskId=2, elementName=InitMQ, elementDisplayName=Group-InitMQ, eventType=TaskEvent, event=COMPLETE, listener=QueueResourceListener, startTime=Tue Nov 07 08:19:07 UTC 2023, endTime=null, status=-1, async=0, ip=172.18.0.7, remark=null, exception=failed to execute stream process in asynchronously : org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream) error:
org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to execute stream process in asynchronously : org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream
at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.createQueueForStreams(QueueResourceListener.java:180) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.listen(QueueResourceListener.java:134) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at org.apache.inlong.manager.service.listener.group.apply.ApproveApplyProcessListener.lambda$listen$0(ApproveApplyProcessListener.java:79) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
[ ] 2023-11-07 08:19:08.169 - INFO [inlong-workflow-0] .s.l.g.InitGroupFailedListener:62 - begin to execute InitGroupFailedListener for groupId=test_pulsar_group
[ ] 2023-11-07 08:19:08.170 - INFO [inlong-workflow-0] i.m.s.g.InlongGroupServiceImpl:446 - begin to update group status to [120] for groupId=test_pulsar_group by user=admin
What you expected to happen
Use ES sink
How to reproduce
Use ES sink
Environment
Ubuntu 22.04.3 LTS
InLong version
master
InLong Component
InLong Manager
Are you willing to submit PR?
- [X] Yes, I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
This issue is stale because it has been open for 60 days with no activity.