
startingPosition AT_TIMESTAMP iteratorOption NumberFormatException

Open adamlbailey opened this issue 5 years ago • 9 comments

Excellent addition for reading from the stream at specific positions, per #78.

However, I'm having trouble using the "AT_TIMESTAMP" option to read from shards at specific timestamps.

The shardInfo object I'm using is below:

{
  "metadata": {
    "streamName": "QSR-data-stream-production",
    "batchId": "1"
  },
  "shardId-000000000002": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "2020-09-30T19:58:46.480-00:00"
  },
  "shardId-000000000003": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "2020-09-30T19:58:46.480-00:00"
  },
  "shardId-000000000004": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "2020-09-30T19:58:46.480-00:00"
  },
  "shardId-000000000005": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "2020-09-30T19:58:46.480-00:00"
  }
}

Here is the exception:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 9, ip-10-0-9-81.us-west-2.compute.internal, executor 5): java.lang.NumberFormatException: For input string: "2020-09-30T19:58:46.480-00:00"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
	at java.lang.Long.parseLong(Long.java:631)
	at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
	at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
	at org.apache.spark.sql.kinesis.KinesisReader.getShardIterator(KinesisReader.scala:120)
	at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getShardIterator(KinesisSourceRDD.scala:146)
	at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getNext(KinesisSourceRDD.scala:197)
	at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getNext(KinesisSourceRDD.scala:138)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

adamlbailey avatar Oct 01 '20 15:10 adamlbailey

Update:

Using an epoch-millisecond timestamp instead of an ISO-formatted string solves the problem. It would be nice to support ISO strings as well, for readability and for compatibility with the options afforded by other Kinesis libraries. I will consider adding a PR to address this soon.

{
  "metadata": {
    "streamName": "QSR-data-stream-production",
    "batchId": "1"
  },
  "shardId-000000000002": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "1601495926480"
  },
  "shardId-000000000003": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "1601495926480"
  },
  "shardId-000000000004": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "1601495926480"
  },
  "shardId-000000000005": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "1601495926480"
  }
}
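
For reference, converting the ISO string to the epoch-millisecond value the connector expects is a short Python snippet (a sketch using only the standard library; 1601495926480 is the millisecond equivalent of 2020-09-30T19:58:46.480-00:00):

from datetime import datetime

iso = "2020-09-30T19:58:46.480-00:00"
# The connector parses iteratorPosition with toLong, so pass epoch
# milliseconds as a string rather than an ISO-8601 date.
epoch_ms = str(int(datetime.fromisoformat(iso).timestamp() * 1000))
print(epoch_ms)  # prints "1601495926480"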

adamlbailey avatar Oct 01 '20 16:10 adamlbailey

@adamlbailey - Looking forward to the PR.

itsvikramagr avatar Oct 01 '20 16:10 itsvikramagr

Hi @adamlbailey,

I am trying to read data from Kinesis using at_timestamp as the startingposition option. Here is the piece of code I am using:

pos = json.dumps({"at_timestamp": "02/26/2021 3:07:13 PDT"})

kinesisDF = (spark.readStream.format("kinesis")
             .option("streamName", name)
             .option("endpointUrl", URL)
             .option("awsAccessKeyId", key)
             .option("awsSecretKey", sKey)
             .option("startingposition", pos)
             .load())

Here is the error message I am receiving:

pyspark.sql.utils.IllegalArgumentException: 'org.json4s.package$MappingException: Expected object but got JString(02/26/2021 3:07:13 PDT)'

I am new to this Kinesis connector, and I know the way I am passing the starting position value is wrong. Could you help me pass at_timestamp as the value for the startingposition option?

Thanks in advance!

gopi-t2s avatar Feb 26 '21 14:02 gopi-t2s

Hi @gopi-t2s, I'm somewhat removed from this work now but if memory serves:

You're going to want to construct an object like the one in my previous comment. Practically, I did this by writing a helper that described the stream, so I could list each shard with the right Long timestamp value. A sketch of that kind of helper is below.
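
(A minimal sketch of such a helper, assuming boto3 and using the stream name from earlier in the thread; it lists every shard and builds the startingposition JSON with the same epoch-millisecond position for each. build_starting_position is a name chosen here for illustration, not part of the connector's API.)

import json
from datetime import datetime, timezone

import boto3  # assumed available with AWS credentials configured

def build_starting_position(stream_name, start_at):
    """Build the startingposition JSON, giving every shard the same AT_TIMESTAMP."""
    epoch_ms = str(int(start_at.timestamp() * 1000))  # Long value as a string
    position = {"metadata": {"streamName": stream_name, "batchId": "1"}}
    kinesis = boto3.client("kinesis")
    paginator = kinesis.get_paginator("list_shards")
    for page in paginator.paginate(StreamName=stream_name):
        for shard in page["Shards"]:
            position[shard["ShardId"]] = {
                "iteratorType": "AT_TIMESTAMP",
                "iteratorPosition": epoch_ms,
            }
    return json.dumps(position)

# Example: start from 2020-09-30T19:58:46 UTC
pos = build_starting_position(
    "QSR-data-stream-production",
    datetime(2020, 9, 30, 19, 58, 46, tzinfo=timezone.utc))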

adamlbailey avatar Feb 27 '21 18:02 adamlbailey

Thanks @adamlbailey for your input.

gopi-t2s avatar Feb 28 '21 05:02 gopi-t2s

I ran into this as well @gopi-t2s - were you able to make it work? I was unsure if PySpark was going to be supported for this.

chadlagore avatar Mar 12 '21 04:03 chadlagore

No @chadlagore, I am still looking for a way to do this.

gopi-t2s avatar Mar 12 '21 06:03 gopi-t2s

I got it working with the example above from @adamlbailey.

Minimal example in PySpark:

import json
from datetime import datetime

# Timestamp in epoch-millisecond format, e.g. "1601495926000"
now_ts = str(int(datetime.now().timestamp() * 1000))

from_timestamp = {
  "metadata": {
    "streamName": "my-stream",
    "batchId": "1"
  },
  "shardId-000000000000": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": now_ts
  }
}

starting_position = json.dumps(from_timestamp)

my_stream = (spark
             .readStream
             .format('kinesis')
             .option('streamName', "my-stream")
             .option('endpointUrl', KINESIS_ENDPOINT)
             .option('region', KINESIS_REGION)
             .option('startingposition', starting_position)
             .load())
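
(A follow-up note: assuming the connector's standard schema, the record payload arrives in a binary data column, so a typical next step is something like the line below; json_payload is just an illustrative name.)

records = my_stream.selectExpr("CAST(data AS STRING) AS json_payload")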

Hope this helps @chadlagore @gopi-t2s

nikitira avatar Apr 29 '21 09:04 nikitira

Thank you @nikitira, I will try this.

gopi-t2s avatar Apr 29 '21 12:04 gopi-t2s