chunjun SQL mode: kafka-hive job cannot insert data
Search before asking
- [X] I have searched the issues and found no similar issues.
What happened
SQL statements:
CREATE TABLE source (
id int,
device_id string
) WITH (
'connector' = 'kafka-x',
'topic' = 'test',
'properties.bootstrap.servers' = '172.18.8.203:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE TABLE test111 (
id int,
device_id string
) WITH (
'connector' = 'hive-x',
'default-fs' = 'hdfs://hadoop02:8020',
'file-type' = 'text',
'url' = 'jdbc:hive2://172.18.8.208:10000/ljgk_dw',
'field-delimiter' = ',',
'write-mode' = 'append',
'partition' = 'pt',
'partition-type' = 'DAY',
'sink.parallelism' = '1',
'table-name' = 'test222221'
);
insert into test111 select * from source
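With 'partition' = 'pt' and 'partition-type' = 'DAY', the sink is expected to write each day's rows into a separate Hive partition of test222221. The sketch below shows how such a partition name could be derived. It assumes chunjun names DAY partitions `pt=yyyyMMdd` using the job time zone (the log reports `Asia/Shanghai`); that pattern is an assumption not confirmed by this issue, and `DayPartition`/`dayPartition` are hypothetical names. Running `SHOW PARTITIONS test222221;` in the ljgk_dw database and comparing it with the computed name is one way to check whether the expected partition was ever created.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DayPartition {
    // Assumption: DAY partitions are named <column>=yyyyMMdd in the job time zone.
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    /** Hypothetical helper: builds the partition directory name for a given instant and zone. */
    static String dayPartition(String column, Instant when, ZoneId zone) {
        return column + "=" + DAY.format(when.atZone(zone));
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        // 15:18:27 Asia/Shanghai on 2023-03-09, when the sink task started in this issue's log
        Instant taskStart = Instant.parse("2023-03-09T07:18:27Z");
        System.out.println(dayPartition("pt", taskStart, shanghai)); // prints pt=20230309

        // 01:00 on 2023-03-10 in Shanghai, but still 2023-03-09 in UTC
        Instant afterShanghaiMidnight = Instant.parse("2023-03-09T17:00:00Z");
        System.out.println(dayPartition("pt", afterShanghaiMidnight, shanghai));       // prints pt=20230310
        System.out.println(dayPartition("pt", afterShanghaiMidnight, ZoneOffset.UTC)); // prints pt=20230309
    }
}
```

The second pair of outputs shows why the time zone matters: a record processed shortly after midnight in Shanghai lands in the next day's partition even though it is still the previous day in UTC.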
What you expected to happen
C:\develop\jdk8\bin\java.exe -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:49251,suspend=y,server=n -javaagent:C:\Users\admin\AppData\Local\JetBrains\IntelliJIdea2020.3\captureAgent\debugger-agent.jar -Dfile.encoding=UTF-8 -classpath C:\Users\admin\AppData\Local\Temp\classpath238535005.jar com.dtstack.flinkx.local.test.LocalTest
Connected to the target VM, address: '127.0.0.1:49251', transport: 'socket'
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/admin/.m2/repository/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/admin/.m2/repository/ch/qos/logback/logback-classic/1.1.7/logback-classic-1.1.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/admin/.m2/repository/com/dtstack/flinkx/flinkx-connector-hbase-1.4/1.12-SNAPSHOT/flinkx-connector-hbase-1.4-1.12-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2023-03-09 15:18:18,700 - 1 WARN [main] com.dtstack.flinkx.local.test.LocalTest:-----
2023-03-09 15:18:18,860 - 161 INFO [main] com.dtstack.flinkx.Main:------------program params-------------------------
2023-03-09 15:18:18,939 - 240 INFO [main] com.dtstack.flinkx.Main:-mode
2023-03-09 15:18:18,940 - 241 INFO [main] com.dtstack.flinkx.Main:local
2023-03-09 15:18:18,942 - 243 INFO [main] com.dtstack.flinkx.Main:-jobType
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:sql
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:9999
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:-job
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:CREATE+TABLE+source%0A%28%0A++++id++++++++int%2C%0A++++device_id+string%0A%29+WITH+%28%0A++++++%27connector%27+%3D+%27kafka-x%27%2C%0A++++++%27topic%27+%3D+%27test%27%2C%0A++++++%27properties.bootstrap.servers%27+%3D+%27172.18.8.203%3A9092%27%2C%0A++++++%27scan.startup.mode%27+%3D+%27earliest-offset%27%2C%0A++++++%27format%27+%3D+%27json%27%0A++++++%29%3B%0ACREATE+TABLE+test111%0A%28%0A++++id++++++++int%2C%0A++++device_id+string%0A%29+WITH+%28%0A%0A++++++%27connector%27+%3D+%27hive-x%27%2C%0A++++++%27default-fs%27+%3D+%27hdfs%3A%2F%2Fhadoop02%3A8020%27%2C%0A++++++%27file-type%27+%3D+%27text%27%2C%0A++++++%27url%27+%3D+%27jdbc%3Ahive2%3A%2F%2F172.18.8.208%3A10000%2Fljgk_dw%27%2C%0A++++++%27field-delimiter%27+%3D+%27%2C%27%2C%0A++++++%27write-mode%27+%3D+%27append%27%2C%0A++++++%27partition%27+%3D+%27pt%27%2C%0A++++++%27partition-type%27+%3D+%27DAY%27%2C%0A++++++%27sink.parallelism%27+%3D+%271%27%2C%0A++++++%27table-name%27+%3D+%27test222221%27%0A++++++%29%3B%0Ainsert+into+test111%0Aselect+%0Afrom+source%0A
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:-jobName
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:flinkStreamSQLLocalTest-catalog
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:-flinkxDistDir
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:F:\代码\Flinkx-git\flinkx-1.12_release/flinkx-dist
2023-03-09 15:18:18,944 - 245 INFO [main] com.dtstack.flinkx.Main:-remoteFlinkxDistDir
2023-03-09 15:18:18,944 - 245 INFO [main] com.dtstack.flinkx.Main:F:\代码\Flinkx-git\flinkx-1.12_release/flinkx-dist
2023-03-09 15:18:18,944 - 245 INFO [main] com.dtstack.flinkx.Main:-pluginLoadMode
2023-03-09 15:18:18,946 - 247 INFO [main] com.dtstack.flinkx.Main:LocalTest
2023-03-09 15:18:18,946 - 247 INFO [main] com.dtstack.flinkx.Main:-addjar
2023-03-09 15:18:18,946 - 247 INFO [main] com.dtstack.flinkx.Main:[
"F:\app\original-engery-udf-1.0-SNAPSHOT.jar"
]
2023-03-09 15:18:18,946 - 247 INFO [main] com.dtstack.flinkx.Main:-------------------------------------------
2023-03-09 15:18:21,240 - 2541 INFO [main] com.dtstack.flinkx.Main:------------------------------SQL验证------------------------------------
2023-03-09 15:18:22,103 - 3404 INFO [main] com.dtstack.flinkx.Main:*********************
2023-03-09 15:18:22,104 - 3405 INFO [main] com.dtstack.flinkx.Main:validateSQL name is flinkStreamSQLLocalTest-catalog .
2023-03-09 15:18:22,104 - 3405 INFO [main] com.dtstack.flinkx.Main:**********************
2023-03-09 15:18:24,241 - 5542 INFO [main] org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils:The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
2023-03-09 15:18:24,241 - 5542 INFO [main] org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils:The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value.
2023-03-09 15:18:24,241 - 5542 INFO [main] org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils:The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value.
2023-03-09 15:18:24,241 - 5542 INFO [main] org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils:The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb.
2023-03-09 15:18:24,241 - 5542 INFO [main] org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils:The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb.
2023-03-09 15:18:24,241 - 5542 INFO [main] org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils:The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb.
2023-03-09 15:18:24,315 - 5616 INFO [main] org.apache.flink.runtime.minicluster.MiniCluster:Starting Flink Mini Cluster
2023-03-09 15:18:24,317 - 5618 INFO [main] org.apache.flink.runtime.minicluster.MiniCluster:Starting Metrics Registry
2023-03-09 15:18:24,367 - 5668 INFO [main] org.apache.flink.runtime.metrics.MetricRegistryImpl:No metrics reporter configured, no metrics will be exposed/reported.
2023-03-09 15:18:24,368 - 5669 INFO [main] org.apache.flink.runtime.minicluster.MiniCluster:Starting RPC Service(s)
2023-03-09 15:18:24,392 - 5693 INFO [main] org.apache.flink.runtime.clusterframework.BootstrapTools:Trying to start local actor system
2023-03-09 15:18:24,741 - 6042 INFO [flink-akka.actor.default-dispatcher-3] akka.event.slf4j.Slf4jLogger$$anonfun$receive$1:Slf4jLogger started
2023-03-09 15:18:25,141 - 6442 INFO [main] org.apache.flink.runtime.clusterframework.BootstrapTools:Actor system started at akka://flink
2023-03-09 15:18:25,162 - 6463 INFO [main] org.apache.flink.runtime.clusterframework.BootstrapTools:Trying to start local actor system
2023-03-09 15:18:25,171 - 6472 INFO [flink-metrics-2] akka.event.slf4j.Slf4jLogger$$anonfun$receive$1:Slf4jLogger started
2023-03-09 15:18:25,527 - 6828 INFO [main] org.apache.flink.runtime.clusterframework.BootstrapTools:Actor system started at akka://flink-metrics
2023-03-09 15:18:25,542 - 6843 INFO [main] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/rpc/MetricQueryService .
2023-03-09 15:18:25,619 - 6920 INFO [main] org.apache.flink.runtime.minicluster.MiniCluster:Starting high-availability services
2023-03-09 15:18:25,637 - 6938 INFO [main] org.apache.flink.runtime.blob.BlobServer:Created BLOB server storage directory C:\Users\admin\AppData\Local\Temp\blobStore-b69f1605-3fff-4ec1-bacb-aa792480b73f
2023-03-09 15:18:25,650 - 6951 INFO [main] org.apache.flink.runtime.blob.BlobServer:Started BLOB server at 0.0.0.0:49266 - max concurrent requests: 50 - max backlog: 1000
2023-03-09 15:18:25,659 - 6960 INFO [main] org.apache.flink.runtime.blob.AbstractBlobCache:Created BLOB cache storage directory C:\Users\admin\AppData\Local\Temp\blobStore-f6ac11c5-d503-43ca-8bf5-7828a329ca63
2023-03-09 15:18:25,659 - 6960 INFO [main] org.apache.flink.runtime.blob.AbstractBlobCache:Created BLOB cache storage directory C:\Users\admin\AppData\Local\Temp\blobStore-0550fdbe-6a9b-4ae1-85ac-267e54d62b5d
2023-03-09 15:18:25,659 - 6960 INFO [main] org.apache.flink.runtime.minicluster.MiniCluster:Starting 1 TaskManger(s)
2023-03-09 15:18:25,659 - 6960 INFO [main] org.apache.flink.runtime.taskexecutor.TaskManagerRunner:Starting TaskManager with ResourceID: 1155f5bc-ef55-45e7-94cf-619071db1006
2023-03-09 15:18:25,685 - 6986 INFO [main] org.apache.flink.runtime.taskexecutor.TaskManagerServices:Temporary file directory 'C:\Users\admin\AppData\Local\Temp': total 476 GB, usable 212 GB (44.54% usable)
2023-03-09 15:18:25,685 - 6986 INFO [main] org.apache.flink.runtime.io.disk.FileChannelManagerImpl:FileChannelManager uses directory C:\Users\admin\AppData\Local\Temp\flink-io-9fb184f0-d249-466a-802d-13b3da3a53e6 for spill files.
2023-03-09 15:18:25,703 - 7004 INFO [main] org.apache.flink.runtime.io.disk.FileChannelManagerImpl:FileChannelManager uses directory C:\Users\admin\AppData\Local\Temp\flink-netty-shuffle-643d8669-c769-4198-8ba7-3df5dd91b199 for spill files.
2023-03-09 15:18:25,774 - 7075 INFO [main] org.apache.flink.runtime.io.network.buffer.NetworkBufferPool:Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
2023-03-09 15:18:25,788 - 7089 INFO [main] org.apache.flink.runtime.io.network.NettyShuffleEnvironment:Starting the network environment and its components.
2023-03-09 15:18:25,791 - 7092 INFO [main] org.apache.flink.runtime.taskexecutor.KvStateService:Starting the kvState service and its components.
2023-03-09 15:18:25,828 - 7129 INFO [main] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/rpc/taskmanager_0 .
2023-03-09 15:18:25,847 - 7148 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService:Start job leader service.
2023-03-09 15:18:25,849 - 7150 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.filecache.FileCache:User file cache uses directory C:\Users\admin\AppData\Local\Temp\flink-dist-cache-ef2f1fd6-3d61-496e-8649-1ae5e4470a98
2023-03-09 15:18:25,975 - 7276 INFO [main] org.apache.flink.runtime.rest.RestServerEndpoint:Starting rest endpoint.
2023-03-09 15:18:26,079 - 7380 WARN [main] org.apache.flink.runtime.webmonitor.WebMonitorUtils$LogFileLocation:Log file environment variable 'log.file' is not set.
2023-03-09 15:18:26,079 - 7380 WARN [main] org.apache.flink.runtime.webmonitor.WebMonitorUtils$LogFileLocation:JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
2023-03-09 15:18:26,368 - 7669 INFO [main] org.apache.flink.runtime.rest.RestServerEndpoint:Rest endpoint listening at localhost:49335
2023-03-09 15:18:26,370 - 7671 INFO [main] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Proposing leadership to contender http://localhost:49335
2023-03-09 15:18:26,375 - 7676 INFO [main] org.apache.flink.runtime.webmonitor.WebMonitorEndpoint:Web frontend listening at http://localhost:49335.
2023-03-09 15:18:26,375 - 7676 INFO [mini-cluster-io-thread-1] org.apache.flink.runtime.webmonitor.WebMonitorEndpoint:http://localhost:49335 was granted leadership with leaderSessionID=d9f0be3f-6888-4da4-be88-1c90e354bc2b
2023-03-09 15:18:26,376 - 7677 INFO [mini-cluster-io-thread-1] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Received confirmation of leadership for leader http://localhost:49335 , session=d9f0be3f-6888-4da4-be88-1c90e354bc2b
2023-03-09 15:18:26,393 - 7694 INFO [main] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/rpc/resourcemanager_1 .
2023-03-09 15:18:26,409 - 7710 INFO [main] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Proposing leadership to contender LeaderContender: DefaultDispatcherRunner
2023-03-09 15:18:26,409 - 7710 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Proposing leadership to contender LeaderContender: StandaloneResourceManager
2023-03-09 15:18:26,409 - 7710 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.resourcemanager.ResourceManager:ResourceManager akka://flink/user/rpc/resourcemanager_1 was granted leadership with fencing token 831bb6a791eadb69fef4407a536a4646
2023-03-09 15:18:26,409 - 7710 INFO [main] org.apache.flink.runtime.minicluster.MiniCluster:Flink Mini Cluster started successfully
2023-03-09 15:18:26,409 - 7710 INFO [mini-cluster-io-thread-2] org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess:Start SessionDispatcherLeaderProcess.
2023-03-09 15:18:26,409 - 7710 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl:Starting the SlotManager.
2023-03-09 15:18:26,409 - 7710 INFO [mini-cluster-io-thread-5] org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess:Recover all persisted job graphs.
2023-03-09 15:18:26,409 - 7710 INFO [mini-cluster-io-thread-5] org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess:Successfully recovered 0 persisted job graphs.
2023-03-09 15:18:26,409 - 7710 INFO [mini-cluster-io-thread-6] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Received confirmation of leadership for leader akka://flink/user/rpc/resourcemanager_1 , session=fef4407a-536a-4646-831b-b6a791eadb69
2023-03-09 15:18:26,409 - 7710 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.taskexecutor.TaskExecutor:Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_1(831bb6a791eadb69fef4407a536a4646).
2023-03-09 15:18:26,427 - 7728 INFO [mini-cluster-io-thread-5] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_2 .
2023-03-09 15:18:26,434 - 7735 INFO [mini-cluster-io-thread-5] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Received confirmation of leadership for leader akka://flink/user/rpc/dispatcher_2 , session=756fd14e-b1e6-4362-8a4c-c46b2c922984
2023-03-09 15:18:26,442 - 7743 INFO [flink-akka.actor.default-dispatcher-6] org.apache.flink.runtime.registration.RetryingRegistration:Resolved ResourceManager address, beginning registration
2023-03-09 15:18:26,448 - 7749 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.resourcemanager.ResourceManager:Registering TaskManager with ResourceID 1155f5bc-ef55-45e7-94cf-619071db1006 (akka://flink/user/rpc/taskmanager_0) at ResourceManager
2023-03-09 15:18:26,449 - 7750 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection:Successful registration at resource manager akka://flink/user/rpc/resourcemanager_1 under registration id 60be916d7f7b7b5728ecd55309e58a7b.
2023-03-09 15:18:27,179 - 8480 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.dispatcher.Dispatcher:Received JobGraph submission 8069c47ac05f65366267e27277078b15 (flinkStreamSQLLocalTest-catalog).
2023-03-09 15:18:27,179 - 8480 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.dispatcher.Dispatcher:Submitting job 8069c47ac05f65366267e27277078b15 (flinkStreamSQLLocalTest-catalog).
2023-03-09 15:18:27,194 - 8495 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Proposing leadership to contender LeaderContender: JobManagerRunnerImpl
2023-03-09 15:18:27,201 - 8502 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_3 .
2023-03-09 15:18:27,207 - 8508 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.jobmaster.JobMaster:Initializing job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15).
2023-03-09 15:18:27,224 - 8525 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.scheduler.DefaultSchedulerFactory:Using restart back off time strategy NoRestartBackoffTimeStrategy for flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15).
2023-03-09 15:18:27,237 - 8538 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder:start to buildGraph for job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15).
2023-03-09 15:18:27,255 - 8556 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder:trying to download shipFile from blobServer for job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15).
2023-03-09 15:18:27,256 - 8557 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder:Running initialization on master for job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15).
2023-03-09 15:18:27,258 - 8559 INFO [mini-cluster-io-thread-12] com.dtstack.flinkx.connector.hive.sink.HiveOutputFormat:timeZone = sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]
2023-03-09 15:18:27,258 - 8559 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder:Successfully ran initialization on master in 2 ms.
2023-03-09 15:18:27,260 - 8561 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology:Built 1 pipelined regions in 0 ms
2023-03-09 15:18:27,284 - 8585 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.state.StateBackendLoader:No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2023-03-09 15:18:27,286 - 8587 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.checkpoint.CheckpointCoordinator:No checkpoint found during restore.
2023-03-09 15:18:27,286 - 8587 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.scheduler.DefaultScheduler:Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@206f6c42 for flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15).
2023-03-09 15:18:27,308 - 8609 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl:JobManager runner for job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15) was granted leadership with session id 3b22c1fc-230a-4148-97a1-edf4f2a048ae at akka://flink/user/rpc/jobmanager_3.
2023-03-09 15:18:27,311 - 8612 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.jobmaster.JobMaster:Starting execution of job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15) under job master id 97a1edf4f2a048ae3b22c1fc230a4148.
2023-03-09 15:18:27,313 - 8614 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.scheduler.DefaultScheduler:Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2023-03-09 15:18:27,313 - 8614 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.executiongraph.ExecutionGraph:Job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15) switched from state CREATED to RUNNING.
2023-03-09 15:18:27,316 - 8617 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.executiongraph.Execution:Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1) (a5839704277a19cbfeb99cbb5a69e6c0) switched from CREATED to SCHEDULED.
2023-03-09 15:18:27,326 - 8627 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl:Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{f881de42b93343db4a14a89466fe508e}]
2023-03-09 15:18:27,327 - 8628 INFO [jobmanager-future-thread-1] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Received confirmation of leadership for leader akka://flink/user/rpc/jobmanager_3 , session=3b22c1fc-230a-4148-97a1-edf4f2a048ae
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.jobmaster.JobMaster:Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_1(831bb6a791eadb69fef4407a536a4646)
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.registration.RetryingRegistration:Resolved ResourceManager address, beginning registration
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.resourcemanager.ResourceManager:Registering job manager 97a1edf4f2a048ae3b22c1fc230a4148@akka://flink/user/rpc/jobmanager_3 for job 8069c47ac05f65366267e27277078b15.
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.resourcemanager.ResourceManager:Registered job manager 97a1edf4f2a048ae3b22c1fc230a4148@akka://flink/user/rpc/jobmanager_3 for job 8069c47ac05f65366267e27277078b15.
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.jobmaster.JobMaster:JobManager successfully registered at ResourceManager, leader id: 831bb6a791eadb69fef4407a536a4646.
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl:Requesting new slot [SlotRequestId{f881de42b93343db4a14a89466fe508e}] and profile ResourceProfile{UNKNOWN} with allocation id 92f93fa071090ca40d0748c6a30235cb from resource manager.
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.resourcemanager.ResourceManager:Request slot with profile ResourceProfile{UNKNOWN} for job 8069c47ac05f65366267e27277078b15 with allocation id 92f93fa071090ca40d0748c6a30235cb.
2023-03-09 15:18:27,344 - 8645 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.TaskExecutor:Receive slot request 92f93fa071090ca40d0748c6a30235cb for job 8069c47ac05f65366267e27277078b15 from resource manager with leader id 831bb6a791eadb69fef4407a536a4646.
2023-03-09 15:18:27,350 - 8651 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.TaskExecutor:Allocated slot for 92f93fa071090ca40d0748c6a30235cb.
2023-03-09 15:18:27,351 - 8652 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService:Add job 8069c47ac05f65366267e27277078b15 for job leader monitoring.
2023-03-09 15:18:27,353 - 8654 INFO [mini-cluster-io-thread-18] org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService$JobManagerLeaderListener:Try to register at job manager akka://flink/user/rpc/jobmanager_3 with leader id 3b22c1fc-230a-4148-97a1-edf4f2a048ae.
2023-03-09 15:18:27,354 - 8655 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.registration.RetryingRegistration:Resolved JobManager address, beginning registration
2023-03-09 15:18:27,358 - 8659 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService$JobManagerLeaderListener$JobManagerRegisteredRpcConnection:Successful registration at job manager akka://flink/user/rpc/jobmanager_3 for job 8069c47ac05f65366267e27277078b15.
2023-03-09 15:18:27,359 - 8660 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.TaskExecutor:Establish JobManager connection for job 8069c47ac05f65366267e27277078b15.
2023-03-09 15:18:27,364 - 8665 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.TaskExecutor:Offer reserved slots to the leader of job 8069c47ac05f65366267e27277078b15.
2023-03-09 15:18:27,369 - 8670 INFO [flink-akka.actor.default-dispatcher-6] org.apache.flink.runtime.executiongraph.Execution:Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1) (a5839704277a19cbfeb99cbb5a69e6c0) switched from SCHEDULED to DEPLOYING.
2023-03-09 15:18:27,370 - 8671 INFO [flink-akka.actor.default-dispatcher-6] org.apache.flink.runtime.executiongraph.Execution:Deploying Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1) (attempt #0) with attempt id a5839704277a19cbfeb99cbb5a69e6c0 to 1155f5bc-ef55-45e7-94cf-619071db1006 @ 127.0.0.1 (dataPort=-1) with allocation id 92f93fa071090ca40d0748c6a30235cb
2023-03-09 15:18:27,376 - 8677 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl:Activate slot 92f93fa071090ca40d0748c6a30235cb.
2023-03-09 15:18:27,401 - 8702 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.TaskExecutor:Received task Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0 (a5839704277a19cbfeb99cbb5a69e6c0), deploy into slot with allocation id 92f93fa071090ca40d0748c6a30235cb.
2023-03-09 15:18:27,402 - 8703 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0 (a5839704277a19cbfeb99cbb5a69e6c0) switched from CREATED to DEPLOYING.
2023-03-09 15:18:27,404 - 8705 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl:Activate slot 92f93fa071090ca40d0748c6a30235cb.
2023-03-09 15:18:27,406 - 8707 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Loading JAR files for task Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0 (a5839704277a19cbfeb99cbb5a69e6c0) [DEPLOYING].
2023-03-09 15:18:27,407 - 8708 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Registering task at network: Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0 (a5839704277a19cbfeb99cbb5a69e6c0) [DEPLOYING].
2023-03-09 15:18:27,407 - 8708 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Obtaining local cache file for 'class_path_1'.
2023-03-09 15:18:27,409 - 8710 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Obtaining local cache file for 'class_path_0'.
2023-03-09 15:18:27,412 - 8713 INFO [flink-file-cache-thread-1] org.apache.flink.runtime.blob.BlobClient:Downloading 8069c47ac05f65366267e27277078b15/p-4f550129062af8dbd019e6805c3af9d0bbd2e053-60d3852f92c3a7214e4e740e4b8308d1 from localhost/127.0.0.1:49266
2023-03-09 15:18:27,412 - 8713 INFO [flink-file-cache-thread-2] org.apache.flink.runtime.blob.BlobClient:Downloading 8069c47ac05f65366267e27277078b15/p-06f38bb20715f51f46ba89c897eb8275e7ab772f-9d048ba37a86e378e6bd2ff160c68900 from localhost/127.0.0.1:49266
2023-03-09 15:18:27,427 - 8728 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.state.StateBackendLoader:No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2023-03-09 15:18:27,443 - 8744 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0 (a5839704277a19cbfeb99cbb5a69e6c0) switched from DEPLOYING to RUNNING.
2023-03-09 15:18:27,443 - 8744 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.executiongraph.Execution:Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1) (a5839704277a19cbfeb99cbb5a69e6c0) switched from DEPLOYING to RUNNING.
2023-03-09 15:18:27,504 - 8805 WARN [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.TaskMetricGroup:The operator name Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) exceeded the 80 characters length limit and was truncated.
2023-03-09 15:18:27,524 - 8825 WARN [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.TaskMetricGroup:The operator name Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) exceeded the 80 characters length limit and was truncated.
2023-03-09 15:18:27,582 - 8883 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.DtOutputFormatSinkFunction:Start initialize output format state
2023-03-09 15:18:27,670 - 8971 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.DtOutputFormatSinkFunction:Is restored:false
2023-03-09 15:18:27,671 - 8972 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.DtOutputFormatSinkFunction:End initialize output format state
2023-03-09 15:18:27,671 - 8972 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.hive.sink.HiveOutputFormat:timeZone = sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]
2023-03-09 15:18:27,692 - 8993 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.hive.sink.HiveOutputFormat:tablePath:test222221, rowData:null, even:{schema=null, table=test222221}
2023-03-09 15:18:29,879 - 11180 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hive.jdbc.Utils:Supplied authorities: 172.18.8.208:10000
2023-03-09 15:18:29,879 - 11180 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hive.jdbc.Utils:Resolved authority: 172.18.8.208:10000
2023-03-09 15:18:30,910 - 12211 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.format.BaseRichOutputFormat:[HiveOutputFormat] open successfully,
checkpointMode = AT_LEAST_ONCE,
checkpointEnabled = false,
flushIntervalMills = 10000,
batchSize = 1,
[HiveConf]:
{
"semantic" : "at-least-once",
"autoCreateTable" : false,
"errorRecord" : 0,
"checkFormat" : true,
"parallelism" : 1,
"flushIntervalMills" : 10000,
"writeMode" : "append",
"fieldDelimiter" : ",",
"tableInfos" : {
"test222221" : {
"columnNameList" : [ "id", "device_id" ],
"columnTypeList" : [ "INT", "STRING" ],
"createTableSql" : "CREATE TABLE %s (id INT,device_id STRING) PARTITIONED BY (pt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE ",
"tableName" : "test222221",
"tablePath" : "test222221",
"path" : "hdfs://hadoop02:8020/data/hive/warehouse/ljgk_dw.db/test222221",
"store" : "TEXT",
"delimiter" : ",",
"partitionList" : [ "pt" ]
}
},
"tableName" : "test222221",
"enableDictionary" : true,
"tablesColumn" : "{"test222221":[{"key":"id","type":"INT"},{"key":"device_id","type":"STRING"}]}",
"rowGroupSize" : 134217728,
"partition" : "pt",
"filterRegex" : "",
"distributeTableMapping" : { },
"nextCheckRows" : 5000,
"errorPercentage" : -1,
"maxFileSize" : 1073741824,
"encoding" : "UTF-8",
"fieldNameList" : [ ],
"partitionType" : "DAY",
"hadoopConfig" : { },
"jdbcUrl" : "jdbc:hive2://172.18.8.208:10000/ljgk_dw",
"defaultFS" : "hdfs://hadoop02:8020",
"batchSize" : 1,
"speedBytes" : 0,
"metricPluginName" : "prometheus",
"fileType" : "text",
"partitionValue" : ""
}
2023-03-09 15:18:30,913 - 12214 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase:Consumer subtask 0 has no restore state.
2023-03-09 15:18:30,914 - 12215 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.kafka.source.KafkaConsumer:Start initialize input format state, is restored:false
2023-03-09 15:18:30,917 - 12218 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.kafka.source.KafkaConsumer:End initialize input format state
2023-03-09 15:18:30,970 - 12271 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.config.AbstractConfig:ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [172.18.8.203:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2023-03-09 15:18:31,093 - 12394 WARN [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.config.AbstractConfig:The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config.
2023-03-09 15:18:31,099 - 12400 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.utils.AppInfoParser$AppInfo:Kafka version: 2.4.1
2023-03-09 15:18:31,099 - 12400 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.utils.AppInfoParser$AppInfo:Kafka commitId: c57222ae8cd7866b
2023-03-09 15:18:31,099 - 12400 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.utils.AppInfoParser$AppInfo:Kafka startTimeMs: 1678346311093
2023-03-09 15:18:31,696 - 12997 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.clients.Metadata:[Consumer clientId=consumer-1, groupId=null] Cluster ID: t18-k6MJQE6xY7xFvS42Fg
2023-03-09 15:18:31,696 - 12997 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase:Consumer subtask 0 will start reading the following 1 partitions from the earliest offsets: [KafkaTopicPartition{topic='test', partition=0}]
2023-03-09 15:18:31,696 - 12997 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.kafka.source.DynamicKafkaDeserializationSchema:[DynamicKafkaDeserializationSchemaWrapper] open successfully, inputSplit = see other log, [Properties]:
{
"key.deserializer" : "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"value.deserializer" : "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"flink.partition-discovery.interval-millis" : "-9223372036854775808",
"bootstrap.servers" : "172.18.8.203:9092"
}
2023-03-09 15:18:31,718 - 13019 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase:Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='test', partition=0}=-915623761775}.
2023-03-09 15:18:31,757 - 13058 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.config.AbstractConfig:ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [172.18.8.203:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2023-03-09 15:18:31,766 - 13067 WARN [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.config.AbstractConfig:The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config.
2023-03-09 15:18:31,767 - 13068 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.utils.AppInfoParser$AppInfo:Kafka version: 2.4.1
2023-03-09 15:18:31,768 - 13069 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.utils.AppInfoParser$AppInfo:Kafka commitId: c57222ae8cd7866b
2023-03-09 15:18:31,768 - 13069 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.utils.AppInfoParser$AppInfo:Kafka startTimeMs: 1678346311767
2023-03-09 15:18:31,773 - 13074 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.clients.consumer.KafkaConsumer:[Consumer clientId=consumer-2, groupId=null] Subscribed to partition(s): test-0
2023-03-09 15:18:31,783 - 13084 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.clients.consumer.internals.SubscriptionState:[Consumer clientId=consumer-2, groupId=null] Seeking to EARLIEST offset of partition test-0
2023-03-09 15:18:31,794 - 13095 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.clients.Metadata:[Consumer clientId=consumer-2, groupId=null] Cluster ID: t18-k6MJQE6xY7xFvS42Fg
2023-03-09 15:18:31,806 - 13107 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.clients.consumer.internals.SubscriptionState:[Consumer clientId=consumer-2, groupId=null] Resetting offset for partition test-0 to offset 51.
2023-03-09 15:18:31,855 - 13156 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.kafka.source.DynamicKafkaDeserializationSchema:receive source data:{
"id": 1,
"device_id": "a"
}
2023-03-09 15:18:31,877 - 13178 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hive.jdbc.Utils:Supplied authorities: 172.18.8.208:10000
2023-03-09 15:18:31,878 - 13179 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hive.jdbc.Utils:Resolved authority: 172.18.8.208:10000
2023-03-09 15:18:32,100 - 13401 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'nErrors'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output]
2023-03-09 15:18:32,100 - 13401 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'nullErrors'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output]
2023-03-09 15:18:32,100 - 13401 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'duplicateErrors'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output]
2023-03-09 15:18:32,100 - 13401 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'conversionErrors'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output]
2023-03-09 15:18:32,100 - 13401 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'otherErrors'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output]
2023-03-09 15:18:32,101 - 13402 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'numWrite'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output]
2023-03-09 15:18:32,101 - 13402 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'numWritePerSecond'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output]
2023-03-09 15:18:32,101 - 13402 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'snapshotWrite'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output]
2023-03-09 15:18:32,101 - 13402 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'byteWrite'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output]
2023-03-09 15:18:32,101 - 13402 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'byteWritePerSecond'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output]
2023-03-09 15:18:32,101 - 13402 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'writeDuration'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output]
2023-03-09 15:18:32,102 - 13403 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.format.BaseFileOutputFormat:Start current File Index:0
2023-03-09 15:18:32,102 - 13403 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.format.BaseFileOutputFormat:Channel:[0], currentFileNamePrefix:[8069c47ac05f65366267e27277078b15_0]
2023-03-09 15:18:32,288 - 13589 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hadoop.conf.Configuration:fs.default.name is deprecated. Instead, use fs.defaultFS
2023-03-09 15:18:32,307 - 13608 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hadoop.conf.Configuration:fs.default.name is deprecated. Instead, use fs.defaultFS
2023-03-09 15:18:32,323 - 13624 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hadoop.conf.Configuration:fs.default.name is deprecated. Instead, use fs.defaultFS
2023-03-09 15:18:33,301 - 14602 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.format.BaseRichOutputFormat:[HdfsTextOutputFormat] open successfully,
checkpointMode = AT_LEAST_ONCE,
checkpointEnabled = false,
flushIntervalMills = 10000,
batchSize = 1,
[HiveConf]:
{
"semantic" : "at-least-once",
"autoCreateTable" : false,
"errorRecord" : 0,
"checkFormat" : true,
"parallelism" : 1,
"flushIntervalMills" : 10000,
"writeMode" : "append",
"fieldDelimiter" : ",",
"tableInfos" : {
"test222221" : {
"columnNameList" : [ "id", "device_id" ],
"columnTypeList" : [ "INT", "STRING" ],
"createTableSql" : "CREATE TABLE %s (id INT,device_id STRING) PARTITIONED BY (pt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE ",
"tableName" : "test222221",
"tablePath" : "test222221",
"path" : "hdfs://hadoop02:8020/data/hive/warehouse/ljgk_dw.db/test222221",
"store" : "TEXT",
"delimiter" : ",",
"partitionList" : [ "pt" ]
}
},
"fullColumnName" : [ "id", "device_id" ],
"tableName" : "test222221",
"enableDictionary" : true,
"path" : "hdfs://hadoop02:8020/data/hive/warehouse/ljgk_dw.db/test222221/pt=20230309",
"tablesColumn" : "{"test222221":[{"key":"id","type":"INT"},{"key":"device_id","type":"STRING"}]}",
"rowGroupSize" : 134217728,
"partition" : "pt",
"filterRegex" : "",
"distributeTableMapping" : { },
"nextCheckRows" : 5000,
"fullColumnType" : [ "INT", "STRING" ],
"column" : [ {
"name" : "id",
"type" : "INT",
"index" : 0,
"notNull" : false,
"part" : false
}, {
"name" : "device_id",
"type" : "STRING",
"index" : 1,
"notNull" : false,
"part" : false
} ],
"errorPercentage" : -1,
"maxFileSize" : 1073741824,
"encoding" : "UTF-8",
"fieldNameList" : [ ],
"partitionType" : "DAY",
"hadoopConfig" : {
"fs.default.name" : "hdfs://hadoop02:8020",
"fs.hdfs.impl.disable.cache" : "true"
},
"jdbcUrl" : "jdbc:hive2://172.18.8.208:10000/ljgk_dw",
"defaultFS" : "hdfs://hadoop02:8020",
"batchSize" : 1,
"speedBytes" : 0,
"metricPluginName" : "prometheus",
"fileType" : "text",
"partitionValue" : ""
}
2023-03-09 15:18:33,376 - 14677 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hadoop.conf.Configuration:fs.default.name is deprecated. Instead, use fs.defaultFS
2023-03-09 15:18:33,512 - 14813 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hadoop.conf.Configuration:fs.default.name is deprecated. Instead, use fs.defaultFS
2023-03-09 15:18:33,672 - 14973 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hadoop.conf.Configuration:fs.default.name is deprecated. Instead, use fs.defaultFS
2023-03-09 15:18:33,674 - 14975 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.hdfs.sink.BaseHdfsOutputFormat:start to delete directory:hdfs://hadoop02:8020/data/hive/warehouse/ljgk_dw.db/test222221/pt=20230309.data
2023-03-09 15:18:33,865 - 15166 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.hdfs.sink.HdfsTextOutputFormat:subtask:[0] create block file:hdfs://hadoop02:8020/data/hive/warehouse/ljgk_dw.db/test222221/pt=20230309.data\8069c47ac05f65366267e27277078b15_0_0
How to reproduce
CREATE TABLE source (
id int,
device_id string
) WITH (
'connector' = 'kafka-x',
'topic' = 'test',
'properties.bootstrap.servers' = '172.18.8.203:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE TABLE test111 (
id int,
device_id string
) WITH (
'connector' = 'hive-x',
'default-fs' = 'hdfs://hadoop02:8020',
'file-type' = 'text',
'url' = 'jdbc:hive2://172.18.8.208:10000/ljgk_dw',
'field-delimiter' = ',',
'write-mode' = 'append',
'partition' = 'pt',
'partition-type' = 'DAY',
'sink.parallelism' = '1',
'table-name' = 'test222221'
);
insert into test111 select * from source
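With `'partition' = 'pt'` and `'partition-type' = 'DAY'`, the log above shows the sink targeting `.../test222221/pt=20230309`, i.e. one directory per day named `<partition>=yyyyMMdd`. The sketch below reproduces that naming with `java.time`. It is an assumption inferred from the logged path and the `timeZone = Asia/Shanghai` log line, not ChunJun's actual code; the class and method names are hypothetical. It also shows why the time zone matters: the same instant can fall on a different day in UTC.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: derives a DAY-type Hive partition directory name of the
// form "<partitionKey>=yyyyMMdd", matching the "pt=20230309" seen in the log.
public class DayPartitionSketch {

    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    static String dayPartition(String partitionKey, Instant now, ZoneId zone) {
        // Interpret the instant as a calendar day in the given zone
        // (the log reports Asia/Shanghai, UTC+8).
        return partitionKey + "=" + DAY.format(now.atZone(zone));
    }

    public static void main(String[] args) {
        // 07:18:33 UTC is 15:18:33 in Asia/Shanghai, the time of the HDFS writes in the log.
        Instant t = Instant.parse("2023-03-09T07:18:33Z");
        System.out.println(dayPartition("pt", t, ZoneId.of("Asia/Shanghai")));
    }
}
```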
Anything else
No response
Version
1.12_release
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
This looks like the job was run on Windows; the problem is caused by the file separator.
The table was created automatically, and running the job on the server gives the same result. I changed the table's field delimiter to a comma:
CREATE TABLE source (
id int,
device_id string
) WITH (
'connector' = 'kafka-x',
'topic' = 'test',
'properties.bootstrap.servers' = '172.18.8.203:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE TABLE test111 (
id int,
device_id string
) WITH (
'connector' = 'hive-x',
'default-fs' = 'hdfs://hadoop02:8020',
'file-type' = 'text',
'url' = 'jdbc:hive2://172.18.8.208:10000/ljgk_dw',
'field-delimiter' = ',',
'partition' = 'pt',
'partition-type' = 'DAY',
'sink.parallelism' = '1',
'table-name' = 'test33333'
);
insert into test111 select * from source
On HDFS it looks like this:

Below is the result of running it on the server:

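The path produced on the server is the correct one, but at the end the data was never moved from the .data directory into the real target directory. Please check the JobManager log from when this job ran to pin down the cause.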
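The comment above implies a two-phase write: files are first written under a temporary `<partition>.data` directory (the log shows `start to delete directory: .../pt=20230309.data` followed by `create block file: .../pt=20230309.data...`) and only become visible to Hive once they are moved into the real partition directory. The sketch below illustrates that stage-then-move pattern on the local filesystem with `java.nio.file`. It is a hypothetical illustration, not ChunJun's HDFS code; the class, method, and file names are assumptions. If the commit step never runs, the data stays under `.data`, which matches the symptom described.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of the stage-then-move pattern: files are written under
// "<partition>.data" and only moved into "<partition>" when the write is committed.
public class StagingMoveSketch {

    static Path stagingDir(Path partitionDir) {
        // "pt=20230309" -> sibling "pt=20230309.data", mirroring the path in the log.
        return partitionDir.resolveSibling(partitionDir.getFileName() + ".data");
    }

    static void writeStaged(Path partitionDir, String fileName, String content) {
        try {
            Path staging = Files.createDirectories(stagingDir(partitionDir));
            Files.write(staging.resolve(fileName), content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static int commit(Path partitionDir) {
        Path staging = stagingDir(partitionDir);
        try {
            Files.createDirectories(partitionDir);
            List<Path> files;
            try (Stream<Path> s = Files.list(staging)) {
                files = s.collect(Collectors.toList()); // snapshot before moving anything
            }
            // If this move step never runs, the data remains under ".data".
            for (Path f : files) {
                Files.move(f, partitionDir.resolve(f.getFileName()), StandardCopyOption.REPLACE_EXISTING);
            }
            Files.delete(staging); // staging directory is empty at this point
            return files.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Path demo() {
        try {
            Path partition = Files.createTempDirectory("test222221").resolve("pt=20230309");
            writeStaged(partition, "part_0_0", "1,a\n");
            commit(partition);
            return partition;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path partition = demo();
        System.out.println(Files.exists(partition.resolve("part_0_0"))); // true
    }
}
```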
We obtain the separator via File.separatorChar, so the directory path is wrong when you run the job locally on Windows.
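`java.io.File.separatorChar` is `'\\'` on Windows and `'/'` on Linux, while HDFS paths always use `/`. Joining with it on Windows yields exactly the `pt=20230309.data\8069c47ac05f65366267e27277078b15_0_0` path in the log, which most likely ends up as a single oddly named entry rather than a file inside the `.data` directory. The sketch below contrasts the two joins; the class and method names are hypothetical, and always using `/` for HDFS paths (Hadoop's `org.apache.hadoop.fs.Path.SEPARATOR` is `"/"`) is a possible fix, not one the project has confirmed.

```java
import java.io.File;

// Hypothetical sketch: joining an HDFS directory and file name with the
// OS-dependent File.separatorChar versus a fixed '/' (what HDFS expects).
public class SeparatorSketch {

    // Mirrors the problematic approach: the result depends on the OS running the job.
    static String joinWith(char separator, String dir, String file) {
        return dir + separator + file;
    }

    // Portable alternative for HDFS/URI-style paths: always use '/'.
    static String joinHdfs(String dir, String file) {
        return joinWith('/', dir, file);
    }

    public static void main(String[] args) {
        String dir = "hdfs://hadoop02:8020/data/hive/warehouse/ljgk_dw.db/test222221/pt=20230309.data";
        String file = "8069c47ac05f65366267e27277078b15_0_0";
        // On Windows this prints the backslash path seen in the log.
        System.out.println(joinWith(File.separatorChar, dir, file));
        // Prints the same path with '/' on every OS.
        System.out.println(joinHdfs(dir, file));
    }
}
```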