elixir-google-api
Google Cloud Storage stream upload
I've noticed RAM spikes in my metrics whenever we schedule uploads of large files to GCS. It seems the file is loaded entirely into memory before being uploaded; I wanted to confirm whether that's the case and whether streaming could be enabled.
Are you using the `storage_objects_insert_simple` method? It receives the file content as an arg.
Can you provide an example of your use case: how you're using it now, and how you would like it to work as a stream?
(I agree with this request... but I'll have to see what streaming/chunking options I can wire in)
I'm currently passing a file path pointing to the file to `storage_objects_insert_simple`. Can I pass a stream instead?
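Roughly, the current call looks like the first sketch below (the bucket, paths, and token setup are made up, and the argument order is from memory of the generated client, so double-check it against the docs). The second snippet is the kind of stream-accepting variant I'd hope for; it does not exist in the library today.

```elixir
conn = GoogleApi.Storage.V1.Connection.new(token)

# Today: the whole file ends up in memory before the request is sent.
{:ok, _object} =
  GoogleApi.Storage.V1.Api.Objects.storage_objects_insert_simple(
    conn,
    "my-bucket",
    "multipart",
    %GoogleApi.Storage.V1.Model.Object{name: "backups/dump.bin"},
    "/tmp/dump.bin"
  )

# Hypothetical: a variant that accepts any Enumerable and uploads it chunk by chunk.
# (This function does not exist; it's what I'd like to see.)
{:ok, _object} =
  GoogleApi.Storage.V1.Api.Objects.storage_objects_insert_stream(
    conn,
    "my-bucket",
    "resumable",
    %GoogleApi.Storage.V1.Model.Object{name: "backups/dump.bin"},
    File.stream!("/tmp/dump.bin", [], 256 * 1024)
  )
```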
No. It would still consume the entire file into a string to send as a POST to the Google API.
I would love to research this and determine how to either stream over a socket, or multi-part chunk the file... But I'm not sure when I'll get to it.
Is this a blocker for you?
Not a blocker yet. File sizes aren't unreasonably big yet but probably will become an issue in a few months. No rush
:+1: I'd also like to see this feature added.
I've run into this problem now: the data I want to upload as a single file is too large for memory. It's actually coming out of the DB as a lazy stream, but as mentioned above, the upload function (I'm using `storage_objects_insert_iodata`) consumes the whole stream into memory before doing any network transfer.
Are there APIs for streaming/chunked uploads on the gcloud side? If they exist, is it just a matter of implementing an Elixir interface for them?
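Concretely, what I have to do today is realize the whole stream before calling the client, roughly like this (a sketch; `MyRepo`, `rows_query`, `encode_row/1`, `conn`, and the bucket name are placeholders):

```elixir
{:ok, iodata} =
  MyRepo.transaction(fn ->
    rows_query
    |> MyRepo.stream()
    |> Stream.map(&encode_row/1)
    # The entire export is materialized in memory here, before any bytes hit the network.
    |> Enum.to_list()
  end)

{:ok, _object} =
  GoogleApi.Storage.V1.Api.Objects.storage_objects_insert_iodata(
    conn,
    "my-bucket",
    "multipart",
    %GoogleApi.Storage.V1.Model.Object{name: "exports/dump.csv"},
    iodata
  )
```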
I'd really benefit from this too. It seems like it's already supported in APIs for other languages 🤔
This is a pretty big shortcoming, though fortunately a shortcoming of the library and not the API. I was thinking we'd have to move to a different cloud provider.
It also looks like, even with plenty of memory available, `storage_objects_insert_iodata` just returns `{:error, :closed}` for large files, because Google enforces an absolute timeout on the connection rather than an idle timeout plus a connect timeout.
Here's a workaround. The docs were inconsistent about whether the API requires each chunk to be exactly a multiple of 256 KiB or just "close enough" (except for the final chunk), so I chose exactly. Note that this does zero integrity checks, so you'll probably want to adapt it.
```elixir
def put(bucket, name, bytes, progress \\ fn _ -> :ok end) do
  obj = %GoogleApi.Storage.V1.Model.Object{name: name, contentType: "application/octet-stream"}

  # Start a resumable upload session; the session URL comes back in the "location" header.
  {:ok, %Tesla.Env{headers: headers}} =
    Objects.storage_objects_insert_resumable(connection(), bucket, "resumable", body: obj)

  {_, location} =
    Enum.find(headers, fn
      {"location", _} -> true
      _ -> false
    end)

  # 8 MiB per request, i.e. a multiple of the required 256 KiB granularity.
  chunk_size = 4 * 256 * 1024 * 8

  bytes
  |> Stream.concat([:end])
  |> Stream.transform(<<>>, fn
    # The sentinel flushes whatever is left in the buffer as the final chunk.
    :end, buf ->
      {[{true, buf}], <<>>}

    chunk, buf ->
      buf = buf <> chunk

      if byte_size(buf) >= chunk_size do
        # Emit exactly one full chunk and keep the remainder buffered.
        emit = binary_part(buf, 0, chunk_size)
        keep = binary_part(buf, chunk_size, byte_size(buf) - chunk_size)
        {[{false, emit}], keep}
      else
        {[], buf}
      end
  end)
  |> Enum.reduce(0, fn {is_final, chunk}, from_byte ->
    to_byte = from_byte + byte_size(chunk)

    # The total object size is unknown ("*") until the final chunk.
    final_size = if is_final, do: to_byte, else: "*"

    Finch.build(
      :put,
      location,
      [
        {"Content-Length", to_string(byte_size(chunk))},
        {"Content-Range", "bytes #{from_byte}-#{to_byte - 1}/#{final_size}"}
      ],
      chunk
    )
    |> Finch.request(__MODULE__)
    |> case do
      # 308 means "resume incomplete": the chunk was accepted, keep sending.
      {:ok, %{status: 308}} when not is_final -> :ok
      {:ok, %{status: 200}} when is_final -> :ok
    end

    progress.({from_byte, to_byte})
    to_byte
  end)

  :ok
end
```
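For what it's worth, calling it might look like the sketch below (the bucket, object names, repo, and `encode_row/1` helper are placeholders; `Finch.request(__MODULE__)` above also assumes a Finch pool registered under this module's name, e.g. `{Finch, name: __MODULE__}` in the supervision tree):

```elixir
# A large file on disk, read lazily in 256 KiB increments:
put("my-bucket", "backups/dump.bin", File.stream!("/tmp/dump.bin", [], 256 * 1024))

# Or the lazy DB export mentioned earlier in the thread.
# timeout: :infinity keeps the transaction open for the duration of the upload.
MyRepo.transaction(
  fn ->
    stream =
      rows_query
      |> MyRepo.stream()
      |> Stream.map(&encode_row/1)

    put("my-bucket", "exports/dump.csv", stream, fn {_from, to_byte} ->
      IO.puts("uploaded #{to_byte} bytes so far")
    end)
  end,
  timeout: :infinity
)
```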