aws-sdk-go-v2

Kinesis SDK panicking

davidwin93 opened this issue 1 year ago

Acknowledgements

  • [X] I have searched (https://github.com/aws/aws-sdk/issues?q=is%3Aissue) for past instances of this issue
  • [X] I have verified all of my SDK modules are up-to-date (you can perform a bulk update with go get -u github.com/aws/aws-sdk-go-v2/...)

Describe the bug

A goroutine panics with a slice-bounds-out-of-range error while reading from a Kinesis stream.

Expected Behavior

There should be no panic in a library: we can't recover from this, and the entire application crashes.
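A minimal sketch (not SDK code) of why we can't recover from it in our application: the panic fires inside a goroutine the library spawned, so a recover deferred in our own code never sees it and the process exits.

package main

import (
	"fmt"
	"time"
)

// libraryCall stands in for an SDK call that spawns its own goroutine.
func libraryCall() {
	go func() {
		panic("boom") // the panic happens inside the library's goroutine
	}()
}

func main() {
	defer func() {
		// Never runs for the panic above: recover only catches panics raised
		// in the goroutine it is deferred in, so the process still crashes.
		if r := recover(); r != nil {
			fmt.Println("recovered:", r)
		}
	}()
	libraryCall()
	time.Sleep(100 * time.Millisecond)
}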

Current Behavior

panic: runtime error: slice bounds out of range [31319:23127]

goroutine 937 [running]:
compress/flate.(*decompressor).Read(0xc00054c008, {0xc0009ce000, 0x37e00, 0xc000498700?})
	/Users/david/.asdf/installs/golang/1.22.0/go/src/compress/flate/inflate.go:339 +0x1e8
compress/gzip.(*Reader).Read(0xc000314588, {0xc0009ce000, 0x37e00, 0x37e00})
	/Users/david/.asdf/installs/golang/1.22.0/go/src/compress/gzip/gunzip.go:252 +0xa2
net/http.(*gzipReader).Read(0xc000637160, {0xc0009ce000, 0x37e00, 0x37e00})
	/Users/david/.asdf/installs/golang/1.22.0/go/src/net/http/transport.go:2896 +0x195
github.com/aws/aws-sdk-go-v2/aws/transport/http.(*timeoutReadCloser).Read.func1()
	/Users/david/.asdf/installs/golang/1.22.0/packages/pkg/mod/github.com/aws/[email protected]/aws/transport/http/timeout_read_closer.go:48 +0x43
created by github.com/aws/aws-sdk-go-v2/aws/transport/http.(*timeoutReadCloser).Read in goroutine 49
	/Users/david/.asdf/installs/golang/1.22.0/packages/pkg/mod/github.com/aws/[email protected]/aws/transport/http/timeout_read_closer.go:47 +0xfb

Reproduction Steps

Call GetRecords on a busy stream over a slow internet connection? I'm not sure how to trigger this directly, since it happens when the data stream I'm reading from is under higher load.

Possible Solution

No response

Additional Information/Context

Same as this issue, which was closed for being stale.

AWS Go SDK V2 Module Versions Used

  • github.com/aws/aws-sdk-go-v2 v1.30.4
  • github.com/aws/aws-sdk-go-v2/config v1.27.28
  • github.com/aws/aws-sdk-go-v2/service/kinesis v1.29.4

Compiler and Version used

go1.22.0 darwin/amd64

Operating System and version

OSX 14

davidwin93 · Aug 17 '24 19:08

Hi @davidwin93 ,

Like with the other issue you linked, the reproduction steps here are vague, so it's difficult for us to root cause this. From the stack trace, the error seems to be with gzip itself rather than the SDK, and this seems to manifest under specific networking conditions that are not straightforward to reproduce.

I gave reproducing this a fair shake, but my attempt did not yield the reported panic behavior.

In my reproduction I have the following setup:

  1. A Kinesis stream with 5 shards, pre-populated with 5,000 records.
  2. A producer function that writes 500 records into the stream every 3 seconds.
  3. A consumer function that reads concurrently from all shards.

Consumer

func consume() {
	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
	if err != nil {
		log.Fatalf("unable to load SDK config, %v", err)
	}

	client := kinesis.NewFromConfig(cfg)
	streamName := "foo-stream"

	listShardsOutput, err := client.ListShards(context.TODO(), &kinesis.ListShardsInput{
		StreamName: &streamName,
	})
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	var recordCount atomic.Int64 // shared across shard goroutines; use an atomic counter (sync/atomic)

	for _, shard := range listShardsOutput.Shards {
		wg.Add(1)
		go func(shardId string) {
			defer wg.Done()

			shardIteratorOutput, err := client.GetShardIterator(context.TODO(), &kinesis.GetShardIteratorInput{
				StreamName:        &streamName,
				ShardId:           &shardId,
				ShardIteratorType: types.ShardIteratorTypeTrimHorizon,
			})
			if err != nil {
				log.Printf("failed to get shard iterator for shard %s, %v", shardId, err)
				return
			}

			shardIterator := shardIteratorOutput.ShardIterator
			if shardIterator == nil || *shardIterator == "" {
				log.Printf("No initial shard iterator for shard %s", shardId)
				return
			}

			for {
				if shardIterator == nil || *shardIterator == "" {
					log.Printf("Shard %s, no more records or iterator expired", shardId)
					break
				}

				getRecordsOutput, err := client.GetRecords(context.TODO(), &kinesis.GetRecordsInput{
					ShardIterator: shardIterator,
				})
				if err != nil {
					log.Printf("failed to get records from shard %s, %v", shardId, err)
					break
				}

				log.Printf("Shard %s, fetched %d records", shardId, len(getRecordsOutput.Records))
				for _, record := range getRecordsOutput.Records {
					fmt.Printf("Shard %s, Consumed: %s\n", shardId, string(record.Data))
					recordCount.Add(1)
				}

				shardIterator = getRecordsOutput.NextShardIterator
				if len(getRecordsOutput.Records) == 0 {
					log.Printf("Shard %s, no new records, sleeping for a bit...", shardId)
					time.Sleep(1 * time.Second)
				}
			}
		}(aws.ToString(shard.ShardId))
	}

	wg.Wait()
}

Producer

func produce() {
	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
	if err != nil {
		panic(err)
	}

	client := kinesis.NewFromConfig(cfg)
	for {
		var wg sync.WaitGroup

		for i := 0; i < 500; i++ {
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				data := fmt.Sprintf("record-%d-%d", i, rand.Intn(100))
				_, err := client.PutRecord(context.TODO(), &kinesis.PutRecordInput{
					StreamName:   aws.String("foo-stream"),
					PartitionKey: aws.String(fmt.Sprintf("partition-%d", i%10)),
					Data:         []byte(data),
				})
				if err != nil {
					log.Printf("failed to put record, %v", err)
					return
				}
				log.Printf("Produced: %s", data)
			}(i)
		}

		wg.Wait()
		time.Sleep(3 * time.Second)
	}
}

I'm running this from a Docker container using golang:1.20-alpine as the image. I'm also using the tc (traffic control) tool to simulate a slower connection:

$ tc qdisc add dev eth0 root tbf rate 30mbit burst 32kbit latency 50ms

And then running my code with the race flag:

$ go run -race main.go

I let this app run for about 45 minutes and did not see any panics or errors.

If you have any pointers on how to tweak this example to trigger the reported behavior, I can take another look.

Thanks, Ran~

RanVaknin · Aug 21 '24 20:08

This issue has not received a response in 1 week. If you want to keep this issue open, please just leave a comment below and auto-close will be canceled.

github-actions[bot] · Sep 01 '24 00:09

Hey Ran, thanks for looking into this. I'm able to recreate this by pushing real-time logs from CloudFront to Kinesis and attaching a consumer on a "limited" internet connection. In this case the connection is stable at around 300 Mb/s. Running this on our EKS cluster I don't see any issues, which makes sense given the higher networking performance.

The reason I raised this issue is that I don't expect a library to panic; instead, I would expect an error to be returned from my GetRecords call, for example.

davidwin93 · Sep 03 '24 19:09

To be clear, a panic in the stdlib is a panic. There's nothing we can do once that happens. The best we can do is ensure we're not setting up the scenario in which that panic occurs. Whether or not we're actively doing that remains to be seen.

lucix-aws · Sep 06 '24 19:09

@lucix-aws in your last post you mentioned "whether or not we're actively doing that remains to be seen". Any new findings regarding that?

We think we are experiencing a similar issue. Here are the panic logs:

panic: runtime error: index out of range [4096] with length 4096

goroutine 7796548085 [running]:
bufio.(*Reader).ReadByte(0xc1d96594a0)
	bufio/bufio.go:271 +0xa5
compress/flate.(*decompressor).huffSym(0xc1760dd908, 0xc1760dd938)
	compress/flate/inflate.go:720 +0x102
compress/flate.(*decompressor).huffmanBlock(0x697d440?)
	compress/flate/inflate.go:495 +0x45
compress/flate.(*decompressor).Read(0xc1760dd908, {0xc0020f4000, 0x2000, 0x52171a?})
	compress/flate/inflate.go:348 +0x5b
compress/gzip.(*Reader).Read(0xc171ea6588, {0xc0020f4000, 0x2000, 0x2000})
	compress/gzip/gunzip.go:252 +0xa2
net/http.(*gzipReader).Read(0xc1723792e0, {0xc0020f4000, 0x2000, 0x2000})
	net/http/transport.go:2911 +0x195
github.com/aws/aws-sdk-go-v2/aws/transport/http.(*timeoutReadCloser).Read.func1()
	github.com/aws/[email protected]/aws/transport/http/timeout_read_closer.go:48 +0x43
created by github.com/aws/aws-sdk-go-v2/aws/transport/http.(*timeoutReadCloser).Read in goroutine 204173
	github.com/aws/[email protected]/aws/transport/http/timeout_read_closer.go:47 +0xfb

We're using https://github.com/aws/aws-sdk-go-v2/tree/main/service/kinesis v1.29.8.

Looking at the changelog I see the latest version is v1.33.2. Maybe we simply need to upgrade?

sky-sharma · Apr 16 '25 18:04

Is it possible that this is caused by a race condition in timeout_read_closer.go? I don't believe the io.ReadCloser interface is guaranteed to be concurrency-safe, yet in timeout_read_closer.go the read happens in a goroutine:

func (r *timeoutReadCloser) Read(b []byte) (int, error) {
    timer := time.NewTimer(r.duration)
    c := make(chan readResult, 1)

    go func() {
        n, err := r.reader.Read(b)
        timer.Stop()
        c <- readResult{n: n, err: err}
    }()

    select {
    case data := <-c:
        return data.n, data.err
    case <-timer.C:
        return 0, &ResponseTimeoutError{TimeoutDur: r.duration}
    }
}

If Read hits the timeout, the outer call returns, but the goroutine continues running r.reader.Read in the background. Any subsequent Read (or Close) on the same body can then race with that leaked read. By default, the Kinesis GetRecords call is attempted up to 3 times, so this scenario is possible.
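To make that failure mode concrete, here is a minimal, self-contained sketch (illustrative stand-ins only, not the SDK's code): a read runs in a goroutine, the outer call returns on timeout, and a later read then overlaps with the leaked one. Running it with go run -race reports the data race:

package main

import (
	"fmt"
	"io"
	"time"
)

// slowReader stands in for the gzip/flate response body: it has unsynchronized
// internal state and takes a while to return, like a read over a congested link.
type slowReader struct {
	pos int
}

func (s *slowReader) Read(b []byte) (int, error) {
	time.Sleep(100 * time.Millisecond)
	s.pos++ // unsynchronized; racy if Read is ever called concurrently
	if s.pos > 10 {
		return 0, io.EOF
	}
	b[0] = byte(s.pos)
	return 1, nil
}

type readResult struct {
	n   int
	err error
}

// timeoutRead mimics the shape of timeoutReadCloser.Read: the inner Read runs
// in a goroutine, and on timeout the outer call returns while it keeps going.
func timeoutRead(r io.Reader, b []byte, d time.Duration) (int, error) {
	timer := time.NewTimer(d)
	c := make(chan readResult, 1)
	go func() {
		n, err := r.Read(b)
		timer.Stop()
		c <- readResult{n: n, err: err}
	}()
	select {
	case res := <-c:
		return res.n, res.err
	case <-timer.C:
		return 0, fmt.Errorf("read timed out after %s", d)
	}
}

func main() {
	r := &slowReader{}
	buf := make([]byte, 1)

	// The first read times out, leaving its inner Read running in the background.
	if _, err := timeoutRead(r, buf, 10*time.Millisecond); err != nil {
		fmt.Println("read 1:", err)
	}
	// A retry then issues a second read against the same underlying reader;
	// go run -race flags the race on slowReader.pos (and on buf).
	if _, err := timeoutRead(r, buf, 10*time.Millisecond); err != nil {
		fmt.Println("read 2:", err)
	}
	time.Sleep(300 * time.Millisecond) // let the leaked goroutines finish
}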

I've written a middleware to add locks around the reading to stop this race condition. I believe this should fix the issue for now:

// Imports assumed by this snippet (the smithy-go packages provide the
// middleware, error, and transport types used below):
import (
	"context"
	"fmt"
	"io"
	"sync"

	"github.com/aws/aws-sdk-go-v2/service/kinesis"
	"github.com/aws/smithy-go"
	"github.com/aws/smithy-go/middleware"
	smithyhttp "github.com/aws/smithy-go/transport/http"
)

func withAtomicReadCloser(o *kinesis.Options) {
	o.APIOptions = append(o.APIOptions, func(stack *middleware.Stack) error {
		return stack.Deserialize.Add(&atomicReadCloserMiddleware{}, middleware.After)
	})
}

type atomicReadCloserMiddleware struct{}

func (m *atomicReadCloserMiddleware) ID() string {
	return "atomicReadCloser"
}

// HandleDeserialize implements the DeserializeMiddleware interface
func (m *atomicReadCloserMiddleware) HandleDeserialize(
	ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler,
) (
	out middleware.DeserializeOutput, metadata middleware.Metadata, err error,
) {
	out, metadata, err = next.HandleDeserialize(ctx, in)
	if err != nil {
		return out, metadata, err
	}

	response, ok := out.RawResponse.(*smithyhttp.Response)
	if !ok {
		return out, metadata, &smithy.DeserializationError{Err: fmt.Errorf("unknown transport type %T", out.RawResponse)}
	}

	response.Body = &atomicReadCloser{
		reader: response.Body,
	}
	out.RawResponse = response

	return out, metadata, err
}

type atomicReadCloser struct {
	lock   sync.Mutex
	reader io.ReadCloser
}

func (m *atomicReadCloser) Read(b []byte) (int, error) {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.reader.Read(b)
}

func (m *atomicReadCloser) Close() error {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.reader.Close()
}

However, I'm wondering if we should be adding this locking logic directly to the timeoutReadCloser.

I'd like to hear some feedback from the maintainers. Let me know if you think this is a plausible cause of the panic and whether this is a good solution. If so, I can open a PR for the timeoutReadCloser.

mattwalo32 · May 16 '25 15:05

So this "read timeout" customization is only applied to kinesis GetRecords. There's absolutely no information about why it's there - in THIS repository - but if you go look at v1, you can see the PR that adds it: https://github.com/aws/aws-sdk-go/pull/1166. The PR doesn't link any issues directly, but if you look on that page and follow some links you'll find several:

  • https://github.com/aws/aws-sdk-go/issues/301
  • https://github.com/aws/aws-sdk-go/issues/1037
  • https://github.com/aws/aws-sdk-go/issues/1141

So basically, people were having hangup issues with GetRecords 10 years ago and they decided to throw this custom behavior in there. There doesn't appear to be any record of cross-SDK discussion around this or an attempt to standardize this custom behavior. It was likely added into the v2 SDK simply because it was there in v1.

It could be a concurrency/locking issue, which we could maybe fix, but frankly I think it just needs to be taken out back. That was a decade ago, this custom timeout logic doesn't appear in any other SDK to my knowledge, and I haven't heard even a whisper of this being an issue for anyone. In my experience, electing to impose timeouts on the caller's behalf (especially just based on anecdotes of "it takes a long time") is almost always the wrong decision and tends to create more problems than it solves.

So I think it makes the most sense to simply revert to neutral in this case. If people are having issues with Kinesis API timeouts in this SDK after that, we can bring it up for discussion internally and do a proper customization this time that all SDKs will benefit from. The caller can always wrap their outer operation call in a context deadline/timeout as a quick workaround for a hanging operation call.
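For reference, a minimal sketch of that workaround (the helper name and the 30-second value here are arbitrary placeholders):

func getRecordsWithDeadline(client *kinesis.Client, shardIterator *string) (*kinesis.GetRecordsOutput, error) {
	// Bound the whole operation call with a context deadline instead of relying
	// on the SDK's per-read timeout; a hang then surfaces as an error.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	return client.GetRecords(ctx, &kinesis.GetRecordsInput{
		ShardIterator: shardIterator,
	})
}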

lucix-aws · Oct 29 '25 17:10

This issue is now closed. Comments on closed issues are hard for our team to see. If you need more assistance, please open a new issue that references this one.

github-actions[bot] · Oct 29 '25 21:10