
[SUPPORT] Hoodie DeltaStreamer job with Kafka source fetching and committing the same offset again and again

Open ksrihari93 opened this issue 2 years ago • 5 comments


Describe the problem you faced

I'm using Hudi DeltaStreamer in continuous mode with a Kafka source.

We have 120 partitions in the Kafka topic, and the ingestion rate is 200k RPM.

All of a sudden, the job stopped pulling new data from the Kafka topic. It keeps polling the same offset again and again.

Other issues on the same job: https://github.com/apache/hudi/issues/5823 https://github.com/apache/hudi/issues/5822


The DeltaStreamer job with the Kafka source had been running for many days when, all of a sudden, it stopped polling new data from the Kafka topic.

We have some alerts on top of the Hudi table and some metrics set up on Kafka lag as well. Once we received an alert that the last record in the table was more than 5 hours old, we debugged through the logs and found that the job always starts fetching data from the same offset, again and again.
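A minimal sketch of the staleness alert described above, assuming the `timestamp` field holds epoch milliseconds in UTC (as the EPOCHMILLISECONDS and UTC keygen settings in the job configuration below suggest) and that the newest value has already been queried from the table. `is_table_stale` and the 5-hour threshold constant are hypothetical names, not part of Hudi.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical threshold mirroring the "more than 5 hours" alert in the issue.
STALENESS_THRESHOLD = timedelta(hours=5)


def is_table_stale(latest_timestamp_ms: int, now: datetime,
                   threshold: timedelta = STALENESS_THRESHOLD) -> bool:
    """Return True if the newest record (epoch millis, UTC) is older than `threshold`."""
    latest = datetime.fromtimestamp(latest_timestamp_ms / 1000, tz=timezone.utc)
    return now - latest > threshold


if __name__ == "__main__":
    now = datetime(2022, 6, 15, 0, 0, tzinfo=timezone.utc)
    six_hours_ago_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
    print(is_table_stale(six_hours_ago_ms, now))  # True
```

Passing `now` explicitly keeps the check deterministic and easy to test; a real alert would pass the current UTC time.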

Expected behavior

It should poll new data and commit the offsets it has consumed from the Kafka topic into the target commit file.

Environment Description

  • Hudi version: 0.9

  • Spark version: 2.4

  • Storage (HDFS/S3/GCS..): BLOB

  • Running on Docker? (yes/no): Kubernetes

Additional context

Configuration for the job:

#base properties
hoodie.insert.shuffle.parallelism=50
hoodie.bulkinsert.shuffle.parallelism=200
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
hoodie.bulkinsert.sort.mode=none

#cleaner properties
hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS
hoodie.cleaner.fileversions.retained=60
hoodie.clean.async=true

#archival
hoodie.keep.min.commits=12
hoodie.keep.max.commits=15

#datasource properties
hoodie.deltastreamer.schemaprovider.registry.url=
hoodie.datasource.write.recordkey.field=
hoodie.deltastreamer.source.kafka.topic=
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp:TIMESTAMP
hoodie.deltastreamer.kafka.source.maxEvents=600000000
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.input.timezone=UTC
hoodie.deltastreamer.keygen.timebased.output.timezone=UTC
hoodie.deltastreamer.keygen.timebased.output.dateformat='dt='yyyy-MM-dd
hoodie.clustering.async.enabled=true
hoodie.clustering.plan.strategy.target.file.max.bytes=3000000000
hoodie.clustering.plan.strategy.small.file.limit=200000001
hoodie.clustering.async.max.commits=1
hoodie.clustering.plan.strategy.max.num.groups=10

#kafka props
bootstrap.servers=
schema.registry.url=
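As a rough sanity check on these settings, the arithmetic below relates `maxEvents` to the 120-partition topic and the 200k RPM ingestion rate from the problem description. It assumes (not confirmed in this thread) that the Kafka source caps each batch at `maxEvents` and splits that cap evenly across partitions, and that RPM means records per minute.

```python
# Values copied from the job configuration and problem description above.
MAX_EVENTS = 600_000_000       # hoodie.deltastreamer.kafka.source.maxEvents
NUM_PARTITIONS = 120           # partitions in the Kafka topic
INGEST_RATE_PER_MIN = 200_000  # 200k records per minute (assumed meaning of RPM)

# Assumption: the per-batch cap is divided evenly across all partitions.
per_partition_cap = MAX_EVENTS // NUM_PARTITIONS

# Minutes of incoming traffic (at the stated rate) that one full batch covers.
minutes_per_full_batch = MAX_EVENTS / INGEST_RATE_PER_MIN

print(per_partition_cap)            # 5000000
print(minutes_per_full_batch / 60)  # 50.0
```

Under these assumptions a single full batch could cover roughly 50 hours of traffic, so a per-partition step of 5,000,000 offsets between checkpoints would be consistent with the cap rather than with the live ingestion rate.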

DeltaStreamer class arguments:

  • "--table-type" "COPY_ON_WRITE"
  • "--props" "/opt/spark/hudi/config/source.properties"
  • "--schemaprovider-class" "org.apache.hudi.utilities.schema.SchemaRegistryProvider"
  • "--source-class" "org.apache.hudi.utilities.sources.JsonKafkaSource"
  • "--target-base-path" ""
  • "--target-table" ""
  • "--op" "BULK_INSERT"
  • "--source-ordering-field" "timestamp"
  • "--continuous"
  • "--min-sync-interval-seconds" "60"

Stacktrace

Attaching the files.

ksrihari93, Jun 15 '22 12:06

.hoodie.zip

ksrihari93, Jun 15 '22 13:06

@ksrihari93 Did you try with auto.offset.reset=LATEST?

codope, Jun 17 '22 14:06

@ksrihari93 is this still an issue after trying auto.offset.reset=LATEST?

rmahindra123, Jul 21 '22 08:07

@ksrihari93: again, can you respond to the clarifications above? Do you see new commits in the timeline while the Kafka offset is not moving? Is my understanding right?

nsivabalan, Aug 10 '22 02:08

@ksrihari93: gentle ping.

nsivabalan, Aug 16 '22 07:08

Hi team, sorry for the late reply.

I had already used exactly this option: auto.offset.reset=LATEST

But the job could not recover. So I wrote a few records to a temporary path, copied that checkpoint over, and then the job started running properly.
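The workaround above relies on the checkpoint being stored in the table's commit metadata. A minimal sketch for inspecting it, assuming each completed commit under `.hoodie/` is a JSON file named `<instant>.commit` whose `extraMetadata` map holds the checkpoint under the key `deltastreamer.checkpoint.key`. These layout details are assumptions to verify against your Hudi version, `latest_checkpoint` is a hypothetical helper, and a table on blob storage would have to be read through the storage client rather than `pathlib`.

```python
import json
from pathlib import Path
from typing import Optional

# Assumed metadata key under which DeltaStreamer records its source checkpoint.
CHECKPOINT_KEY = "deltastreamer.checkpoint.key"


def latest_checkpoint(hoodie_dir: Path) -> Optional[str]:
    """Return the checkpoint stored in the newest completed .commit file, if any."""
    # Assumption: completed commits are named <instant_time>.commit and instant
    # times sort lexicographically in time order. Files such as
    # <instant>.commit.requested or <instant>.replacecommit do not match.
    commits = sorted(hoodie_dir.glob("*.commit"))
    for commit_file in reversed(commits):
        metadata = json.loads(commit_file.read_text())
        checkpoint = (metadata.get("extraMetadata") or {}).get(CHECKPOINT_KEY)
        if checkpoint:
            return checkpoint
    return None
```

Walking backwards from the newest commit skips commits that carry no checkpoint (for example, ones written by a non-DeltaStreamer writer).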

ksrihari93, Aug 17 '22 08:08

As I pointed out in the other GitHub issue, if the Kafka checkpoint type is timestamp, we don't support auto-resetting to LATEST yet.

nsivabalan, Aug 19 '22 04:08

@ksrihari93: hey, did you get a chance to check out my response above? Is there any more assistance you are looking for?

nsivabalan, Sep 12 '22 05:09

These are the latest checkpoints according to the shared logs (logs_latest_commits.txt):

22/06/14 18:27:12 INFO DeltaSync: Checkpoint to resume from : Option{val=dp.packet,0:23816747280,1:23766354517,2:23605513356,3:23434358328,4:23529418069,5:13014012706,6:13111477149,7:13326541492,8:13418171123,9:12818988564,10:13199646406,11:13018719613,12:13171335493,13:13202912834,14:13139217975,15:13177133210,16:12855677774,17:13190680738,18:13384909168,19:13076367894,20:12931040506,21:13179604368,22:13103929138,23:13129130931,24:12962399070,25:13209169847,26:13261971379,27:13223540895,28:12800622145,29:13135630345,30:13286314622,31:12910064270,32:13012723305,33:12942700314,34:13300903762,35:13452813697,36:12774627787,37:13149143084,38:13397339159,39:12943639180,40:12850660061,41:13287830095,42:13416968091,43:13251840311,44:12975405300,45:13129020620,46:13319529463,47:13645113762,48:13171132949,49:13341802693,50:13160916594,51:12797360849,52:13231051973,53:13159710596,54:13462835274,55:13218075514,56:13228939350,57:13026346757,58:13197365542,59:12782050600,60:13274602048,61:13019911553,62:13093034410,63:12946710535,64:12735821947,65:13521932586,66:12885611345,67:12804964853,68:13190226613,69:13119906383,70:13037133163,71:13037077649,72:13249184425,73:13034553149,74:12596466583,75:13197572654,76:13068376212,77:13394048883,78:12949166912,79:12947874565,80:12766226593,81:12887001480,82:12961256747,83:12640403833,84:13209947935,85:12990821869,86:12967972824,87:13062323012,88:12801634102,89:13377026742,90:13075492590,91:12899740426,92:13105955253,93:12811456735,94:13018855871,95:12837481047,96:13143601548,97:12797197623,98:12990305191,99:13092561101,100:13133162523,101:12559759129,102:13091848951,103:12889825622,104:12749143212,105:13041769115,106:13023952197,107:13081277534,108:13043234272,109:13020451301,110:12607811366,111:13056149918,112:13283818745,113:12922522456,114:12828248592,115:12997400759,116:12837921515,117:13035132730,118:12979892771,119:13093824502}
22/06/14 20:08:29 INFO DeltaSync: Checkpoint to resume from : Option{val=dp.packet,0:23821747280,1:23771354517,2:23610513356,3:23439358328,4:23534418069,5:13019012706,6:13116477149,7:13331541492,8:13423171123,9:12823988564,10:13204646406,11:13023719613,12:13176335493,13:13207912834,14:13144217975,15:13182133210,16:12860677774,17:13195680738,18:13389909168,19:13081367894,20:12936040506,21:13184604368,22:13108929138,23:13134130931,24:12967399070,25:13214169847,26:13266971379,27:13228540895,28:12805622145,29:13140630345,30:13291314622,31:12915064270,32:13017723305,33:12947700314,34:13305903762,35:13457813697,36:12779627787,37:13154143084,38:13402339159,39:12948639180,40:12855660061,41:13292830095,42:13421968091,43:13256840311,44:12980405300,45:13134020620,46:13324529463,47:13650113762,48:13176132949,49:13346802693,50:13165916594,51:12802360849,52:13236051973,53:13164710596,54:13467835274,55:13223075514,56:13233939350,57:13031346757,58:13202365542,59:12787050600,60:13279602048,61:13024911553,62:13098034410,63:12951710535,64:12740821947,65:13526932586,66:12890611345,67:12809964853,68:13195226613,69:13124906383,70:13042133163,71:13042077649,72:13254184425,73:13039553149,74:12601466583,75:13202572654,76:13073376212,77:13399048883,78:12954166912,79:12952874565,80:12771226593,81:12892001480,82:12966256747,83:12645403833,84:13214947935,85:12995821869,86:12972972824,87:13067323012,88:12806634102,89:13382026742,90:13080492590,91:12904740426,92:13110955253,93:12816456735,94:13023855871,95:12842481047,96:13148601548,97:12802197623,98:12995305191,99:13097561101,100:13138162523,101:12564759129,102:13096848951,103:12894825622,104:12754143212,105:13046769115,106:13028952197,107:13086277534,108:13048234272,109:13025451301,110:12612811366,111:13061149918,112:13288818745,113:12927522456,114:12833248592,115:13002400759,116:12842921515,117:13040132730,118:12984892771,119:13098824502}
22/06/14 20:53:50 INFO DeltaSync: Checkpoint to resume from : Option{val=dp.packet,0:23826747280,1:23776354517,2:23615513356,3:23444358328,4:23539418069,5:13024012706,6:13121477149,7:13336541492,8:13428171123,9:12828988564,10:13209646406,11:13028719613,12:13181335493,13:13212912834,14:13149217975,15:13187133210,16:12865677774,17:13200680738,18:13394909168,19:13086367894,20:12941040506,21:13189604368,22:13113929138,23:13139130931,24:12972399070,25:13219169847,26:13271971379,27:13233540895,28:12810622145,29:13145630345,30:13296314622,31:12920064270,32:13022723305,33:12952700314,34:13310903762,35:13462813697,36:12784627787,37:13159143084,38:13407339159,39:12953639180,40:12860660061,41:13297830095,42:13426968091,43:13261840311,44:12985405300,45:13139020620,46:13329529463,47:13655113762,48:13181132949,49:13351802693,50:13170916594,51:12807360849,52:13241051973,53:13169710596,54:13472835274,55:13228075514,56:13238939350,57:13036346757,58:13207365542,59:12792050600,60:13284602048,61:13029911553,62:13103034410,63:12956710535,64:12745821947,65:13531932586,66:12895611345,67:12814964853,68:13200226613,69:13129906383,70:13047133163,71:13047077649,72:13259184425,73:13044553149,74:12606466583,75:13207572654,76:13078376212,77:13404048883,78:12959166912,79:12957874565,80:12776226593,81:12897001480,82:12971256747,83:12650403833,84:13219947935,85:13000821869,86:12977972824,87:13072323012,88:12811634102,89:13387026742,90:13085492590,91:12909740426,92:13115955253,93:12821456735,94:13028855871,95:12847481047,96:13153601548,97:12807197623,98:13000305191,99:13102561101,100:13143162523,101:12569759129,102:13101848951,103:12899825622,104:12759143212,105:13051769115,106:13033952197,107:13091277534,108:13053234272,109:13030451301,110:12617811366,111:13066149918,112:13293818745,113:12932522456,114:12838248592,115:13007400759,116:12847921515,117:13045132730,118:12989892771,119:13103824502}
22/06/14 21:47:05 INFO DeltaSync: Checkpoint to resume from : Option{val=dp.packet,0:23831747280,1:23781354517,2:23620513356,3:23449358328,4:23544418069,5:13029012706,6:13126477149,7:13341541492,8:13433171123,9:12833988564,10:13214646406,11:13033719613,12:13186335493,13:13217912834,14:13154217975,15:13192133210,16:12870677774,17:13205680738,18:13399909168,19:13091367894,20:12946040506,21:13194604368,22:13118929138,23:13144130931,24:12977399070,25:13224169847,26:13276971379,27:13238540895,28:12815622145,29:13150630345,30:13301314622,31:12925064270,32:13027723305,33:12957700314,34:13315903762,35:13467813697,36:12789627787,37:13164143084,38:13412339159,39:12958639180,40:12865660061,41:13302830095,42:13431968091,43:13266840311,44:12990405300,45:13144020620,46:13334529463,47:13660113762,48:13186132949,49:13356802693,50:13175916594,51:12812360849,52:13246051973,53:13174710596,54:13477835274,55:13233075514,56:13243939350,57:13041346757,58:13212365542,59:12797050600,60:13289602048,61:13034911553,62:13108034410,63:12961710535,64:12750821947,65:13536932586,66:12900611345,67:12819964853,68:13205226613,69:13134906383,70:13052133163,71:13052077649,72:13264184425,73:13049553149,74:12611466583,75:13212572654,76:13083376212,77:13409048883,78:12964166912,79:12962874565,80:12781226593,81:12902001480,82:12976256747,83:12655403833,84:13224947935,85:13005821869,86:12982972824,87:13077323012,88:12816634102,89:13392026742,90:13090492590,91:12914740426,92:13120955253,93:12826456735,94:13033855871,95:12852481047,96:13158601548,97:12812197623,98:13005305191,99:13107561101,100:13148162523,101:12574759129,102:13106848951,103:12904825622,104:12764143212,105:13056769115,106:13038952197,107:13096277534,108:13058234272,109:13035451301,110:12622811366,111:13071149918,112:13298818745,113:12937522456,114:12843248592,115:13012400759,116:12852921515,117:13050132730,118:12994892771,119:13108824502}
22/06/14 22:38:14 INFO DeltaSync: Checkpoint to resume from : Option{val=dp.packet,0:23836747280,1:23786354517,2:23625513356,3:23454358328,4:23549418069,5:13034012706,6:13131477149,7:13346541492,8:13438171123,9:12838988564,10:13219646406,11:13038719613,12:13191335493,13:13222912834,14:13159217975,15:13197133210,16:12875677774,17:13210680738,18:13404909168,19:13096367894,20:12951040506,21:13199604368,22:13123929138,23:13149130931,24:12982399070,25:13229169847,26:13281971379,27:13243540895,28:12820622145,29:13155630345,30:13306314622,31:12930064270,32:13032723305,33:12962700314,34:13320903762,35:13472813697,36:12794627787,37:13169143084,38:13417339159,39:12963639180,40:12870660061,41:13307830095,42:13436968091,43:13271840311,44:12995405300,45:13149020620,46:13339529463,47:13665113762,48:13191132949,49:13361802693,50:13180916594,51:12817360849,52:13251051973,53:13179710596,54:13482835274,55:13238075514,56:13248939350,57:13046346757,58:13217365542,59:12802050600,60:13294602048,61:13039911553,62:13113034410,63:12966710535,64:12755821947,65:13541932586,66:12905611345,67:12824964853,68:13210226613,69:13139906383,70:13057133163,71:13057077649,72:13269184425,73:13054553149,74:12616466583,75:13217572654,76:13088376212,77:13414048883,78:12969166912,79:12967874565,80:12786226593,81:12907001480,82:12981256747,83:12660403833,84:13229947935,85:13010821869,86:12987972824,87:13082323012,88:12821634102,89:13397026742,90:13095492590,91:12919740426,92:13125955253,93:12831456735,94:13038855871,95:12857481047,96:13163601548,97:12817197623,98:13010305191,99:13112561101,100:13153162523,101:12579759129,102:13111848951,103:12909825622,104:12769143212,105:13061769115,106:13043952197,107:13101277534,108:13063234272,109:13040451301,110:12627811366,111:13076149918,112:13303818745,113:12942522456,114:12848248592,115:13017400759,116:12857921515,117:13055132730,118:12999892771,119:13113824502}
22/06/14 23:56:22 INFO DeltaSync: Checkpoint to resume from : Option{val=dp.packet,0:23841747280,1:23791354517,2:23630513356,3:23459358328,4:23554418069,5:13039012706,6:13136477149,7:13351541492,8:13443171123,9:12843988564,10:13224646406,11:13043719613,12:13196335493,13:13227912834,14:13164217975,15:13202133210,16:12880677774,17:13215680738,18:13409909168,19:13101367894,20:12956040506,21:13204604368,22:13128929138,23:13154130931,24:12987399070,25:13234169847,26:13286971379,27:13248540895,28:12825622145,29:13160630345,30:13311314622,31:12935064270,32:13037723305,33:12967700314,34:13325903762,35:13477813697,36:12799627787,37:13174143084,38:13422339159,39:12968639180,40:12875660061,41:13312830095,42:13441968091,43:13276840311,44:13000405300,45:13154020620,46:13344529463,47:13670113762,48:13196132949,49:13366802693,50:13185916594,51:12822360849,52:13256051973,53:13184710596,54:13487835274,55:13243075514,56:13253939350,57:13051346757,58:13222365542,59:12807050600,60:13299602048,61:13044911553,62:13118034410,63:12971710535,64:12760821947,65:13546932586,66:12910611345,67:12829964853,68:13215226613,69:13144906383,70:13062133163,71:13062077649,72:13274184425,73:13059553149,74:12621466583,75:13222572654,76:13093376212,77:13419048883,78:12974166912,79:12972874565,80:12791226593,81:12912001480,82:12986256747,83:12665403833,84:13234947935,85:13015821869,86:12992972824,87:13087323012,88:12826634102,89:13402026742,90:13100492590,91:12924740426,92:13130955253,93:12836456735,94:13043855871,95:12862481047,96:13168601548,97:12822197623,98:13015305191,99:13117561101,100:13158162523,101:12584759129,102:13116848951,103:12914825622,104:12774143212,105:13066769115,106:13048952197,107:13106277534,108:13068234272,109:13045451301,110:12632811366,111:13081149918,112:13308818745,113:12947522456,114:12853248592,115:13022400759,116:12862921515,117:13060132730,118:13004892771,119:13118824502}

Specifically, for partition 7: 13326541492, 13331541492, 13336541492, 13341541492, 13346541492, 13351541492

For partition 16: 12855677774, 12860677774, 12865677774, 12870677774, 12875677774, 12880677774

So, from what I can infer, the job is making progress.
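The same comparison can be run over every partition at once. A minimal sketch, assuming the checkpoint has the `topic,partition:offset,...` shape shown in the log lines above; `parse_checkpoint` and `offset_deltas` are hypothetical helpers. In the quoted logs, partitions 7 and 16 each advance by 5,000,000 offsets between consecutive checkpoints, which would match maxEvents=600000000 divided evenly over 120 partitions if the source splits the cap that way (an assumption).

```python
from typing import Dict, Tuple


def parse_checkpoint(checkpoint: str) -> Tuple[str, Dict[int, int]]:
    """Split 'topic,partition:offset,...' into the topic and a partition->offset map."""
    topic, *pairs = checkpoint.split(",")
    offsets = {}
    for pair in pairs:
        partition, offset = pair.split(":")
        offsets[int(partition)] = int(offset)
    return topic, offsets


def offset_deltas(older: str, newer: str) -> Dict[int, int]:
    """Per-partition offset advance between two checkpoints of the same topic."""
    old_topic, old_offsets = parse_checkpoint(older)
    new_topic, new_offsets = parse_checkpoint(newer)
    if old_topic != new_topic:
        raise ValueError("checkpoints belong to different topics")
    return {p: new_offsets[p] - old_offsets[p] for p in old_offsets}


if __name__ == "__main__":
    # Partitions 7 and 16 only, taken from the first two checkpoints quoted above.
    first = "dp.packet,7:13326541492,16:12855677774"
    second = "dp.packet,7:13331541492,16:12860677774"
    print(offset_deltas(first, second))  # {7: 5000000, 16: 5000000}
```

A delta of zero for any partition across several checkpoints would point to a stuck partition; uniformly positive deltas, as here, indicate the offsets are moving.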

nsivabalan, Sep 20 '22 01:09

What happened was that the committed offset for one partition had expired. We then tried resetting the offset based on a timestamp, but as noted above that isn't supported, so it did not work either. We finally recovered the job by writing a few records to a temporary path and copying the resulting offset checkpoint into the original directory.

For now we can close this issue. We are upgrading the Hudi version and will get back to you after testing if needed.
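Conceptually, the failure described above is a checkpointed offset that has fallen below the earliest offset Kafka still retains for one partition. A minimal sketch of how such offsets could be detected and reset, assuming the per-partition earliest and latest offsets have already been fetched (for example via a Kafka consumer's beginning/end offset lookup). `reset_expired_offsets` is hypothetical and is not Hudi's implementation; per the reply above, Hudi 0.9 does not auto-reset timestamp-type checkpoints.

```python
from typing import Dict


def reset_expired_offsets(checkpoint: Dict[int, int],
                          earliest: Dict[int, int],
                          latest: Dict[int, int],
                          policy: str = "LATEST") -> Dict[int, int]:
    """Replace checkpointed offsets that fell out of Kafka retention.

    An offset is treated as expired when it is below the earliest offset the
    broker still retains for that partition. Expired partitions are moved to
    the earliest or latest available offset, depending on `policy`; all other
    partitions keep their checkpointed offset.
    """
    if policy not in ("EARLIEST", "LATEST"):
        raise ValueError(f"unsupported policy: {policy}")
    resolved = {}
    for partition, offset in checkpoint.items():
        if offset < earliest[partition]:
            resolved[partition] = earliest[partition] if policy == "EARLIEST" else latest[partition]
        else:
            resolved[partition] = offset
    return resolved
```

Choosing LATEST skips every record still retained after the expired checkpoint, while EARLIEST resumes from the oldest retained record; either way, records removed by retention before the job caught up are not recoverable from Kafka.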

ksrihari93, Sep 20 '22 06:09

Thanks, man! Appreciate your response.

nsivabalan, Sep 20 '22 22:09