
Google Cloud Storage stream upload

Open thdxr opened this issue 6 years ago • 8 comments

I've noticed in my metrics that I see RAM spikes whenever we schedule uploads of large files to GCS. It seems the file is loaded entirely into memory before being uploaded; I wanted to confirm whether that's the case and whether we could enable streaming.

thdxr avatar Feb 05 '19 17:02 thdxr

Are you using the storage_objects_insert_simple method?
It receives the file content as an arg.

Can you provide an example use case: how you're using it now, and how you'd like to see it work as a stream?

(I agree with this request... but I'll have to see what streaming/chunking options I can wire in)

zeroasterisk avatar Mar 16 '19 04:03 zeroasterisk

I'm currently passing a file path pointing to the file to storage_objects_insert_simple
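Roughly like this (a minimal sketch; the token, bucket, object name, and path are placeholders):

conn = GoogleApi.Storage.V1.Connection.new(token)

# The last argument is a path to a local file; the library reads the
# whole file into memory before sending the request.
{:ok, _object} =
  GoogleApi.Storage.V1.Api.Objects.storage_objects_insert_simple(
    conn,
    "my-bucket",
    "multipart",
    %GoogleApi.Storage.V1.Model.Object{name: "big-file.bin"},
    "/tmp/big-file.bin"
  )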

Can I pass a stream instead?

thdxr avatar Mar 18 '19 14:03 thdxr

No. It would still consume the entire file into a string to send as a POST to the Google API.

I would love to research this and determine how to either stream over a socket or multipart-chunk the file, but I'm not sure when I'll get to it.

Is this a blocker for you?

zeroasterisk avatar Mar 18 '19 16:03 zeroasterisk

Not a blocker yet. File sizes aren't unreasonably big yet, but they'll probably become an issue in a few months. No rush.

thdxr avatar Mar 18 '19 16:03 thdxr

:+1: I'd also like to see this feature added.

col avatar Jun 02 '20 04:06 col

I've run into this problem now: the data I want to upload as a single file is too large for memory. It actually comes out of the DB as a lazy stream, but as mentioned above, the upload function (I'm using storage_objects_insert_iodata) seems to consume the whole stream into memory before doing any network transfers.

Are there APIs for streaming/chunked uploads on the gcloud side? If they exist, is it just a matter of implementing an Elixir interface for them?
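From what I can tell from the GCS docs, the HTTP API itself does support this via resumable uploads: you open a session, then PUT chunks (multiples of 256 KiB, except the last) to the session URI with Content-Range headers. A minimal sketch of just the handshake, assuming the generated storage_objects_insert_resumable function and a hypothetical connection/0 helper:

# Start a resumable session; GCS returns the session URI in the
# "location" response header. Chunks are then PUT to that URI.
{:ok, %Tesla.Env{headers: headers}} =
  GoogleApi.Storage.V1.Api.Objects.storage_objects_insert_resumable(
    connection(),
    "my-bucket",
    "resumable",
    body: %GoogleApi.Storage.V1.Model.Object{name: "big-file.bin"}
  )

{"location", session_uri} = List.keyfind(headers, "location", 0)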

mathiasose avatar Sep 20 '21 19:09 mathiasose

I'd really benefit from this too. It seems it's already supported in the client libraries for other languages 🤔

spencerkent avatar Oct 26 '22 21:10 spencerkent

This is a pretty big shortcoming. Fortunately it's just a shortcoming of the library and not of the API; I was thinking we'd have to move to a different cloud provider.

It also looks like even if you do have quite a lot of memory, storage_objects_insert_iodata just returns {:error, :closed} for large files, because Google enforces an absolute timeout on the connection rather than an idle timeout plus a connect timeout.

Here's a workaround. The docs were inconsistent about whether the API requires each chunk to be exactly a multiple of 256 KiB or just "close enough" (except for the final chunk), so I chose exactly. Note that this does zero integrity checks, so you may want to adapt it.

# Resumable upload via the GCS session protocol. `bytes` is any lazy
# stream of binaries; `progress` is called with {from_byte, to_byte}
# after each chunk is accepted.
def put(bucket, name, bytes, progress \\ fn _ -> :ok end) do
  obj = %GoogleApi.Storage.V1.Model.Object{name: name, contentType: "application/octet-stream"}

  # Start a resumable upload session; GCS returns the session URI in the
  # "location" response header.
  {:ok, %Tesla.Env{headers: headers}} =
    Objects.storage_objects_insert_resumable(connection(), bucket, "resumable", body: obj)

  {_, location} =
    Enum.find(headers, fn
      {"location", _} -> true
      _ -> false
    end)

  # 32 * 256 KiB = 8 MiB. Every non-final chunk must be a multiple of 256 KiB.
  chunk_size = 4 * (256 * 1024) * 8

  bytes
  # Append a sentinel so we know when to flush the buffer as the final chunk.
  |> Stream.concat([:end])
  # Re-chunk the incoming stream into exact chunk_size pieces, buffering
  # leftovers between elements. (This drains at most one chunk per incoming
  # element, so elements larger than chunk_size accumulate in the buffer;
  # that's still correct, since the final chunk may be any size.)
  |> Stream.transform(<<>>, fn
    :end, buf ->
      {[{true, buf}], <<>>}

    chunk, buf ->
      buf = buf <> chunk

      if byte_size(buf) >= chunk_size do
        emit = binary_part(buf, 0, chunk_size)
        keep = binary_part(buf, chunk_size, byte_size(buf) - chunk_size)
        {[{false, emit}], keep}
      else
        {[], buf}
      end
  end)
  # PUT each chunk to the session URI, tracking the running byte offset.
  |> Enum.reduce(0, fn {is_final, chunk}, from_byte ->
    to_byte = from_byte + byte_size(chunk)

    # The total size is unknown ("*") until the final chunk.
    final_size =
      if is_final do
        to_byte
      else
        "*"
      end

    Finch.build(
      :put,
      location,
      [
        {"Content-Length", to_string(byte_size(chunk))},
        {"Content-Range", "bytes #{from_byte}-#{to_byte - 1}/#{final_size}"}
      ],
      chunk
    )
    |> Finch.request(__MODULE__)
    |> case do
      # 308 means "resume incomplete": the chunk was accepted, keep going.
      {:ok, %{status: 308}} when not is_final -> :ok
      {:ok, %{status: 200}} when is_final -> :ok
      # Anything else crashes here (no retries, no integrity checks).
    end

    progress.({from_byte, to_byte})

    to_byte
  end)

  :ok
end
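
For completeness, a hypothetical call site, assuming a Finch pool has been started under this module's name and connection/0 returns an authenticated GoogleApi.Storage.V1.Connection:

# Stream a local file in 1 MiB reads; any lazy stream of binaries
# (e.g. a DB result stream) works the same way.
bytes = File.stream!("/tmp/big-file.bin", [], 1024 * 1024)

:ok =
  put("my-bucket", "big-file.bin", bytes, fn {from, to} ->
    IO.puts("uploaded bytes #{from}..#{to - 1}")
  end)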

rozap avatar Feb 02 '23 18:02 rozap