
chunjun SQL mode: kafka-hive job cannot insert data

Open biandou1313 opened this issue 2 years ago • 5 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

SQL statements:

CREATE TABLE source
(
  id        int,
  device_id string
) WITH (
  'connector' = 'kafka-x',
  'topic' = 'test',
  'properties.bootstrap.servers' = '172.18.8.203:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

CREATE TABLE test111
(
  id        int,
  device_id string
) WITH (
  'connector' = 'hive-x',
  'default-fs' = 'hdfs://hadoop02:8020',
  'file-type' = 'text',
  'url' = 'jdbc:hive2://172.18.8.208:10000/ljgk_dw',
  'field-delimiter' = ',',
  'write-mode' = 'append',
  'partition' = 'pt',
  'partition-type' = 'DAY',
  'sink.parallelism' = '1',
  'table-name' = 'test222221'
);

insert into test111 select * from source
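The `-job` value that `com.dtstack.flinkx.Main` prints in the log below is this same SQL script in URL-encoded form (`+` for spaces, `%0A` for newlines, `%27` for single quotes). Below is a minimal Python sketch for decoding it so that the script the job actually received can be compared with the SQL above. It assumes standard form encoding, which is what the logged text looks like, and the two input fragments are copied verbatim from that log line.

```python
from urllib.parse import unquote_plus

# Fragments copied verbatim from the "-job" value in the log.
# Assumption: the whole value uses form encoding ('+' = space, %XX = byte).
SINK_OPTION = "%27connector%27+%3D+%27hive-x%27%2C"
INSERT_TAIL = "insert+into+test111%0Aselect+%0Afrom+source%0A"


def decode_job(encoded: str) -> str:
    """Decode a form-encoded job string back into plain SQL text (UTF-8)."""
    return unquote_plus(encoded, encoding="utf-8")


if __name__ == "__main__":
    print(decode_job(SINK_OPTION))        # 'connector' = 'hive-x',
    print(repr(decode_job(INSERT_TAIL)))  # 'insert into test111\nselect \nfrom source\n'
```

When decoded this way, the final statement in the logged copy spans three lines: `insert into test111`, `select `, `from source`. Unlike the statement written above, it has no `*` in the select list. This page alone does not show whether the asterisk was missing from the submitted job or was lost when the issue text was rendered, so comparing against the raw job file is a reasonable first check.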

What you expected to happen

C:\develop\jdk8\bin\java.exe -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:49251,suspend=y,server=n -javaagent:C:\Users\admin\AppData\Local\JetBrains\IntelliJIdea2020.3\captureAgent\debugger-agent.jar -Dfile.encoding=UTF-8 -classpath C:\Users\admin\AppData\Local\Temp\classpath238535005.jar com.dtstack.flinkx.local.test.LocalTest
Connected to the target VM, address: '127.0.0.1:49251', transport: 'socket'
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/admin/.m2/repository/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/admin/.m2/repository/ch/qos/logback/logback-classic/1.1.7/logback-classic-1.1.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/admin/.m2/repository/com/dtstack/flinkx/flinkx-connector-hbase-1.4/1.12-SNAPSHOT/flinkx-connector-hbase-1.4-1.12-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2023-03-09 15:18:18,700 - 1 WARN [main] com.dtstack.flinkx.local.test.LocalTest:-----
2023-03-09 15:18:18,860 - 161 INFO [main] com.dtstack.flinkx.Main:------------program params-------------------------
2023-03-09 15:18:18,939 - 240 INFO [main] com.dtstack.flinkx.Main:-mode
2023-03-09 15:18:18,940 - 241 INFO [main] com.dtstack.flinkx.Main:local
2023-03-09 15:18:18,942 - 243 INFO [main] com.dtstack.flinkx.Main:-jobType
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:sql
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:9999
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:-job
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:CREATE+TABLE+source%0A%28%0A++++id++++++++int%2C%0A++++device_id+string%0A%29+WITH+%28%0A++++++%27connector%27+%3D+%27kafka-x%27%2C%0A++++++%27topic%27+%3D+%27test%27%2C%0A++++++%27properties.bootstrap.servers%27+%3D+%27172.18.8.203%3A9092%27%2C%0A++++++%27scan.startup.mode%27+%3D+%27earliest-offset%27%2C%0A++++++%27format%27+%3D+%27json%27%0A++++++%29%3B%0ACREATE+TABLE+test111%0A%28%0A++++id++++++++int%2C%0A++++device_id+string%0A%29+WITH+%28%0A%0A++++++%27connector%27+%3D+%27hive-x%27%2C%0A++++++%27default-fs%27+%3D+%27hdfs%3A%2F%2Fhadoop02%3A8020%27%2C%0A++++++%27file-type%27+%3D+%27text%27%2C%0A++++++%27url%27+%3D+%27jdbc%3Ahive2%3A%2F%2F172.18.8.208%3A10000%2Fljgk_dw%27%2C%0A++++++%27field-delimiter%27+%3D+%27%2C%27%2C%0A++++++%27write-mode%27+%3D+%27append%27%2C%0A++++++%27partition%27+%3D+%27pt%27%2C%0A++++++%27partition-type%27+%3D+%27DAY%27%2C%0A++++++%27sink.parallelism%27+%3D+%271%27%2C%0A++++++%27table-name%27+%3D+%27test222221%27%0A++++++%29%3B%0Ainsert+into+test111%0Aselect+%0Afrom+source%0A
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:-jobName
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:flinkStreamSQLLocalTest-catalog
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:-flinkxDistDir
2023-03-09 15:18:18,943 - 244 INFO [main] com.dtstack.flinkx.Main:F:\代码\Flinkx-git\flinkx-1.12_release/flinkx-dist
2023-03-09 15:18:18,944 - 245 INFO [main] com.dtstack.flinkx.Main:-remoteFlinkxDistDir
2023-03-09 15:18:18,944 - 245 INFO [main] com.dtstack.flinkx.Main:F:\代码\Flinkx-git\flinkx-1.12_release/flinkx-dist
2023-03-09 15:18:18,944 - 245 INFO [main] com.dtstack.flinkx.Main:-pluginLoadMode
2023-03-09 15:18:18,946 - 247 INFO [main] com.dtstack.flinkx.Main:LocalTest
2023-03-09 15:18:18,946 - 247 INFO [main] com.dtstack.flinkx.Main:-addjar
2023-03-09 15:18:18,946 - 247 INFO [main] com.dtstack.flinkx.Main:[ "F:\app\original-engery-udf-1.0-SNAPSHOT.jar" ]
2023-03-09 15:18:18,946 - 247 INFO [main] com.dtstack.flinkx.Main:-------------------------------------------
2023-03-09 15:18:21,240 - 2541 INFO [main] com.dtstack.flinkx.Main:------------------------------SQL验证------------------------------------
2023-03-09 15:18:22,103 - 3404 INFO [main] com.dtstack.flinkx.Main:*********************
2023-03-09 15:18:22,104 - 3405 INFO [main] com.dtstack.flinkx.Main:validateSQL name is flinkStreamSQLLocalTest-catalog .
2023-03-09 15:18:22,104 - 3405 INFO [main] com.dtstack.flinkx.Main:**********************
2023-03-09 15:18:24,241 - 5542 INFO [main] org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils:The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
2023-03-09 15:18:24,241 - 5542 INFO [main] org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils:The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value.
2023-03-09 15:18:24,241 - 5542 INFO [main] org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils:The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value.
2023-03-09 15:18:24,241 - 5542 INFO [main] org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils:The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb.
2023-03-09 15:18:24,241 - 5542 INFO [main] org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils:The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb.
2023-03-09 15:18:24,241 - 5542 INFO [main] org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils:The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb.
2023-03-09 15:18:24,315 - 5616 INFO [main] org.apache.flink.runtime.minicluster.MiniCluster:Starting Flink Mini Cluster
2023-03-09 15:18:24,317 - 5618 INFO [main] org.apache.flink.runtime.minicluster.MiniCluster:Starting Metrics Registry
2023-03-09 15:18:24,367 - 5668 INFO [main] org.apache.flink.runtime.metrics.MetricRegistryImpl:No metrics reporter configured, no metrics will be exposed/reported.
2023-03-09 15:18:24,368 - 5669 INFO [main] org.apache.flink.runtime.minicluster.MiniCluster:Starting RPC Service(s)
2023-03-09 15:18:24,392 - 5693 INFO [main] org.apache.flink.runtime.clusterframework.BootstrapTools:Trying to start local actor system
2023-03-09 15:18:24,741 - 6042 INFO [flink-akka.actor.default-dispatcher-3] akka.event.slf4j.Slf4jLogger$$anonfun$receive$1:Slf4jLogger started
2023-03-09 15:18:25,141 - 6442 INFO [main] org.apache.flink.runtime.clusterframework.BootstrapTools:Actor system started at akka://flink
2023-03-09 15:18:25,162 - 6463 INFO [main] org.apache.flink.runtime.clusterframework.BootstrapTools:Trying to start local actor system
2023-03-09 15:18:25,171 - 6472 INFO [flink-metrics-2] akka.event.slf4j.Slf4jLogger$$anonfun$receive$1:Slf4jLogger started
2023-03-09 15:18:25,527 - 6828 INFO [main] org.apache.flink.runtime.clusterframework.BootstrapTools:Actor system started at akka://flink-metrics
2023-03-09 15:18:25,542 - 6843 INFO [main] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/rpc/MetricQueryService .
2023-03-09 15:18:25,619 - 6920 INFO [main] org.apache.flink.runtime.minicluster.MiniCluster:Starting high-availability services
2023-03-09 15:18:25,637 - 6938 INFO [main] org.apache.flink.runtime.blob.BlobServer:Created BLOB server storage directory C:\Users\admin\AppData\Local\Temp\blobStore-b69f1605-3fff-4ec1-bacb-aa792480b73f
2023-03-09 15:18:25,650 - 6951 INFO [main] org.apache.flink.runtime.blob.BlobServer:Started BLOB server at 0.0.0.0:49266 - max concurrent requests: 50 - max backlog: 1000
2023-03-09 15:18:25,659 - 6960 INFO [main] org.apache.flink.runtime.blob.AbstractBlobCache:Created BLOB cache storage directory C:\Users\admin\AppData\Local\Temp\blobStore-f6ac11c5-d503-43ca-8bf5-7828a329ca63
2023-03-09 15:18:25,659 - 6960 INFO [main] org.apache.flink.runtime.blob.AbstractBlobCache:Created BLOB cache storage directory C:\Users\admin\AppData\Local\Temp\blobStore-0550fdbe-6a9b-4ae1-85ac-267e54d62b5d
2023-03-09 15:18:25,659 - 6960 INFO [main] org.apache.flink.runtime.minicluster.MiniCluster:Starting 1 TaskManger(s)
2023-03-09 15:18:25,659 - 6960 INFO [main] org.apache.flink.runtime.taskexecutor.TaskManagerRunner:Starting TaskManager with ResourceID: 1155f5bc-ef55-45e7-94cf-619071db1006
2023-03-09 15:18:25,685 - 6986 INFO [main] org.apache.flink.runtime.taskexecutor.TaskManagerServices:Temporary file directory 'C:\Users\admin\AppData\Local\Temp': total 476 GB, usable 212 GB (44.54% usable)
2023-03-09 15:18:25,685 - 6986 INFO [main] org.apache.flink.runtime.io.disk.FileChannelManagerImpl:FileChannelManager uses directory C:\Users\admin\AppData\Local\Temp\flink-io-9fb184f0-d249-466a-802d-13b3da3a53e6 for spill files.
2023-03-09 15:18:25,703 - 7004 INFO [main] org.apache.flink.runtime.io.disk.FileChannelManagerImpl:FileChannelManager uses directory C:\Users\admin\AppData\Local\Temp\flink-netty-shuffle-643d8669-c769-4198-8ba7-3df5dd91b199 for spill files.
2023-03-09 15:18:25,774 - 7075 INFO [main] org.apache.flink.runtime.io.network.buffer.NetworkBufferPool:Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
2023-03-09 15:18:25,788 - 7089 INFO [main] org.apache.flink.runtime.io.network.NettyShuffleEnvironment:Starting the network environment and its components.
2023-03-09 15:18:25,791 - 7092 INFO [main] org.apache.flink.runtime.taskexecutor.KvStateService:Starting the kvState service and its components.
2023-03-09 15:18:25,828 - 7129 INFO [main] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/rpc/taskmanager_0 .
2023-03-09 15:18:25,847 - 7148 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService:Start job leader service.
2023-03-09 15:18:25,849 - 7150 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.filecache.FileCache:User file cache uses directory C:\Users\admin\AppData\Local\Temp\flink-dist-cache-ef2f1fd6-3d61-496e-8649-1ae5e4470a98
2023-03-09 15:18:25,975 - 7276 INFO [main] org.apache.flink.runtime.rest.RestServerEndpoint:Starting rest endpoint.
2023-03-09 15:18:26,079 - 7380 WARN [main] org.apache.flink.runtime.webmonitor.WebMonitorUtils$LogFileLocation:Log file environment variable 'log.file' is not set.
2023-03-09 15:18:26,079 - 7380 WARN [main] org.apache.flink.runtime.webmonitor.WebMonitorUtils$LogFileLocation:JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
2023-03-09 15:18:26,368 - 7669 INFO [main] org.apache.flink.runtime.rest.RestServerEndpoint:Rest endpoint listening at localhost:49335
2023-03-09 15:18:26,370 - 7671 INFO [main] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Proposing leadership to contender http://localhost:49335
2023-03-09 15:18:26,375 - 7676 INFO [main] org.apache.flink.runtime.webmonitor.WebMonitorEndpoint:Web frontend listening at http://localhost:49335.
2023-03-09 15:18:26,375 - 7676 INFO [mini-cluster-io-thread-1] org.apache.flink.runtime.webmonitor.WebMonitorEndpoint:http://localhost:49335 was granted leadership with leaderSessionID=d9f0be3f-6888-4da4-be88-1c90e354bc2b
2023-03-09 15:18:26,376 - 7677 INFO [mini-cluster-io-thread-1] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Received confirmation of leadership for leader http://localhost:49335 , session=d9f0be3f-6888-4da4-be88-1c90e354bc2b
2023-03-09 15:18:26,393 - 7694 INFO [main] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/rpc/resourcemanager_1 .
2023-03-09 15:18:26,409 - 7710 INFO [main] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Proposing leadership to contender LeaderContender: DefaultDispatcherRunner
2023-03-09 15:18:26,409 - 7710 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Proposing leadership to contender LeaderContender: StandaloneResourceManager
2023-03-09 15:18:26,409 - 7710 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.resourcemanager.ResourceManager:ResourceManager akka://flink/user/rpc/resourcemanager_1 was granted leadership with fencing token 831bb6a791eadb69fef4407a536a4646
2023-03-09 15:18:26,409 - 7710 INFO [main] org.apache.flink.runtime.minicluster.MiniCluster:Flink Mini Cluster started successfully
2023-03-09 15:18:26,409 - 7710 INFO [mini-cluster-io-thread-2] org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess:Start SessionDispatcherLeaderProcess.
2023-03-09 15:18:26,409 - 7710 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl:Starting the SlotManager.
2023-03-09 15:18:26,409 - 7710 INFO [mini-cluster-io-thread-5] org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess:Recover all persisted job graphs.
2023-03-09 15:18:26,409 - 7710 INFO [mini-cluster-io-thread-5] org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess:Successfully recovered 0 persisted job graphs.
2023-03-09 15:18:26,409 - 7710 INFO [mini-cluster-io-thread-6] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Received confirmation of leadership for leader akka://flink/user/rpc/resourcemanager_1 , session=fef4407a-536a-4646-831b-b6a791eadb69
2023-03-09 15:18:26,409 - 7710 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.taskexecutor.TaskExecutor:Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_1(831bb6a791eadb69fef4407a536a4646).
2023-03-09 15:18:26,427 - 7728 INFO [mini-cluster-io-thread-5] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_2 .
2023-03-09 15:18:26,434 - 7735 INFO [mini-cluster-io-thread-5] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Received confirmation of leadership for leader akka://flink/user/rpc/dispatcher_2 , session=756fd14e-b1e6-4362-8a4c-c46b2c922984
2023-03-09 15:18:26,442 - 7743 INFO [flink-akka.actor.default-dispatcher-6] org.apache.flink.runtime.registration.RetryingRegistration:Resolved ResourceManager address, beginning registration
2023-03-09 15:18:26,448 - 7749 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.resourcemanager.ResourceManager:Registering TaskManager with ResourceID 1155f5bc-ef55-45e7-94cf-619071db1006 (akka://flink/user/rpc/taskmanager_0) at ResourceManager
2023-03-09 15:18:26,449 - 7750 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection:Successful registration at resource manager akka://flink/user/rpc/resourcemanager_1 under registration id 60be916d7f7b7b5728ecd55309e58a7b.
2023-03-09 15:18:27,179 - 8480 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.dispatcher.Dispatcher:Received JobGraph submission 8069c47ac05f65366267e27277078b15 (flinkStreamSQLLocalTest-catalog).
2023-03-09 15:18:27,179 - 8480 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.dispatcher.Dispatcher:Submitting job 8069c47ac05f65366267e27277078b15 (flinkStreamSQLLocalTest-catalog).
2023-03-09 15:18:27,194 - 8495 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Proposing leadership to contender LeaderContender: JobManagerRunnerImpl
2023-03-09 15:18:27,201 - 8502 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.rpc.akka.AkkaRpcService:Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_3 .
2023-03-09 15:18:27,207 - 8508 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.jobmaster.JobMaster:Initializing job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15).
2023-03-09 15:18:27,224 - 8525 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.scheduler.DefaultSchedulerFactory:Using restart back off time strategy NoRestartBackoffTimeStrategy for flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15).
2023-03-09 15:18:27,237 - 8538 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder:start to buildGraph for job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15).
2023-03-09 15:18:27,255 - 8556 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder:trying to download shipFile from blobServer for job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15).
2023-03-09 15:18:27,256 - 8557 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder:Running initialization on master for job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15).
2023-03-09 15:18:27,258 - 8559 INFO [mini-cluster-io-thread-12] com.dtstack.flinkx.connector.hive.sink.HiveOutputFormat:timeZone = sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]
2023-03-09 15:18:27,258 - 8559 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder:Successfully ran initialization on master in 2 ms.
2023-03-09 15:18:27,260 - 8561 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology:Built 1 pipelined regions in 0 ms
2023-03-09 15:18:27,284 - 8585 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.state.StateBackendLoader:No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2023-03-09 15:18:27,286 - 8587 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.checkpoint.CheckpointCoordinator:No checkpoint found during restore.
2023-03-09 15:18:27,286 - 8587 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.scheduler.DefaultScheduler:Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@206f6c42 for flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15).
2023-03-09 15:18:27,308 - 8609 INFO [mini-cluster-io-thread-12] org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl:JobManager runner for job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15) was granted leadership with session id 3b22c1fc-230a-4148-97a1-edf4f2a048ae at akka://flink/user/rpc/jobmanager_3.
2023-03-09 15:18:27,311 - 8612 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.jobmaster.JobMaster:Starting execution of job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15) under job master id 97a1edf4f2a048ae3b22c1fc230a4148.
2023-03-09 15:18:27,313 - 8614 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.scheduler.DefaultScheduler:Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2023-03-09 15:18:27,313 - 8614 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.executiongraph.ExecutionGraph:Job flinkStreamSQLLocalTest-catalog (8069c47ac05f65366267e27277078b15) switched from state CREATED to RUNNING.
2023-03-09 15:18:27,316 - 8617 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.executiongraph.Execution:Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1) (a5839704277a19cbfeb99cbb5a69e6c0) switched from CREATED to SCHEDULED.
2023-03-09 15:18:27,326 - 8627 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl:Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{f881de42b93343db4a14a89466fe508e}]
2023-03-09 15:18:27,327 - 8628 INFO [jobmanager-future-thread-1] org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:Received confirmation of leadership for leader akka://flink/user/rpc/jobmanager_3 , session=3b22c1fc-230a-4148-97a1-edf4f2a048ae
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.jobmaster.JobMaster:Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_1(831bb6a791eadb69fef4407a536a4646)
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.registration.RetryingRegistration:Resolved ResourceManager address, beginning registration
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.resourcemanager.ResourceManager:Registering job manager 97a1edf4f2a048ae3b22c1fc230a4148@akka://flink/user/rpc/jobmanager_3 for job 8069c47ac05f65366267e27277078b15.
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.resourcemanager.ResourceManager:Registered job manager 97a1edf4f2a048ae3b22c1fc230a4148@akka://flink/user/rpc/jobmanager_3 for job 8069c47ac05f65366267e27277078b15.
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.jobmaster.JobMaster:JobManager successfully registered at ResourceManager, leader id: 831bb6a791eadb69fef4407a536a4646.
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl:Requesting new slot [SlotRequestId{f881de42b93343db4a14a89466fe508e}] and profile ResourceProfile{UNKNOWN} with allocation id 92f93fa071090ca40d0748c6a30235cb from resource manager.
2023-03-09 15:18:27,327 - 8628 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.runtime.resourcemanager.ResourceManager:Request slot with profile ResourceProfile{UNKNOWN} for job 8069c47ac05f65366267e27277078b15 with allocation id 92f93fa071090ca40d0748c6a30235cb.
2023-03-09 15:18:27,344 - 8645 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.TaskExecutor:Receive slot request 92f93fa071090ca40d0748c6a30235cb for job 8069c47ac05f65366267e27277078b15 from resource manager with leader id 831bb6a791eadb69fef4407a536a4646.
2023-03-09 15:18:27,350 - 8651 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.TaskExecutor:Allocated slot for 92f93fa071090ca40d0748c6a30235cb.
2023-03-09 15:18:27,351 - 8652 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService:Add job 8069c47ac05f65366267e27277078b15 for job leader monitoring.
2023-03-09 15:18:27,353 - 8654 INFO [mini-cluster-io-thread-18] org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService$JobManagerLeaderListener:Try to register at job manager akka://flink/user/rpc/jobmanager_3 with leader id 3b22c1fc-230a-4148-97a1-edf4f2a048ae.
2023-03-09 15:18:27,354 - 8655 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.registration.RetryingRegistration:Resolved JobManager address, beginning registration
2023-03-09 15:18:27,358 - 8659 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService$JobManagerLeaderListener$JobManagerRegisteredRpcConnection:Successful registration at job manager akka://flink/user/rpc/jobmanager_3 for job 8069c47ac05f65366267e27277078b15.
2023-03-09 15:18:27,359 - 8660 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.TaskExecutor:Establish JobManager connection for job 8069c47ac05f65366267e27277078b15.
2023-03-09 15:18:27,364 - 8665 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.TaskExecutor:Offer reserved slots to the leader of job 8069c47ac05f65366267e27277078b15.
2023-03-09 15:18:27,369 - 8670 INFO [flink-akka.actor.default-dispatcher-6] org.apache.flink.runtime.executiongraph.Execution:Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1) (a5839704277a19cbfeb99cbb5a69e6c0) switched from SCHEDULED to DEPLOYING.
2023-03-09 15:18:27,370 - 8671 INFO [flink-akka.actor.default-dispatcher-6] org.apache.flink.runtime.executiongraph.Execution:Deploying Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1) (attempt #0) with attempt id a5839704277a19cbfeb99cbb5a69e6c0 to 1155f5bc-ef55-45e7-94cf-619071db1006 @ 127.0.0.1 (dataPort=-1) with allocation id 92f93fa071090ca40d0748c6a30235cb
2023-03-09 15:18:27,376 - 8677 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl:Activate slot 92f93fa071090ca40d0748c6a30235cb.
2023-03-09 15:18:27,401 - 8702 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.TaskExecutor:Received task Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0 (a5839704277a19cbfeb99cbb5a69e6c0), deploy into slot with allocation id 92f93fa071090ca40d0748c6a30235cb.
2023-03-09 15:18:27,402 - 8703 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0 (a5839704277a19cbfeb99cbb5a69e6c0) switched from CREATED to DEPLOYING.
2023-03-09 15:18:27,404 - 8705 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl:Activate slot 92f93fa071090ca40d0748c6a30235cb.
2023-03-09 15:18:27,406 - 8707 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Loading JAR files for task Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0 (a5839704277a19cbfeb99cbb5a69e6c0) [DEPLOYING].
2023-03-09 15:18:27,407 - 8708 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Registering task at network: Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0 (a5839704277a19cbfeb99cbb5a69e6c0) [DEPLOYING].
2023-03-09 15:18:27,407 - 8708 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Obtaining local cache file for 'class_path_1'.
2023-03-09 15:18:27,409 - 8710 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Obtaining local cache file for 'class_path_0'.
2023-03-09 15:18:27,412 - 8713 INFO [flink-file-cache-thread-1] org.apache.flink.runtime.blob.BlobClient:Downloading 8069c47ac05f65366267e27277078b15/p-4f550129062af8dbd019e6805c3af9d0bbd2e053-60d3852f92c3a7214e4e740e4b8308d1 from localhost/127.0.0.1:49266
2023-03-09 15:18:27,412 - 8713 INFO [flink-file-cache-thread-2] org.apache.flink.runtime.blob.BlobClient:Downloading 8069c47ac05f65366267e27277078b15/p-06f38bb20715f51f46ba89c897eb8275e7ab772f-9d048ba37a86e378e6bd2ff160c68900 from localhost/127.0.0.1:49266
2023-03-09 15:18:27,427 - 8728 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.state.StateBackendLoader:No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2023-03-09 15:18:27,443 - 8744 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.taskmanager.Task:Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0 (a5839704277a19cbfeb99cbb5a69e6c0) switched from DEPLOYING to RUNNING.
2023-03-09 15:18:27,443 - 8744 INFO [flink-akka.actor.default-dispatcher-4] org.apache.flink.runtime.executiongraph.Execution:Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1) (a5839704277a19cbfeb99cbb5a69e6c0) switched from DEPLOYING to RUNNING.
2023-03-09 15:18:27,504 - 8805 WARN [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.TaskMetricGroup:The operator name Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) exceeded the 80 characters length limit and was truncated.
2023-03-09 15:18:27,524 - 8825 WARN [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.TaskMetricGroup:The operator name Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) exceeded the 80 characters length limit and was truncated.
2023-03-09 15:18:27,582 - 8883 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.DtOutputFormatSinkFunction:Start initialize output format state 2023-03-09 15:18:27,670 - 8971 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.DtOutputFormatSinkFunction:Is restored:false 2023-03-09 15:18:27,671 - 8972 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.DtOutputFormatSinkFunction:End initialize output format state 2023-03-09 15:18:27,671 - 8972 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.hive.sink.HiveOutputFormat:timeZone = sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null] 2023-03-09 15:18:27,692 - 8993 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.hive.sink.HiveOutputFormat:tablePath:test222221, rowData:null, even:{schema=null, table=test222221} 2023-03-09 15:18:29,879 - 11180 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] 
org.apache.hive.jdbc.Utils:Supplied authorities: 172.18.8.208:10000 2023-03-09 15:18:29,879 - 11180 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hive.jdbc.Utils:Resolved authority: 172.18.8.208:10000 2023-03-09 15:18:30,910 - 12211 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.format.BaseRichOutputFormat:[HiveOutputFormat] open successfully, checkpointMode = AT_LEAST_ONCE, checkpointEnabled = false, flushIntervalMills = 10000, batchSize = 1, [HiveConf]: { "semantic" : "at-least-once", "autoCreateTable" : false, "errorRecord" : 0, "checkFormat" : true, "parallelism" : 1, "flushIntervalMills" : 10000, "writeMode" : "append", "fieldDelimiter" : ",", "tableInfos" : { "test222221" : { "columnNameList" : [ "id", "device_id" ], "columnTypeList" : [ "INT", "STRING" ], "createTableSql" : "CREATE TABLE %s (id INT,device_id STRING) PARTITIONED BY (pt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE ", "tableName" : "test222221", "tablePath" : "test222221", "path" : "hdfs://hadoop02:8020/data/hive/warehouse/ljgk_dw.db/test222221", "store" : "TEXT", "delimiter" : ",", "partitionList" : [ "pt" ] } }, "tableName" : "test222221", "enableDictionary" : true, "tablesColumn" : "{"test222221":[{"key":"id","type":"INT"},{"key":"device_id","type":"STRING"}]}", "rowGroupSize" : 134217728, "partition" : "pt", "filterRegex" : "", "distributeTableMapping" : { }, "nextCheckRows" : 5000, "errorPercentage" : -1, "maxFileSize" : 1073741824, "encoding" : "UTF-8", "fieldNameList" : [ ], "partitionType" : "DAY", "hadoopConfig" : { }, "jdbcUrl" : "jdbc:hive2://172.18.8.208:10000/ljgk_dw", "defaultFS" : 
"hdfs://hadoop02:8020", "batchSize" : 1, "speedBytes" : 0, "metricPluginName" : "prometheus", "fileType" : "text", "partitionValue" : "" } 2023-03-09 15:18:30,913 - 12214 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase:Consumer subtask 0 has no restore state. 2023-03-09 15:18:30,914 - 12215 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.kafka.source.KafkaConsumer:Start initialize input format state, is restored:false 2023-03-09 15:18:30,917 - 12218 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.kafka.source.KafkaConsumer:End initialize input format state 2023-03-09 15:18:30,970 - 12271 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.config.AbstractConfig:ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [172.18.8.203:9092] check.crcs = true client.dns.lookup = default client.id = client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = null group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true 
isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2023-03-09 15:18:31,093 - 12394 WARN [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.config.AbstractConfig:The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config. 2023-03-09 15:18:31,099 - 12400 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.utils.AppInfoParser$AppInfo:Kafka version: 2.4.1 2023-03-09 15:18:31,099 - 12400 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.utils.AppInfoParser$AppInfo:Kafka commitId: c57222ae8cd7866b 2023-03-09 15:18:31,099 - 12400 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.utils.AppInfoParser$AppInfo:Kafka startTimeMs: 1678346311093 2023-03-09 15:18:31,696 - 12997 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.clients.Metadata:[Consumer clientId=consumer-1, groupId=null] Cluster ID: t18-k6MJQE6xY7xFvS42Fg 2023-03-09 15:18:31,696 - 12997 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase:Consumer 
subtask 0 will start reading the following 1 partitions from the earliest offsets: [KafkaTopicPartition{topic='test', partition=0}]
2023-03-09 15:18:31,696 - 12997 INFO [Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.kafka.source.DynamicKafkaDeserializationSchema:[DynamicKafkaDeserializationSchemaWrapper] open successfully, inputSplit = see other log, [Properties]: { "key.deserializer" : "org.apache.kafka.common.serialization.ByteArrayDeserializer", "value.deserializer" : "org.apache.kafka.common.serialization.ByteArrayDeserializer", "flink.partition-discovery.interval-millis" : "-9223372036854775808", "bootstrap.servers" : "172.18.8.203:9092" }
2023-03-09 15:18:31,718 - 13019 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase:Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='test', partition=0}=-915623761775}.
2023-03-09 15:18:31,757 - 13058 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.config.AbstractConfig:ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [172.18.8.203:9092] check.crcs = true client.dns.lookup = default client.id = client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = null group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = 
null send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2023-03-09 15:18:31,766 - 13067 WARN [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.config.AbstractConfig:The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config. 2023-03-09 15:18:31,767 - 13068 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.utils.AppInfoParser$AppInfo:Kafka version: 2.4.1 2023-03-09 15:18:31,768 - 13069 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.utils.AppInfoParser$AppInfo:Kafka commitId: c57222ae8cd7866b 2023-03-09 15:18:31,768 - 13069 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.common.utils.AppInfoParser$AppInfo:Kafka startTimeMs: 1678346311767 2023-03-09 15:18:31,773 - 13074 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.clients.consumer.KafkaConsumer:[Consumer clientId=consumer-2, groupId=null] Subscribed to partition(s): test-0 2023-03-09 15:18:31,783 - 13084 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: 
Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.clients.consumer.internals.SubscriptionState:[Consumer clientId=consumer-2, groupId=null] Seeking to EARLIEST offset of partition test-0
2023-03-09 15:18:31,794 - 13095 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.clients.Metadata:[Consumer clientId=consumer-2, groupId=null] Cluster ID: t18-k6MJQE6xY7xFvS42Fg
2023-03-09 15:18:31,806 - 13107 INFO [Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.kafka.clients.consumer.internals.SubscriptionState:[Consumer clientId=consumer-2, groupId=null] Resetting offset for partition test-0 to offset 51.
2023-03-09 15:18:31,855 - 13156 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.kafka.source.DynamicKafkaDeserializationSchema:receive source data:{ "id": 1, "device_id": "a" }
2023-03-09 15:18:31,877 - 13178 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hive.jdbc.Utils:Supplied authorities: 172.18.8.208:10000
2023-03-09 15:18:31,878 - 13179 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hive.jdbc.Utils:Resolved authority: 172.18.8.208:10000
2023-03-09 15:18:32,100 - 13401 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'nErrors'.
Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output] 2023-03-09 15:18:32,100 - 13401 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'nullErrors'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output] 2023-03-09 15:18:32,100 - 13401 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'duplicateErrors'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output] 2023-03-09 15:18:32,100 - 13401 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'conversionErrors'. 
Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output] 2023-03-09 15:18:32,100 - 13401 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'otherErrors'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output] 2023-03-09 15:18:32,101 - 13402 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'numWrite'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output] 2023-03-09 15:18:32,101 - 13402 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'numWritePerSecond'. 
Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output] 2023-03-09 15:18:32,101 - 13402 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'snapshotWrite'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output] 2023-03-09 15:18:32,101 - 13402 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'byteWrite'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output] 2023-03-09 15:18:32,101 - 13402 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'byteWritePerSecond'. 
Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output] 2023-03-09 15:18:32,101 - 13402 WARN [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.flink.runtime.metrics.groups.AbstractMetricGroup:Name collision: Group already contains a Metric with the name 'writeDuration'. Metric will not be reported.[, taskmanager, 1155f5bc-ef55-45e7-94cf-619071db1006, flinkStreamSQLLocalTest-catalog, Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_, 0, flinkx, output] 2023-03-09 15:18:32,102 - 13403 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.format.BaseFileOutputFormat:Start current File Index:0 2023-03-09 15:18:32,102 - 13403 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.format.BaseFileOutputFormat:Channel:[0], currentFileNamePrefix:[8069c47ac05f65366267e27277078b15_0] 2023-03-09 15:18:32,288 - 13589 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hadoop.conf.Configuration:fs.default.name is deprecated. 
Instead, use fs.defaultFS 2023-03-09 15:18:32,307 - 13608 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hadoop.conf.Configuration:fs.default.name is deprecated. Instead, use fs.defaultFS 2023-03-09 15:18:32,323 - 13624 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hadoop.conf.Configuration:fs.default.name is deprecated. Instead, use fs.defaultFS 2023-03-09 15:18:33,301 - 14602 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.sink.format.BaseRichOutputFormat:[HdfsTextOutputFormat] open successfully, checkpointMode = AT_LEAST_ONCE, checkpointEnabled = false, flushIntervalMills = 10000, batchSize = 1, [HiveConf]: { "semantic" : "at-least-once", "autoCreateTable" : false, "errorRecord" : 0, "checkFormat" : true, "parallelism" : 1, "flushIntervalMills" : 10000, "writeMode" : "append", "fieldDelimiter" : ",", "tableInfos" : { "test222221" : { "columnNameList" : [ "id", "device_id" ], "columnTypeList" : [ "INT", "STRING" ], "createTableSql" : "CREATE TABLE %s (id INT,device_id STRING) PARTITIONED BY (pt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE ", "tableName" : "test222221", "tablePath" : "test222221", "path" : "hdfs://hadoop02:8020/data/hive/warehouse/ljgk_dw.db/test222221", "store" : "TEXT", "delimiter" : ",", "partitionList" : [ "pt" ] } }, "fullColumnName" : [ "id", "device_id" ], "tableName" : "test222221", "enableDictionary" : true, "path" : 
"hdfs://hadoop02:8020/data/hive/warehouse/ljgk_dw.db/test222221/pt=20230309", "tablesColumn" : "{"test222221":[{"key":"id","type":"INT"},{"key":"device_id","type":"STRING"}]}", "rowGroupSize" : 134217728, "partition" : "pt", "filterRegex" : "", "distributeTableMapping" : { }, "nextCheckRows" : 5000, "fullColumnType" : [ "INT", "STRING" ], "column" : [ { "name" : "id", "type" : "INT", "index" : 0, "notNull" : false, "part" : false }, { "name" : "device_id", "type" : "STRING", "index" : 1, "notNull" : false, "part" : false } ], "errorPercentage" : -1, "maxFileSize" : 1073741824, "encoding" : "UTF-8", "fieldNameList" : [ ], "partitionType" : "DAY", "hadoopConfig" : { "fs.default.name" : "hdfs://hadoop02:8020", "fs.hdfs.impl.disable.cache" : "true" }, "jdbcUrl" : "jdbc:hive2://172.18.8.208:10000/ljgk_dw", "defaultFS" : "hdfs://hadoop02:8020", "batchSize" : 1, "speedBytes" : 0, "metricPluginName" : "prometheus", "fileType" : "text", "partitionValue" : "" } 2023-03-09 15:18:33,376 - 14677 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hadoop.conf.Configuration:fs.default.name is deprecated. Instead, use fs.defaultFS 2023-03-09 15:18:33,512 - 14813 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hadoop.conf.Configuration:fs.default.name is deprecated. Instead, use fs.defaultFS 2023-03-09 15:18:33,672 - 14973 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] org.apache.hadoop.conf.Configuration:fs.default.name is deprecated. 
Instead, use fs.defaultFS
2023-03-09 15:18:33,674 - 14975 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.hdfs.sink.BaseHdfsOutputFormat:start to delete directory:hdfs://hadoop02:8020/data/hive/warehouse/ljgk_dw.db/test222221/pt=20230309.data
2023-03-09 15:18:33,865 - 15166 INFO [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, device_id]) -> Sink: Sink(table=[default_catalog.default_database.test111], fields=[id, device_id]) (1/1)#0] com.dtstack.flinkx.connector.hdfs.sink.HdfsTextOutputFormat:subtask:[0] create block file:hdfs://hadoop02:8020/data/hive/warehouse/ljgk_dw.db/test222221/pt=20230309.data\8069c47ac05f65366267e27277078b15_0_0

How to reproduce

CREATE TABLE source (
  id int,
  device_id string
) WITH (
  'connector' = 'kafka-x',
  'topic' = 'test',
  'properties.bootstrap.servers' = '172.18.8.203:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

CREATE TABLE test111 (
  id int,
  device_id string
) WITH (
  'connector' = 'hive-x',
  'default-fs' = 'hdfs://hadoop02:8020',
  'file-type' = 'text',
  'url' = 'jdbc:hive2://172.18.8.208:10000/ljgk_dw',
  'field-delimiter' = ',',
  'write-mode' = 'append',
  'partition' = 'pt',
  'partition-type' = 'DAY',
  'sink.parallelism' = '1',
  'table-name' = 'test222221'
);

insert into test111 select * from source

Anything else

No response

Version

1.12_release

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

biandou1313, Mar 09 '23 07:03

It looks like the job was run on Windows; the problem is caused by the file separator.

ll076110, Mar 10 '23 02:03

The table was created automatically, and the same thing happens when I run the job on the server. I changed the table's field delimiter to a comma:

CREATE TABLE source (
  id int,
  device_id string
) WITH (
  'connector' = 'kafka-x',
  'topic' = 'test',
  'properties.bootstrap.servers' = '172.18.8.203:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

CREATE TABLE test111 (
  id int,
  device_id string
) WITH (
  'connector' = 'hive-x',
  'default-fs' = 'hdfs://hadoop02:8020',
  'file-type' = 'text',
  'url' = 'jdbc:hive2://172.18.8.208:10000/ljgk_dw',
  'field-delimiter' = ',',
  'partition' = 'pt',
  'partition-type' = 'DAY',
  'sink.parallelism' = '1',
  'table-name' = 'test33333'
);

insert into test111 select * from source

On HDFS it looks like this: [image]

biandou1313, Mar 10 '23 02:03

Here is the result of running it on the server: [image]

biandou1313, Mar 10 '23 02:03

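The path on the server is the correct one, but in the end the data was never moved from .data into the real directory. Check the JobManager log from when this job ran to find the cause.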

ll076110, Mar 14 '23 01:03
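The maintainer's reply describes a two-step write: files are first written under a .data staging directory and only moved into the real partition directory at the end. The following Java sketch illustrates that staging-then-move pattern on the local filesystem with java.nio.file. The class StagingCommit, its commit method, and the directory layout (a .data directory inside the partition directory) are hypothetical and chosen only for illustration; this is not chunjun's code, and a real HDFS sink would perform the equivalent steps through the Hadoop FileSystem API. If the move step never runs, the data stays in the staging directory, which is the symptom reported in this thread.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical illustration of "write to a staging dir, then move on commit".
// Not chunjun's implementation; local filesystem only.
final class StagingCommit {

    private StagingCommit() {}

    // Moves every entry of stagingDir into targetDir, then deletes the
    // now-empty staging directory. Files.move throws if a target already exists.
    static void commit(Path stagingDir, Path targetDir) throws IOException {
        Files.createDirectories(targetDir);
        try (Stream<Path> entries = Files.list(stagingDir)) {
            for (Path entry : (Iterable<Path>) entries::iterator) {
                Files.move(entry, targetDir.resolve(entry.getFileName()));
            }
        }
        // The listing stream is closed here, so the directory can be removed.
        Files.delete(stagingDir);
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("warehouse");
        Path partition = root.resolve("pt=20230309");
        Path staging = partition.resolve(".data"); // hypothetical layout
        Files.createDirectories(staging);
        Files.write(staging.resolve("part_0_0"), "1,a\n".getBytes(StandardCharsets.UTF_8));

        commit(staging, partition);
        System.out.println(Files.exists(partition.resolve("part_0_0"))); // true
        System.out.println(Files.exists(staging));                       // false
    }
}
```

Moving whole files after writing, rather than writing in place, keeps half-written files out of the directory that Hive reads.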

We get the separator from File.separatorChar, so the directory is wrong when you run the job locally on Windows.

ll076110, Mar 14 '23 01:03
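To make the separator point concrete, here is a minimal Java sketch. It assumes nothing about chunjun's internals beyond the maintainer's remark that the separator comes from File.separatorChar, and it does not attempt to reproduce the exact path seen in the log. The class HdfsPathJoin and its two methods are hypothetical. File.separatorChar is the separator of the local JVM's filesystem ('\' on Windows, '/' on Linux), whereas HDFS paths always use '/', so joining an HDFS directory with File.separatorChar yields a different path depending on where the client runs.

```java
import java.io.File;

// Hypothetical helper for illustration; not chunjun's actual code.
final class HdfsPathJoin {

    private HdfsPathJoin() {}

    // Joins with the local JVM separator: '\' on Windows, '/' on Linux/macOS.
    // This is wrong for HDFS URIs when the client runs on Windows.
    static String joinWithLocalSeparator(String parent, String child) {
        return parent + File.separatorChar + child;
    }

    // HDFS (and URI) paths always use '/', independent of the client OS.
    static String joinForHdfs(String parent, String child) {
        return parent.endsWith("/") ? parent + child : parent + "/" + child;
    }

    public static void main(String[] args) {
        String partitionDir =
            "hdfs://hadoop02:8020/data/hive/warehouse/ljgk_dw.db/test222221/pt=20230309";
        // On Windows the joined path contains a backslash.
        System.out.println(joinWithLocalSeparator(partitionDir, ".data"));
        // Same '/'-separated path on every OS.
        System.out.println(joinForHdfs(partitionDir, ".data"));
    }
}
```

In Hadoop client code the usual alternative is org.apache.hadoop.fs.Path, for example new Path(parent, child), which always joins with '/'. Whether that is the right fix inside chunjun's HDFS sink is for the maintainers to confirm.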