
setup flink job error

sjkcdpc opened this issue on Jul 22, 2022 • 8 comments

**Software version**

pinpoint v2.3.3, flink-1.14.5-bin-scala_2.11.tgz, hadoop-2.10.2, hbase-2.4.13

**Describe your problem**

[root@pp-02 ~]# tail -f /var/log/pinpoint-collector.log 
07-22 20:46:00.000 [cStat-Worker-16] WARN  c.n.p.c.s.SendAgentStatService           : not send flink server. Because FlinkTcpDataSender is null.
07-22 20:46:30.030 [cStat-Worker-17] WARN  c.n.p.c.s.SendAgentStatService           : not send flink server. Because FlinkTcpDataSender is null.
07-22 20:47:00.000 [cStat-Worker-18] WARN  c.n.p.c.s.SendAgentStatService           : not send flink server. Because FlinkTcpDataSender is null.

07-22 22:17:14.014 [weeper-pool3-t1] INFO  o.a.h.h.i.AbstractRpcClient              -- Cleanup idle connection to pp-03:16020
07-22 22:17:14.014 [weeper-pool3-t1] INFO  o.a.h.h.i.AbstractRpcClient              -- Cleanup idle connection to pp-02:16020
07-22 22:18:00.000 [    scheduler-3] INFO  o.s.b.c.l.s.SimpleJobLauncher            -- Job: [FlowJob: [name=alarmJob]] launched with the following parameters: [{schedule.date=1658499480000}]
07-22 22:18:00.000 [    scheduler-3] INFO  o.s.b.c.j.SimpleStepHandler              -- Executing step: [alarmPartitionStep]
07-22 22:18:00.000 [rForPartition-1] INFO  c.n.p.b.a.c.SlowCountChecker             -- SlowCountChecker result is false for application (testapp). value is 0. (threshold : 1).
07-22 22:18:00.000 [rForPartition-1] INFO  c.n.p.b.a.c.SlowRateChecker              -- SlowRateChecker result is false for application (testapp). value is 0. (threshold : 1).
07-22 22:18:00.000 [rForPartition-1] INFO  c.n.p.b.a.c.ResponseCountChecker         -- ResponseCountChecker result is false for application (testapp). value is 0. (threshold : 1).
07-22 22:18:00.000 [rForPartition-1] INFO  c.n.p.b.a.c.SlowCountChecker             -- SlowCountChecker result is false for application (testapp). value is 0. (threshold : 1).
07-22 22:18:00.000 [rForPartition-1] INFO  o.s.b.c.s.AbstractStep                   -- Step: [alarmStep:alarm_partition_number_1] executed in 326ms
07-22 22:18:00.000 [    scheduler-3] INFO  o.s.b.c.s.AbstractStep                   -- Step: [alarmPartitionStep] executed in 455ms
07-22 22:18:00.000 [    scheduler-3] INFO  o.s.b.c.l.s.SimpleJobLauncher            -- Job: [FlowJob: [name=alarmJob]] completed with the following parameters: [{schedule.date=1658499480000}] and the following status: [COMPLETED] in 514ms
07-22 22:20:00.000 [    scheduler-4] INFO  o.s.b.c.l.s.SimpleJobLauncher            -- Job: [FlowJob: [name=flinkCheckJob]] launched with the following parameters: [{schedule.date=1658499600001}]
07-22 22:20:00.000 [    scheduler-4] INFO  o.s.b.c.j.SimpleStepHandler              -- Executing step: [flinkCheckStep]
07-22 22:20:00.000 [    scheduler-4] ERROR o.s.b.c.s.AbstractStep                   -- Encountered an error executing step flinkCheckStep in job flinkCheckJob
java.lang.Exception: job fail : [Aggregation Stat Data]
        at com.navercorp.pinpoint.batch.flink.HealthCheckTaskletV2.execute(HealthCheckTaskletV2.java:93) ~[pinpoint-batch-2.3.3.jar!/:2.3.3]
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.6.jar!/:5.3.6]
        at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) [spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) [spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) [spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) [spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) [spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137) [spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) [spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) [spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.3.6.jar!/:5.3.6]
        at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) [spring-batch-core-4.2.4.RELEASE.jar!/:4.2.4.RELEASE]
        at com.navercorp.pinpoint.batch.common.JobLaunchSupport.run(JobLaunchSupport.java:45) [pinpoint-batch-2.3.3.jar!/:2.3.3]
        at com.navercorp.pinpoint.batch.common.BatchJobLauncher.flinkCheckJob(BatchJobLauncher.java:61) [pinpoint-batch-2.3.3.jar!/:2.3.3]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_171]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_171]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_171]
        at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) [spring-context-5.3.6.jar!/:5.3.6]
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.3.6.jar!/:5.3.6]
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95) [spring-context-5.3.6.jar!/:5.3.6]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_171]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_171]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_171]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_171]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
07-22 22:20:00.000 [    scheduler-4] INFO  o.s.b.c.s.AbstractStep                   -- Step: [flinkCheckStep] executed in 17ms
07-22 22:20:00.000 [    scheduler-4] INFO  o.s.b.c.l.s.SimpleJobLauncher            -- Job: [FlowJob: [name=flinkCheckJob]] completed with the following parameters: [{schedule.date=1658499600001}] and the following status: [FAILED] in 63ms

**Config file**

pinpoint/batch/src/main/resources/batch-root.properties
batch.server.env=release

#smtp config
pinpoint.url=http://192.168.5.205:8080
alarm.mail.server.url=smtp.exmail.qq.com
alarm.mail.server.port=587
alarm.mail.server.username=xxx
alarm.mail.server.password=xxx
alarm.mail.sender.address=xxx

alarm.mail.transport.protocol=smtp
alarm.mail.smtp.port=587
alarm.mail.smtp.auth=true
alarm.mail.smtp.starttls.enable=false
alarm.mail.smtp.starttls.required=false
alarm.mail.debug=true

# webhook config
webhook.enable=true

#flink server list
batch.flink.server=pp-01

#cleanup inactive agents job
job.cleanup.inactive.agents=false

# "0 0 3 * * WED" = 3:00 AM on every Wednesday.
# "0 0 0 10 * *" = 0:00 AM on the 10th of every month.
# "0 0 16 * * MON-FRI" = 4:00 PM on every weekdays.
#  There is no default value.
job.cleanup.inactive.agents.cron=

# Default value is 30 (minimum value is 30)
#job.cleanup.inactive.agents.duration.days=

pinpoint/collector/src/main/resources/pinpoint-collector-root.properties
pinpoint.zookeeper.address=pp-01,pp-02,pp-03

# base data receiver config  ---------------------------------------------------------------------
collector.receiver.base.ip=0.0.0.0
collector.receiver.base.port=9994

# number of tcp worker threads
collector.receiver.base.worker.threadSize=8
# capacity of tcp worker queue
collector.receiver.base.worker.queueSize=1024
# monitoring for tcp worker
collector.receiver.base.worker.monitor=true

collector.receiver.base.request.timeout=3000
collector.receiver.base.closewait.timeout=3000
# 5 min
collector.receiver.base.ping.interval=300000
# 30 min
collector.receiver.base.pingwait.timeout=1800000


# stat receiver config  ---------------------------------------------------------------------
collector.receiver.stat.udp=true
collector.receiver.stat.udp.ip=0.0.0.0
collector.receiver.stat.udp.port=9995
collector.receiver.stat.udp.receiveBufferSize=4194304
## required linux kernel 3.9 & java 9+
collector.receiver.stat.udp.reuseport=false
## If not set, follow the cpu count automatically.
#collector.receiver.stat.udp.socket.count=1

# Should keep in mind that TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
collector.receiver.stat.tcp=false
collector.receiver.stat.tcp.ip=0.0.0.0
collector.receiver.stat.tcp.port=9995

collector.receiver.stat.tcp.request.timeout=3000
collector.receiver.stat.tcp.closewait.timeout=3000
# 5 min
collector.receiver.stat.tcp.ping.interval=300000
# 30 min
collector.receiver.stat.tcp.pingwait.timeout=1800000

# number of udp statworker threads
collector.receiver.stat.worker.threadSize=8
# capacity of udp statworker queue
collector.receiver.stat.worker.queueSize=64
# monitoring for udp stat worker
collector.receiver.stat.worker.monitor=true


# span receiver config  ---------------------------------------------------------------------
collector.receiver.span.udp=true
collector.receiver.span.udp.ip=0.0.0.0
collector.receiver.span.udp.port=9996
collector.receiver.span.udp.receiveBufferSize=4194304
## required linux kernel 3.9 & java 9+
collector.receiver.span.udp.reuseport=false
## If not set, follow the cpu count automatically.
#collector.receiver.span.udp.socket.count=1


# Should keep in mind that TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
collector.receiver.span.tcp=false
collector.receiver.span.tcp.ip=0.0.0.0
collector.receiver.span.tcp.port=9996

collector.receiver.span.tcp.request.timeout=3000
collector.receiver.span.tcp.closewait.timeout=3000
# 5 min
collector.receiver.span.tcp.ping.interval=300000
# 30 min
collector.receiver.span.tcp.pingwait.timeout=1800000

# number of udp statworker threads
collector.receiver.span.worker.threadSize=32
# capacity of udp statworker queue
collector.receiver.span.worker.queueSize=256
# monitoring for udp stat worker
collector.receiver.span.worker.monitor=true


# configure l4 ip address to ignore health check logs
# support raw address and CIDR address (Ex:10.0.0.1,10.0.0.1/24)
collector.l4.ip=

# change OS level read/write socket buffer size (for linux)
#sudo sysctl -w net.core.rmem_max=
#sudo sysctl -w net.core.wmem_max=
# check current values using:
#$ /sbin/sysctl -a | grep -e rmem -e wmem

# number of agent event worker threads
collector.agentEventWorker.threadSize=4
# capacity of agent event worker queue
collector.agentEventWorker.queueSize=1024

# Determines whether to register the information held by com.navercorp.pinpoint.collector.monitor.CollectorMetric to jmx
collector.metric.jmx=true
collector.metric.jmx.domain=pinpoint.collector.metrics

statistics.flushPeriod=1000
# Use the statistics agent status.
collector.statistics.agent-state.enable=true


# -------------------------------------------------------------------------------------------------
# The cluster related options are used to establish connections between the agent, collector, and web in order to send/receive data between them in real time.
# You may enable additional features using this option (Ex : RealTime Active Thread Chart).
# -------------------------------------------------------------------------------------------------
# Usage : Set the following options for collector/web components that reside in the same cluster in order to enable this feature.
# 1. cluster.enable (pinpoint-web.properties, pinpoint-collector-root.properties) - "true" to enable
# 2. cluster.zookeeper.address (pinpoint-web.properties, pinpoint-collector-root.properties) - address of the ZooKeeper instance that will be used to manage the cluster
# 3. cluster.web.tcp.port (pinpoint-web.properties) - any available port number (used to establish connection between web and collector)
# -------------------------------------------------------------------------------------------------
# Please be aware of the following:
#1. If the network between web, collector, and the agents are not stable, it is advisable not to use this feature.
#2. We recommend using the cluster.web.tcp.port option. However, in cases where the collector is unable to establish connection to the web, you may reverse this and make the web establish connection to the collector.
#   In this case, you must set cluster.connect.address (pinpoint-web.properties); and cluster.listen.ip, cluster.listen.port (pinpoint-collector-root.properties) accordingly.
cluster.enable=true
cluster.zookeeper.address=pp-01,pp-02,pp-03
cluster.zookeeper.znode_root=/pinpoint-cluster
cluster.zookeeper.sessiontimeout=30000
cluster.listen.ip=
cluster.listen.port=-1

#collector.admin.password=
#collector.admin.api.rest.active=
#collector.admin.api.jmx.active=

collector.spanEvent.sequence.limit=10000

# Specifies the size to store data before flushing from CachedStatisticsDao.
# The default is -1. If it is -1, there is no limit.
collector.cachedStatDao.caller.limit=-1
collector.cachedStatDao.callee.limit=-1
collector.cachedStatDao.self.limit=-1
collector.cachedStatDao.bulk.enable=true
collector.map-link.avg.enable=true
collector.map-link.max.enable=true

# Flink configuration
flink.cluster.enable=true
flink.cluster.zookeeper.address=pp-01,pp-02,pp-03
flink.cluster.zookeeper.znode_root=/pinpoint-cluster
flink.cluster.zookeeper.sessiontimeout=3000
flink.cluster.zookeeper.retry.interval=5000

sjkcdpc · Jul 22 '22

Hi @sjkcdpc, please set this property. Put the IP of the Flink server you configured into the batch.flink.server property.

#flink server list
batch.flink.server= 
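
For example, a minimal sketch of that property in pinpoint/batch/src/main/resources/batch-root.properties (the IP below is a hypothetical placeholder; replace it with the address of the machine where your Flink JobManager runs):

# hypothetical example value; replace with your Flink server's IP
batch.flink.server=192.168.5.201

If a hostname such as pp-01 is used instead, it has to resolve correctly on the host running pinpoint-batch; using a raw IP takes DNS resolution out of the equation.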

minwoo-jung · Jul 25 '22

Already set
[screenshot]

sjkcdpc · Jul 25 '22

Are you using Flink version 1.14.5? Pinpoint 2.3 does not support Flink 1.14. It is recommended to downgrade the Flink version or upgrade the Pinpoint version. See the Flink compatibility table at https://github.com/pinpoint-apm/pinpoint. [screenshot of the compatibility table]

minwoo-jung · Jul 25 '22

I changed Flink to flink-1.7.2-bin-hadoop27-scala_2.12.tgz, but it reports the same error.

sjkcdpc · Jul 25 '22

[root@pp-02 ~]# tail -f /var/log/pinpoint-collector.log
07-25 16:26:20.020 [nt-Worker(10-0)] INFO c.n.p.r.c.DefaultPinpointClientHandler : DefaultPinpointClientHandler@26cb8041 handleHandshakePacket() started. message:ControlHandshakeResponsePacket{requestId=0, payloadLength=72}
07-25 16:26:20.020 [nt-Worker(10-0)] INFO c.n.p.r.c.PinpointClientHandlerState : DefaultPinpointClientHandler@26cb8041 stateTo() completed. Socket state change success(updateWanted:RUN_DUPLEX ,before:RUN_WITHOUT_HANDSHAKE ,current:RUN_DUPLEX).
07-25 16:26:20.020 [nt-Worker(10-0)] INFO c.n.p.r.c.DefaultPinpointClientHandler : [id: 0xde233311, /192.168.5.206:36092 => /192.168.5.201:9997] handleHandshakePacket() completed. code:DUPLEX_COMMUNICATION
07-25 16:26:30.030 [pcStat-Worker-2] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:26:47.047 [2181@0x5ce8d869] INFO o.a.z.ZooKeeper : Session: 0x1002daa1f570122 closed
07-25 16:26:47.047 [869-EventThread] INFO o.a.z.ClientCnxn : EventThread shut down for session: 0x1002daa1f570122
07-25 16:27:00.000 [pcStat-Worker-3] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:27:20.020 [ler-Timer(25-0)] INFO c.n.p.r.c.PinpointClientHandshaker : PinpointClientHandshaker@d426da0 HandshakeJob completed.
07-25 16:27:30.030 [pcStat-Worker-4] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:28:00.000 [pcStat-Worker-5] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:28:30.030 [pcStat-Worker-6] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:29:00.000 [pcStat-Worker-7] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:29:30.030 [pcStat-Worker-8] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:30:00.000 [pcStat-Worker-9] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:30:30.030 [cStat-Worker-10] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:31:00.000 [cStat-Worker-11] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:31:30.030 [cStat-Worker-12] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:32:00.000 [cStat-Worker-13] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:32:30.030 [cStat-Worker-14] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:33:00.000 [cStat-Worker-15] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:33:30.030 [cStat-Worker-16] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:34:00.000 [cStat-Worker-17] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:34:30.030 [cStat-Worker-18] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:35:00.000 [cStat-Worker-19] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:35:30.030 [cStat-Worker-20] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.
07-25 16:36:00.000 [cStat-Worker-21] WARN c.n.p.c.s.SendAgentStatService : not send flink server. Because FlinkTcpDataSender is null.

Could you tell me why the collector component keeps printing this message? Is it because the Flink server address is configured incorrectly, or because the configuration is not taking effect?

sjkcdpc · Jul 25 '22

@sjkcdpc It would be better to call the http://pp-01:8081/jobs/overview URL directly and check whether a response is received.
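
For reference, a minimal way to make that check from a shell, assuming the Flink REST API is reachable on pp-01:8081 (the default Flink web/REST port):

curl -s http://pp-01:8081/jobs/overview
# Expected: a JSON document with a "jobs" array.
# If the Pinpoint job is running, one entry should have "state": "RUNNING"
# (the Pinpoint job name, "Aggregation Stat Data", is the same one that appears
#  in the flinkCheckJob failure earlier in this thread).
# An empty "jobs" array means no job has been submitted to the cluster.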

Also, looking at the collector log, the job does not appear to be running on Flink.

minwoo-jung · Jul 27 '22

[screenshots]

sjkcdpc · Aug 02 '22

@sjkcdpc There is no Flink job running. Please run the job by referring to the instructions for running a job on the Flink site.
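
For reference, a minimal sketch of submitting the job from the Flink distribution directory. The jar name below is an assumption based on the Pinpoint flink module; use the jar you actually built for 2.3.3:

cd /path/to/flink-1.7.2
# submit the Pinpoint flink job (jar name is assumed; pass -c <main-class> if the
# jar's manifest does not declare one)
./bin/flink run /path/to/pinpoint-flink-job-2.3.3.jar
# Once submitted, the job should appear as RUNNING in the Flink web UI (port 8081)
# and in the /jobs/overview response, and the "FlinkTcpDataSender is null" warnings
# in the collector log should stop once the collector connects to the running job.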

minwoo-jung · Aug 09 '22