aws-sdk-go-v2
Downloader Concurrency of 1 does NOT download parts sequentially
Documentation
- [X] I've gone through the API reference
- [X] I've checked AWS Forums and StackOverflow for answers
- [X] I've searched for previous similar issues and didn't find any solution
Describe the bug
https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/feature/s3/manager#Downloader states
Concurrency of 1 will download the parts sequentially.
It seems that in about 0.5% of calls this does not hold (sample size of 3400 calls).
Expected behavior
Download should always be sequential, as per documentation.
Current behavior
Download is not sequential in about 0.5% of calls.
Steps to Reproduce
Client usage (error handling omitted for brevity)
cfg, err := config.LoadDefaultConfig(ctx)
s3Client := s3.NewFromConfig(cfg) // NewFromConfig returns only the client, no error
downloader := manager.NewDownloader(s3Client, func(d *manager.Downloader) {
	d.Concurrency = 1
})
// With: bucket string, key string, writer pipe.PipeWriter
_, err = downloader.Download(ctx, NewSequentialWriterAt(writer), &s3.GetObjectInput{
	Bucket: &bucket,
	Key:    &key,
})
return err
The download is verified with a wrapper around the pipe writer that ensures writes are sequential by checking that each offset matches the next expected offset:
func (pwa *SequentialWriterAt) WriteAt(p []byte, offset int64) (n int, err error)
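A minimal sketch of what such a wrapper could look like (illustrative only, not the reporter's actual implementation; it assumes the wrapper simply guards an underlying io.Writer and uses only the io and fmt packages):

// SequentialWriterAt rejects any WriteAt whose offset does not pick up
// exactly where the previous write ended.
type SequentialWriterAt struct {
	w      io.Writer
	offset int64 // next offset we expect to receive
}

func NewSequentialWriterAt(w io.Writer) *SequentialWriterAt {
	return &SequentialWriterAt{w: w}
}

func (swa *SequentialWriterAt) WriteAt(p []byte, offset int64) (int, error) {
	// Reject any write that does not start where the previous one ended.
	if offset != swa.offset {
		return 0, fmt.Errorf("non-sequential write: expected offset %d, attempted %d", swa.offset, offset)
	}
	n, err := swa.w.Write(p)
	swa.offset += int64(n)
	return n, err
}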
Possible Solution
No response
AWS Go SDK version used
1.7.2
Compiler and Version used
1.x
Operating System and version
AWS Lambda
Would you be able to provide information on what the object sizes (in bytes) are when this happens?
Hi @skmcgrail, one file where I observed this happening was around 25 MB. Generally, our artifacts are around 10 MB to 200 MB.
For reference, I am tracking the offset expected for the next write like this:
func (swa *SequentialWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
	// ...
	swa.offset = swa.offset + int64(len(p)) // swa.offset starts at 0
	// ...
}
Some of the error messages I am seeing:
- Expected:5296343, Attempted:5242880
- Expected:348967595, Attempted:346030080
- Expected:18832252, Attempted:15728640
- Expected:1150408, Attempted:0
- Expected:5296324, Attempted:5242880
OK, new insights: After checking the error logs, I noticed that the attempted writes were always at smaller offsets, i.e. chunks we had already seen, and never ahead. I also saw in my logs that my consumer finishes before the downloader finishes. Hence I conclude that when the consumer closed the writer, it caused an error in the Manager, prompting it to retry and thus attempt writes of chunks we had already seen.
This opens up another possible issue though: I changed the code so that only the S3 Manager Downloader would end the process. Now the consumer clearly logs an EOF after a few seconds, signalling the whole file has been processed, but the Lambda running this code still times out after minutes.
So my new assumption is that the Manager for some reason does not return, even though it has downloaded the whole file. I have added log statements right before and after calling downloader.Download, and I do see the start statement, but never the finish statement. Looking at the processed output, it clearly got all of the file and processing succeeded.
So my question: What are the scenarios where the Downloader would not return, and are there ways on the SDK user's end to mitigate this (or might it be a bug in the manager)?
Edit: I should also mention that the Downloader not returning happens deterministically for certain files, i.e. a retry does not circumvent the issue. This happens for roughly 0.1% of the files that the system handles.
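For reference, the "only the Downloader ends the process" pattern described above could be wired up roughly like this (a sketch, not the actual application code; consumeJSON is a hypothetical consumer, and downloader, ctx, bucket, and key are from the earlier snippet):

pr, pw := io.Pipe()

go func() {
	// Only this goroutine closes the write end, and only after Download
	// returns, so the consumer can never close the pipe underneath the SDK.
	_, err := downloader.Download(ctx, NewSequentialWriterAt(pw), &s3.GetObjectInput{
		Bucket: &bucket,
		Key:    &key,
	})
	pw.CloseWithError(err) // a nil err closes cleanly and the reader sees io.EOF
}()

// The consumer only reads from the pipe; it never closes the writer.
if err := consumeJSON(pr); err != nil {
	// handle the error
}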
@david-christ Is this still an issue? We were looking at this recently for streaming s3 objects and were concerned when we came across this issue.
Hello @microbioticajon, we have worked around the issues using a few tricks, so the application itself is working. But I wouldn't be able to tell how often the error itself occurs at the moment.
Can confirm this is still an issue. It occurs intermittently and definitely seems to be related to the order of operations. We're using an io.Pipe rather than a byte array, which keeps memory usage way down and usually works beautifully. safetypes is just a uint64 with an RWMutex wrapped around it. I originally went with atomics and thought that's where the issue was, but no dice.
func NewStreamingBuffer() *StreamingBuffer {
	pipeReader, pipeWriter := io.Pipe()
	return &StreamingBuffer{
		pipeReader: pipeReader,
		pipeWriter: pipeWriter,
		byteCount:  safetypes.NewSafeUInt(0),
		currOffset: safetypes.NewSafeUInt(0),
	}
}

func (wab *StreamingBuffer) Write(p []byte) (n int, err error) {
	if wab.isDecoder {
		// atomic.AddUint64(&wab.byteCount, uint64(len(p)))
		wab.byteCount.Add(uint64(len(p)))
	}
	n, err = wab.pipeWriter.Write(p)
	wab.currOffset.Add(uint64(n))
	// atomic.AddUint64(&wab.currOffset, uint64(n))
	return n, err
}

// WriteAt is used exclusively by S3.
// Since this is a buffer that streams directly into a processor pipe, order matters.
// This should be used with an S3Downloader concurrency of 1 to preserve order.
// This function panics if the bytes are returned out of order.
func (wab *StreamingBuffer) WriteAt(p []byte, offset int64) (n int, err error) {
	if wab.CurrentOffset() != uint64(offset) && offset != 0 {
		// If we are using S3 with concurrency, offset is not provided, which results in 0
		msg := fmt.Sprintf("StreamingBuffer does not support concurrency as it streams into an I/O pipe: currOffset=%d, providedOffset=%d", wab.CurrentOffset(), offset)
		panic(msg)
	}
	return wab.Write(p)
}

func (wab *StreamingBuffer) Read(p []byte) (n int, err error) {
	n, err = wab.pipeReader.Read(p)
	// atomic.AddUint64(&wab.byteCount, uint64(n))
	wab.byteCount.Add(uint64(n))
	return n, err
}

func (wab *StreamingBuffer) Close() error {
	return wab.pipeWriter.Close()
}

func (wab *StreamingBuffer) CloseWithError(err error) error {
	if err == nil {
		return nil
	}
	if err != io.EOF {
		// Only store errors we care about programmatically
		wab.err = err
	}
	// Still pass io.EOF through
	return wab.pipeWriter.CloseWithError(err)
}

func (wab *StreamingBuffer) NewJSONDecoder() *json.Decoder {
	wab.isDecoder = true
	return json.NewDecoder(wab.pipeReader)
}
This approach allows us to pipe the download stream from S3 straight into a JSON decoder, and intermittently we see panics, across different file sizes, different data, and different buckets.
e.g.
panic: StreamingBuffer does not support concurrency as it streams into an I/O pipe: currOffset=34278266, providedOffset=20971520
It definitely seems to relate to the WriteAt implementation - but I'm not certain what else to try to get it working correctly.
Example usage:
// Create a new buffer for downloading
jsonBuf := NewStreamingBuffer()
jsonDecoder := jsonBuf.NewJSONDecoder()
jsonBuf.Add(1)
go func() {
	// Spawn a routine which starts loading up the writer on the io.Pipe
	defer jsonBuf.Done()
	defer jsonBuf.Close()
	t.Log("Started file download...")
	is.NoError(kts3.DownloadFile(fullS3Uri, jsonBuf))
}()
// Loop over the jsonDecoder (attached to the io.Pipe reader)
for jsonDecoder.More() {
	foundSomething = true
	var resp responsetypes.CFResponse
	err := jsonDecoder.Decode(&resp)
	var e *json.SyntaxError
	if errors.As(err, &e) && strings.ContainsRune(e.Error(), ']') {
		// Not an actual error - just the end of the array
		break
	} else {
		is.NoError(err, "Could not decode the file into a CFResponse")
	}
	is.NotEmpty(resp.TestID, "The test ID wasn't populated")
	t.Log("Decoded response...")
}
^^ That code usually works great; it just intermittently hits this out-of-order bytes issue.
I agree with @david-christ's assessment that it's possible that somehow S3 is getting an error and retrying a prior chunk, but I'm at a loss as to how to fix it.
> I agree with @david-christ's assessment that it's possible that somehow S3 is getting an error and retrying a prior chunk, but I'm at a loss as to how to fix it.
You could try setting the retries in the SDK to 0 and see if that "fixes" it, as in: the SDK will no longer retry and produce the out-of-order issue; instead you'll get an error and have to retry yourself.
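For example, in aws-sdk-go-v2 one way to disable retries is to plug in aws.NopRetryer when loading the config (a sketch; aws and config are the SDK's github.com/aws/aws-sdk-go-v2/aws and .../config packages):

// Clients built from this config never retry, so an out-of-order WriteAt
// surfaces as a Download error instead of a silently replayed part.
cfg, err := config.LoadDefaultConfig(ctx,
	config.WithRetryer(func() aws.Retryer { return aws.NopRetryer{} }),
)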
Cheers!
Should become OBE (overtaken by events) when we address #2247, since exposing the autoparallelized object output through an io.Reader will guarantee the object content is pulled from the wire in order.
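In the meantime, one way to get strictly in-order bytes for a streaming consumer like the ones above is to skip the Downloader and read the GetObject body directly, since Body is a plain io.ReadCloser (a rough sketch reusing s3Client, ctx, bucket, and key from the earlier snippets; error handling abbreviated):

// A single GetObject request delivers the body as an in-order stream,
// so it can be fed straight into a json.Decoder.
out, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
	Bucket: &bucket,
	Key:    &key,
})
if err != nil {
	return err
}
defer out.Body.Close()

dec := json.NewDecoder(out.Body)
for dec.More() {
	// decode each element ...
}

This issues one request per object, so it trades the Downloader's parallel range fetches for guaranteed ordering.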