aws-sdk-go-v2

Downloader Concurrency of 1 does NOT download parts sequentially

Open david-christ opened this issue 3 years ago • 9 comments

Documentation

Describe the bug

https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/feature/s3/manager#Downloader states

Concurrency of 1 will download the parts sequentially.

It seems that in about 0.5% of calls this does not hold (sample size of 3,400 calls).

Expected behavior

Download should always be sequential, as per documentation.

Current behavior

Download is not sequential in about 0.5% of calls.

Steps to Reproduce

Client usage (error handling omitted for brevity)

cfg, err := config.LoadDefaultConfig(ctx)
s3Client := s3.NewFromConfig(cfg) // NewFromConfig returns only a *s3.Client, no error

downloader := manager.NewDownloader(s3Client, func(d *manager.Downloader) {
	d.Concurrency = 1
})
// With: bucket string, key string, writer pipe.PipeWriter
_, err = downloader.Download(ctx, NewSequentialWriterAt(writer), &s3.GetObjectInput{
	Bucket: &bucket,
	Key:    &key,
})
return err

With a wrapper around the pipe writer that ensures sequential writes by verifying each offset is the next one expected:

func (pwa *SequentialWriterAt) WriteAt(p []byte, offset int64) (n int, err error)

Possible Solution

No response

AWS Go SDK version used

1.7.2

Compiler and Version used

1.x

Operating System and version

AWS Lambda

david-christ avatar Dec 14 '21 06:12 david-christ

Would you be able to provide information on what the object sizes (in bytes) are when this happens?

skmcgrail avatar Dec 14 '21 18:12 skmcgrail

Hi @skmcgrail , one file I observed this happening was around 25 MB. Generally, our artifacts are around 10 MB to 200 MB.

david-christ avatar Dec 14 '21 22:12 david-christ

For reference, I am storing the offset expected to be written to next like this

func (swa *SequentialWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
    // ...
    swa.offset = swa.offset + int64(len(p)) // swa.offset starts at 0
    // ...
}
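
A fuller sketch of such a wrapper (the names and error text here are illustrative, not my exact code) would reject any write whose offset is not the next expected byte:

// SequentialWriterAt wraps an io.Writer (e.g. the write end of an io.Pipe)
// and fails any WriteAt whose offset is not the next expected one.
type SequentialWriterAt struct {
    w      io.Writer
    offset int64 // next offset we expect the Downloader to write; starts at 0
}

func (swa *SequentialWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
    if offset != swa.offset {
        return 0, fmt.Errorf("non-sequential write: Expected:%d, Attempted:%d", swa.offset, offset)
    }
    n, err = swa.w.Write(p)
    swa.offset += int64(n)
    return n, err
}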

Some of the error messages I am seeing:

  • Expected:5296343, Attempted:5242880
  • Expected:348967595, Attempted:346030080
  • Expected:18832252, Attempted:15728640
  • Expected:1150408, Attempted:0
  • Expected:5296324, Attempted:5242880

david-christ avatar Dec 15 '21 00:12 david-christ

OK, new insights: after checking the error logs, I noticed that the attempted writes were always at smaller offsets, i.e. chunks we had already seen, never ahead. I also saw in my logs that my consumer finishes before the downloader does. Hence I conclude that when the consumer closed the writer, it caused an error in the Manager, prompting a retry, which then attempted to write chunks we had already seen.

This opens up another possible issue though: I changed the code so that only the S3 Manager Downloader would end the process. Now the consumer clearly logs an EOF after seconds, signalling that the whole file has been processed, but the Lambda running this code still times out after minutes.

So my new assumption is that the Manager for some reason does not return, even though it has downloaded the whole file. I have added log statements right before and after calling downloader.Download, and I do see the start, but never the finish statement. Looking at the processed output, it clearly got all of the file and processing succeeded.

So my question: What are the scenarios where the Downloader would not return, and are there ways on the SDK user's end to mitigate this (or might it be a bug in the manager)?

Edit: I should also mention that the Downloader not returning happens deterministically for certain files, i.e. a retry does not circumvent the issue. This happens for roughly 0.1% of the files that the system handles.
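
As a stopgap, I can at least put a deadline on the call (just a sketch; the 2-minute value is arbitrary, and this only cancels the SDK's in-flight requests - if the Downloader is blocked writing into the pipe, the reader side still has to be closed for it to return):

dlCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

// The Downloader passes the context to its underlying GetObject requests,
// so once the deadline passes those requests are cancelled and Download returns an error.
_, err := downloader.Download(dlCtx, NewSequentialWriterAt(writer), &s3.GetObjectInput{
	Bucket: &bucket,
	Key:    &key,
})
return err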

david-christ avatar Jan 17 '22 01:01 david-christ

@david-christ Is this still an issue? We were looking at this recently for streaming s3 objects and were concerned when we came across this issue.

microbioticajon avatar Aug 18 '22 15:08 microbioticajon

Hello @microbioticajon, we have worked around the issues using a few tricks, so the application itself is working. But I wouldn't be able to tell how often the error itself occurs at the moment.

david-christ avatar Aug 19 '22 01:08 david-christ

Can confirm this is still an issue - it occurs intermittently - and it definitely seems to have to do with order of operations. We're using an io.Pipe rather than a byte array, which keeps memory usage way down and usually works beautifully. safetypes is just a uint64 with an RWMutex wrapped around it. I originally went with atomics and thought that's where the issue was, but no dice.

func NewStreamingBuffer() *StreamingBuffer {
	pipeReader, pipeWriter := io.Pipe()
	return &StreamingBuffer{
		pipeReader: pipeReader,
		pipeWriter: pipeWriter,
		byteCount:  safetypes.NewSafeUInt(0),
		currOffset: safetypes.NewSafeUInt(0),
	}
}

func (wab *StreamingBuffer) Write(p []byte) (n int, err error) {
	if wab.isDecoder {
		// atomic.AddUint64(&wab.byteCount, uint64(len(p)))
		wab.byteCount.Add(uint64(len(p)))
	}
	n, err = wab.pipeWriter.Write(p)
	wab.currOffset.Add(uint64(n))
	// atomic.AddUint64(&wab.currOffset, uint64(n))
	return n, err
}

// WriteAt is used exclusively by S3
// Since this is a buffer that streams directly into a processor pipe, order matters.
// This should be used with an S3Downloader concurrency of 1 to preserve order.
// This function panics if the bytes are returned out of order
func (wab *StreamingBuffer) WriteAt(p []byte, offset int64) (n int, err error) {
	if wab.CurrentOffset() != uint64(offset) && offset != 0 {
		// If we are using S3 with concurrency, offset is not provided, which results in 0
		msg := fmt.Sprintf("StreamingBuffer does not support concurrency as it streams into an I/O pipe: currOffset=%d, providedOffset=%d", wab.CurrentOffset(), offset)
		panic(msg)
	}
	return wab.Write(p)
}

func (wab *StreamingBuffer) Read(p []byte) (n int, err error) {
	n, err = wab.pipeReader.Read(p)
	// atomic.AddUint64(&wab.byteCount, uint64(n))
	wab.byteCount.Add(uint64(n))
	return n, err
}
func (wab *StreamingBuffer) Close() error {
	return wab.pipeWriter.Close()
}
func (wab *StreamingBuffer) CloseWithError(err error) error {
	if err == nil {
		return nil
	}
	if wab.err != io.EOF {
		// Only store errors we care about programmatically
		wab.err = err
	}
	// Still pass io.EOF through
	return wab.pipeWriter.CloseWithError(err)
}
func (wab *StreamingBuffer) NewJSONDecoder() *json.Decoder {
	wab.isDecoder = true
	return json.NewDecoder(wab.pipeReader)
}

This approach allows us to pipe the download stream from S3 straight into a JSON decoder, and intermittently we see panics, with different sizes of files, different data, different buckets.

e.g.

panic: StreamingBuffer does not support concurrency as it streams into an I/O pipe: currOffset=34278266, providedOffset=20971520

It definitely seems to relate to the WriteAt implementation - but I'm not certain what else to try to get it working correctly.

Example usage:

// Create a new buffer for downloading
jsonBuf := NewStreamingBuffer()
jsonDecoder := jsonBuf.NewJSONDecoder()
jsonBuf.Add(1)
go func() {
        // Spawn a routine which starts loading up the writer on the io.Pipe
	defer jsonBuf.Done()
	defer jsonBuf.Close()
	t.Log("Started file download...")
	is.NoError(kts3.DownloadFile(fullS3Uri, jsonBuf))
}()
// Loop over the jsonDecoder (attached to the io.Pipe reader)
for jsonDecoder.More() {
	foundSomething = true
	var resp responsetypes.CFResponse
	err := jsonDecoder.Decode(&resp)
	var e *json.SyntaxError
	if errors.As(err, &e) && strings.ContainsRune(e.Error(), ']') {
		// Not an actual error - just the closing ']' at the end of the JSON array
		break
	} else {
		is.NoError(err, "Could not decode the file into a CFResponse")
	}
	is.NotEmpty(resp.TestID, "The test ID wasn't populated")
	t.Log("Decoded response...")
}

^^ That code usually works great; it just intermittently hits this out-of-order bytes issue.

I agree with @david-christ 's assessment that it's possible that somehow S3 is getting an error and retrying a prior chunk, but I'm at a loss as to how to fix it.
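
One workaround I'm considering (just a sketch, built on the assumption from this thread that retried chunks only re-send offsets at or below what has already been written): make WriteAt tolerate already-seen bytes instead of panicking, and only fail on a genuine gap ahead of the current offset.

// Hypothetical variant of WriteAt that tolerates re-sent chunks from SDK retries.
// Still assumes Concurrency = 1, i.e. a single writer goroutine.
func (wab *StreamingBuffer) WriteAt(p []byte, offset int64) (n int, err error) {
	curr := int64(wab.CurrentOffset())
	switch {
	case offset == curr:
		// In-order chunk: stream it straight through.
		return wab.Write(p)
	case offset+int64(len(p)) <= curr:
		// Entire chunk was already written on a previous attempt: report success without re-writing.
		return len(p), nil
	case offset < curr:
		// Chunk partially overlaps what was already written: write only the unseen tail.
		written, werr := wab.Write(p[curr-offset:])
		return int(curr-offset) + written, werr
	default:
		// A gap ahead of the current offset would mean real concurrency, which the pipe can't support.
		return 0, fmt.Errorf("out-of-order write: currOffset=%d, providedOffset=%d", curr, offset)
	}
}

Whether that is safe depends on the retried request actually re-sending byte-identical chunks, so treat it as an experiment rather than a fix.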

TopherGopher avatar Aug 16 '23 18:08 TopherGopher

I agree with @david-christ 's assessment that it's possible that somehow S3 is getting an error and retrying a prior chunk, but I'm at a loss as to how to fix it.

You could try setting the retries in the SDK to 0 and see if that "fixes it" - as in, the SDK will no longer retry and give you the out-of-order issue; instead you'll get an error and have to retry yourself.
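
For example, something like this (a sketch; config.WithRetryer and aws.NopRetryer come from aws-sdk-go-v2, but double-check against the SDK version you're on) disables retries for every client built from the config:

cfg, err := config.LoadDefaultConfig(ctx,
	// Replace the default retryer with one that never retries.
	config.WithRetryer(func() aws.Retryer { return aws.NopRetryer{} }),
)
if err != nil {
	return err
}
s3Client := s3.NewFromConfig(cfg)

Or scope it to just the S3 client with s3.NewFromConfig(cfg, func(o *s3.Options) { o.Retryer = aws.NopRetryer{} }).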

Cheers!

alexaandru avatar Aug 17 '23 06:08 alexaandru

Should become OBE (overtaken by events) when we address #2247, since exposing the auto-parallelized object output through an io.Reader will guarantee the object content is pulled from the wire in order.

lucix-aws avatar May 10 '24 16:05 lucix-aws