
Multipart upload content streamed

Open • tg44 opened this issue 3 years ago • 9 comments

Hi!

Most clients handle multipart-form uploads as a convenient file upload mechanism. If I want to catch a file upload as a stream and write it to S3 on the fly, for example, with the current implementations I either need to parse the raw byte stream somehow to actually stream the data, or I need to write the file to the filesystem and upload it from there. The second method is an obvious attack surface, and also super slow for large files (multiple fast attackers -> big files -> full HDD -> DoS).

We generally upload one file at a time, but with some tinkering it could work with real multipart too (I don't need this right now, but for the sake of completeness I'll write down my ideas). The API could look something like

def multipartBodySinglePart(partName: String)(s: Streams[S])(schema: Schema[s.BinaryStream], format: CodecFormat, charset: Option[Charset] = None): StreamBodyIO[s.BinaryStream, s.BinaryStream, S]

which is almost the same as streamBody, except that it seeks into the required part, sends it downstream, and when the downstream finishes or the part is finished, seeks to the end of the request. The extended version could be something like a stream of (PartHeader | PartBytes), where the downstream could build a state machine or other logic to drop/seek past the unneeded parts and process the PartBytes at the speed of the real downstream application logic. What we absolutely can't do is a Source[(PartHeader, Source[ByteString])], because things like mapAsyncUnordered(4) or buffer(10) would ruin the whole streaming. We need something like a Source[Either[PartHeader, PartBytes]] or something similar (like a parent sealed trait).
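
To make that shape concrete, here is a minimal sketch of such an event model, using Akka Streams types; PartEvent, PartHeader and PartBytes are made-up names, not existing tapir or sttp API:

  import akka.util.ByteString

  // Hypothetical flat event model: headers and body chunks are interleaved
  // in a single stream, so nothing forces buffering a whole part in memory
  // or reading sub-streams in parallel.
  sealed trait PartEvent
  final case class PartHeader(name: String, contentType: Option[String]) extends PartEvent
  final case class PartBytes(chunk: ByteString) extends PartEvent

A part then arrives as one PartHeader followed by any number of PartBytes chunks, until the next PartHeader or the end of the request.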

Prebuilt support in the server interpreters:

  • Akka directive: https://doc.akka.io/docs/akka-http/current/routing-dsl/directives/file-upload-directives/fileUpload.html
  • Play method: https://dzone.com/articles/stream-a-file-to-aws-s3-using-akka-streams-in-play
  • http4s: a quick google didn't help me out; it seems like it materializes a Part stream, but I've never used http4s, so no idea whether that is really a stream or whether it uses filesystem caching
  • finatra: https://github.com/twitter/finatra/issues/538 (no support yet?)
  • aws: no idea

Also, we should somehow document the accepted multipart input data format. At this point I would allow a custom Schema that isn't compile-time checked.

tg44 • Jul 03 '21 11:07

This is precisely my use case: I'm uploading multiple files in one multipart form, and I'd like to stream each one directly to S3 without writing to disk. http4s supports streaming individual Parts out of the box, if that's any help.

gerryfletch • Feb 08 '22 21:02

@gerryfletch as far as I remember, http4s writes each part to disk if it is larger than a given threshold and then streams the data from there. But this might have changed, or it might be a special case.

That said, a streaming multipart body would be nice to have. Probably not valid anymore, but @tg44 maybe you remember why a Stream[Part[Stream[Byte]]] wouldn't work for you? (where Part comes from sttp.model and Stream is one of the supported streaming abstractions)

adamw • Feb 14 '22 16:02

If I remember correctly, Part[Stream[Byte]] writes (or at least wrote) to the underlying volume, which can be an attack vector. Also, a Stream[Stream[Sth]] is not really a good API in this case, because we can't read the substreams in parallel. (If I write stream.mapAsyncUnordered(4)(_.runWith(???)), this should fail if we don't use the filesystem.)

tg44 • Feb 15 '22 15:02

@tg44 good points, thanks! But then with Source[Either[PartHeader, PartBytes]] we have to read whole parts into memory ... unless we do Source[Either[PartHeader, PartBytesChunk]] or something like that.

adamw • Feb 16 '22 14:02

Yep, for me PartBytes is a chunk of data, so the elements in the stream would be Left, Right, Right, Left, Right, Right, Right and so on. You could seek into the needed part with a simple state machine, and process the chunks as a Stream[Byte] with a little stream machinery if you really want to.
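
For illustration, a minimal sketch of that seeking state machine, built on the hypothetical PartEvent model from above (singlePart is a made-up name):

  import akka.NotUsed
  import akka.stream.scaladsl.Flow
  import akka.util.ByteString

  // Skip events until the header of the wanted part, emit that part's
  // chunks, and complete when the next part begins - never holding more
  // than one chunk at a time.
  def singlePart(partName: String): Flow[PartEvent, ByteString, NotUsed] =
    Flow[PartEvent]
      .dropWhile {
        case PartHeader(name, _) => name != partName // skip other parts' headers
        case _                   => true             // ...and their chunks
      }
      .drop(1) // drop the matching header itself
      .takeWhile {
        case _: PartHeader => false // the next part starts: we're done
        case _             => true
      }
      .collect { case PartBytes(chunk) => chunk }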

We could probably add helpers for this, like a consume(s: Source[...], handlers: Map[HeaderSomething, Sink[Byte]]), to help with the seeking logic. (Or not a Map, but even a PartialFunction?)
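
Such a helper could look roughly like this (a sketch over the hypothetical event model, with made-up names; the PartialFunction variant is used, and splitWhen keeps the parts strictly sequential):

  import akka.Done
  import akka.stream.Materializer
  import akka.stream.scaladsl.{Sink, Source}
  import akka.util.ByteString
  import scala.concurrent.Future

  // Route each part's chunks to the sink chosen by its header; drain the
  // parts no handler is interested in. Parts are consumed one after the
  // other, so no part is ever buffered whole.
  def consume(
      events: Source[PartEvent, _],
      handlers: PartialFunction[PartHeader, Sink[ByteString, Future[Done]]]
  )(implicit mat: Materializer): Future[Done] =
    events
      .splitWhen(_.isInstanceOf[PartHeader]) // one sub-stream per part
      .prefixAndTail(1)                      // pair the header with its chunks
      .mapAsync(1) {                         // parallelism 1: strictly in order
        case (Seq(header: PartHeader), body) =>
          val chunks = body.collect { case PartBytes(c) => c }
          chunks.runWith(handlers.applyOrElse(header, (_: PartHeader) => Sink.ignore))
        case (_, body) => body.runWith(Sink.ignore) // malformed part: drain it
      }
      .concatSubstreams
      .runWith(Sink.ignore)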

The bigger problem I saw when I wrote the original comment is that not all server interpreters enable an API like this.

tg44 • Feb 16 '22 15:02

True, we'd have to resort to documentation to specify where this is possible.

adamw • Feb 16 '22 17:02

Implementing this would require:

  1. introducing Stream[_] and ByteChunk type aliases into the Streams capability, so that we can represent streams of arbitrary data
  2. or, providing stream parsers for fs2/zio/etc., which would allow mapping a raw binary stream into a stream of part headers/bytes

The first option would require quite far-reaching changes in the whole sttp stack, while the second could be added at a later point without breaking binary compatibility.
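
For concreteness, with Akka Streams the second option could take a shape like this (everything below is hypothetical; no such parser exists in tapir today, and the multipart framing itself is left out):

  import akka.NotUsed
  import akka.stream.scaladsl.Flow
  import akka.util.ByteString

  // A per-streams-library parser from the raw request bytes (as exposed
  // today by streamBody) to the flat header/chunk event stream sketched
  // earlier; only the signature matters here.
  def multipartEvents(boundary: String): Flow[ByteString, PartEvent, NotUsed] = ???

Server logic could then do something like rawBytes.via(multipartEvents(boundary)).via(singlePart("file")).runWith(s3Sink), combining it with the helpers sketched earlier.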

adamw • May 17 '22 07:05

Hey there 👋 Any updates on whether this can be done? I have file upload needs, but it seems like my only option at this point is to use Akka Http to get access to the fileUpload directive, or else I have to hold the whole file in memory 😢 Are there any workarounds in the meantime?

NavidJalali • Aug 31 '22 21:08

@NavidJalali well, it can be done (using the second approach described above), but I'm afraid there's been no progress on implementing it.

If you don't know the part names upfront, you can create a multipartBody which by default reads the data into a file instead of a byte array - then you'll get a sequence of Part[File]. Would that be acceptable?
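
For reference, the file-backed variant could look roughly like this (exact derivation details depend on the tapir version, so treat it as a sketch):

  import java.io.File
  import sttp.model.Part
  import sttp.tapir._
  import sttp.tapir.generic.auto._

  // Each Part[File] field makes the server interpreter buffer that part
  // into a temporary file rather than into memory.
  case class UploadForm(file: Part[File])

  val upload = endpoint.post.in("upload").in(multipartBody[UploadForm])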

adamw • Sep 01 '22 16:09