
chunjun MySQL-to-Hive sync performance problem with 80M+ rows

Open · biandou1313 opened this issue 2 years ago • 6 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

1. On the chunjun master branch I ran a MySQL-to-Hive sync job. I set the channel count to 20, but during the actual run it still read with a single channel, and it failed very quickly. (screenshot attached)

What you expected to happen

Setting multiple channels should speed up the sync.

How to reproduce

1. Run the following job JSON:

```json
{
"job": {
	"content": [{
		"reader": {
			"parameter": {
				"password": "123456",
				"dataSourceId": 14,
				"column": [
					{ "precision": 20, "name": "id", "columnDisplaySize": 20, "type": "BIGINT" },
					{ "precision": 32, "name": "device_id", "columnDisplaySize": 32, "type": "VARCHAR" },
					{ "precision": 12, "name": "point", "columnDisplaySize": 12, "type": "VARCHAR" },
					{ "precision": 32, "name": "hash", "columnDisplaySize": 32, "type": "VARCHAR" },
					{ "precision": 32, "name": "value", "columnDisplaySize": 32, "type": "VARCHAR" },
					{ "precision": 19, "name": "acq_time", "columnDisplaySize": 19, "type": "DATETIME" },
					{ "precision": 12, "name": "ratio", "columnDisplaySize": 12, "type": "FLOAT" },
					{ "precision": 32, "name": "max_val", "columnDisplaySize": 32, "type": "VARCHAR" },
					{ "precision": 32, "name": "min_val", "columnDisplaySize": 32, "type": "VARCHAR" },
					{ "precision": 32, "name": "out_param", "columnDisplaySize": 32, "type": "VARCHAR" },
					{ "precision": 8, "name": "item_code", "columnDisplaySize": 8, "type": "VARCHAR" },
					{ "precision": 32, "name": "descr", "columnDisplaySize": 32, "type": "VARCHAR" },
					{ "precision": 16, "name": "save_hst", "columnDisplaySize": 16, "type": "SMALLINT" },
					{ "precision": 1, "name": "del_flag", "columnDisplaySize": 1, "type": "TINYINT" },
					{ "precision": 20, "name": "create_user", "columnDisplaySize": 20, "type": "BIGINT" },
					{ "precision": 20, "name": "update_user", "columnDisplaySize": 20, "type": "BIGINT" },
					{ "precision": 19, "name": "create_time", "columnDisplaySize": 19, "type": "DATETIME" },
					{ "precision": 19, "name": "update_time", "columnDisplaySize": 19, "type": "DATETIME" },
					{ "precision": 50, "name": "ext_first", "columnDisplaySize": 50, "type": "VARCHAR" },
					{ "precision": 50, "name": "ext_second", "columnDisplaySize": 50, "type": "VARCHAR" },
					{ "precision": 50, "name": "ext_third", "columnDisplaySize": 50, "type": "VARCHAR" },
					{ "precision": 50, "name": "ext_fourth", "columnDisplaySize": 50, "type": "VARCHAR" },
					{ "precision": 50, "name": "ext_fifth", "columnDisplaySize": 50, "type": "VARCHAR" },
					{ "precision": 50, "name": "ext_sixth", "columnDisplaySize": 50, "type": "VARCHAR" },
					{ "precision": 50, "name": "ext_seventh", "columnDisplaySize": 50, "type": "VARCHAR" },
					{ "precision": 50, "name": "ext_eighth", "columnDisplaySize": 50, "type": "VARCHAR" },
					{ "precision": 50, "name": "ext_ninth", "columnDisplaySize": 50, "type": "VARCHAR" },
					{ "precision": 50, "name": "ext_tenth", "columnDisplaySize": 50, "type": "VARCHAR" },
					{ "precision": 50, "name": "device_add_id", "columnDisplaySize": 50, "type": "VARCHAR" },
					{ "precision": 8, "name": "today_start_value", "columnDisplaySize": 10, "type": "DECIMAL" },
					{ "precision": 75, "name": "unit", "columnDisplaySize": 75, "type": "VARCHAR" }
				],
				"connection": [{
					"jdbcUrl": ["jdbc:mysql://172.18.8.209:3306/Vasyslink_yag001?useSSL=false&useUnicode=true&characterEncoding=utf8&useCursorFetch=true"],
					"table": ["tb_device_point_data_hst_01"]
				}],
				"splitPk": "id",
				"username": "root"
			},
			"name": "mysqlreader"
		},
		"writer": {
			"parameter": {
				"tablesColumn": "{\"tb_device_point_data_hst_940\":[{\"key\":\"id\",\"type\":\"bigint\",\"precision\":19,\"columnDisplaySize\":20},{\"key\":\"device_id\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"point\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"hash\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"value\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"acq_time\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ratio\",\"type\":\"float\",\"precision\":7,\"columnDisplaySize\":24},{\"key\":\"max_val\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"min_val\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"out_param\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"item_code\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"descr\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"save_hst\",\"type\":\"smallint\",\"precision\":5,\"columnDisplaySize\":6},{\"key\":\"del_flag\",\"type\":\"tinyint\",\"precision\":3,\"columnDisplaySize\":4},{\"key\":\"create_user\",\"type\":\"bigint\",\"precision\":19,\"columnDisplaySize\":20},{\"key\":\"update_user\",\"type\":\"bigint\",\"precision\":19,\"columnDisplaySize\":20},{\"key\":\"create_time\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"update_time\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_first\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_second\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_third\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_fourth\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_fifth\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_sixth\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_seventh\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_eighth\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_ninth\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_tenth\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"device_add_id\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"today_start_value\",\"type\":\"decimal\",\"precision\":8,\"columnDisplaySize\":10},{\"key\":\"unit\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647}]}",
				"dataSourceId": 54,
				"partition": "pt",
				"jdbcUrl": "jdbc:hive2://172.18.8.208:10000/Vasyslink_yag001",
				"defaultFS": "hdfs://172.18.8.207:8020",
				"writeMode": "overwrite",
				"maxFileSize": 1073741824,
	
				"fieldDelimiter": "\t",
		
                     "partitionType" : "DAY",
				"fileType": "text",
				"charsetName": "UTF-8"
			},
			"name": "hivewriter"
		}
	}],
	"setting": {
		"log": {
			"isLogger": false
		},
		"errorLimit": {},
		"speed": {

			"channel": 20
		}
	}
}
}
```
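For reference, the job above sets both `"splitPk": "id"` and `setting.speed.channel: 20`, which is the combination a chunjun JDBC reader needs before it can read in parallel at all. The sketch below only illustrates the general idea of key-based splitting (a modulo predicate on the split key per subtask); it is an assumption about the shape of the generated SQL, not chunjun's actual implementation on master.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of splitting one table read across N parallel channels
// using a numeric split key. chunjun's mysqlreader does something in this
// spirit when splitPk is set and channel > 1; the real generated SQL may differ.
public class SplitSqlSketch {
    static List<String> buildSplitQueries(String table, String splitPk, int channels) {
        List<String> queries = new ArrayList<>();
        for (int i = 0; i < channels; i++) {
            // Subtask i reads only the rows whose split key falls into its bucket.
            queries.add(String.format(
                "SELECT * FROM %s WHERE MOD(%s, %d) = %d", table, splitPk, channels, i));
        }
        return queries;
    }

    public static void main(String[] args) {
        buildSplitQueries("tb_device_point_data_hst_01", "id", 20)
            .forEach(System.out::println);
    }
}
```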

2. flink-conf.yaml configuration:

```yaml
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 4096m
taskmanager.memory.process.size: 16384m
#taskmanager.memory.process.size: 102400m
#jobmanager.memory.process.size: 16384m
#taskmanager.memory.process.size: 32768m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 5
#jobmanager.execution.failover-strategy: region
restart-strategy: none
#restart-strategy: failure-rate
#restart-strategy.failure-rate.max-failures-per-interval: 3
#restart-strategy.failure-rate.failure-rate-interval: 5 min
#restart-strategy.failure-rate.delay: 10 s

heartbeat.timeout: 1800000

rest.bind-port: 50031-50040

# Checkpoint setup (for resumable transfer)
state.checkpoints.dir: hdfs://hadoop02:8020/checkpoints/metadata
state.checkpoints.num-retained: 10

# Prometheus integration settings
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter

# PushGateway host and port
#metrics.reporter.promgateway.host: pushgateway.software.dc
metrics.reporter.promgateway.host: 172.18.8.211
#metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.port: 9091

# Label (prefix) under which Flink metrics are shown, plus a random job-name suffix
metrics.reporter.promgateway.jobName: flink-metrics
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.interval: 30 SECONDS
```
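One thing worth checking against this flink-conf.yaml: if chunjun maps `setting.speed.channel` to the source parallelism (an assumption here), a 20-channel job needs 20 free task slots, while each TaskManager above offers only `taskmanager.numberOfTaskSlots: 2`. A minimal back-of-the-envelope check, assuming a standalone session cluster:

```java
// Rough slot budget for the configuration above; purely illustrative arithmetic.
public class SlotBudget {
    public static void main(String[] args) {
        int channels = 20;           // setting.speed.channel from the job JSON
        int slotsPerTaskManager = 2; // taskmanager.numberOfTaskSlots from flink-conf.yaml
        // With slot sharing, required slots equal the maximum operator parallelism.
        int taskManagersNeeded = (channels + slotsPerTaskManager - 1) / slotsPerTaskManager;
        System.out.printf("channel=%d needs >= %d slots, i.e. >= %d TaskManagers%n",
            channels, channels, taskManagersNeeded);
    }
}
```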

Anything else

No response

Version

master

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

biandou1313 · Aug 16 '22 06:08

The screenshot shows a parallelism of 10. Could you provide the exact error message?

Paddy0523 · Aug 17 '22 05:08

[two screenshots attached]

biandou1313 · Aug 17 '22 06:08

The screenshot shows a parallelism of 10. Could you provide the exact error message?

What's your WeChat? Let's work through this performance problem together. My WeChat ID is biandou1l1111.

biandou1313 · Aug 17 '22 06:08

[four screenshots attached]

biandou1313 · Aug 17 '22 06:08

[screenshot attached]

biandou1313 · Aug 17 '22 06:08

```
2022-08-17 14:21:31
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
	at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: java.lang.Boolean cannot be cast to java.lang.Byte
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
	at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:135)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Caused by: java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.Byte
	at com.dtstack.chunjun.typeutil.serializer.base.ByteColumnSerializer.serialize(ByteColumnSerializer.java:85)
	at com.dtstack.chunjun.typeutil.serializer.base.ByteColumnSerializer.serialize(ByteColumnSerializer.java:38)
	at com.dtstack.chunjun.typeutil.serializer.ColumnRowDataSerializer.serialize(ColumnRowDataSerializer.java:142)
	at com.dtstack.chunjun.typeutil.serializer.ColumnRowDataSerializer.serialize(ColumnRowDataSerializer.java:47)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:130)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
	at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
	... 10 more
```

biandou1313 · Aug 17 '22 06:08
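A note on the final `Caused by` above: the reader declares `del_flag` as `TINYINT` with precision 1, and MySQL Connector/J by default (`tinyInt1isBit=true`) returns `TINYINT(1)` values as `java.lang.Boolean`, which cannot be cast to the `Byte` that `ByteColumnSerializer` expects. A possible, unverified workaround is to add `tinyInt1isBit=false` to the reader `jdbcUrl`, or to declare the column as `BOOLEAN`. The standalone probe below, which reuses the connection details from the job JSON purely for illustration, shows how to check what Java type the driver actually returns for that column:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Minimal probe (not chunjun code): print the runtime class of a TINYINT(1) value.
// With the driver default tinyInt1isBit=true it typically comes back as
// java.lang.Boolean; with tinyInt1isBit=false it comes back as an integer type.
public class TinyIntProbe {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://172.18.8.209:3306/Vasyslink_yag001"
                + "?useSSL=false&tinyInt1isBit=false"; // try with and without this flag
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT del_flag FROM tb_device_point_data_hst_01 LIMIT 1")) {
            if (rs.next()) {
                Object v = rs.getObject(1);
                System.out.println(v + " -> " + (v == null ? "null" : v.getClass().getName()));
            }
        }
    }
}
```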