okhttp's client can't be used for processing continuous streaming since Response body can't be read unbuffered

Open lfarkas opened this issue 2 years ago • 19 comments

I'd like to read an HTTP client's response in unbuffered mode, but with okhttp that is not possible, or at least I can't find any solution. Why is it needed?

I've got an HTTP event stream (it actually comes from a Hikvision camera, /ISAPI/Event/notification/alertStream) which has a multipart response stream. So the response header is something like this:

HTTP/1.1 200 OK
MIME-Version: 1.0
Connection: close
Content-Type: multipart/mixed; boundary=boundary

while the response content is something like this:

--boundary
Content-Type: application/xml; charset="UTF-8"
Content-Length: 521

<EventNotificationAlert version="2.0" xmlns="http://www.std-cgi.com/ver20/XMLSchema">
<ipAddress>192.168.91.11</ipAddress>
<portNo>80</portNo>
<protocol>HTTP</protocol>
<macAddress>44:a6:42:57:42:77</macAddress>
<channelID>1</channelID>
<dateTime>2022-04-28T15:30:40+01:00</dateTime>
<activePostCount>0</activePostCount>
<channelName>PRO_PC_1080P_</channelName>
<eventType>videoloss</eventType>
<eventState>inactive</eventState>
<eventDescription>videoloss alarm</eventDescription>
</EventNotificationAlert>
--boundary
Content-Type: application/xml; charset="UTF-8"
Content-Length: 521

<EventNotificationAlert version="2.0" xmlns="http://www.std-cgi.com/ver20/XMLSchema">
<ipAddress>192.168.91.11</ipAddress>
<portNo>80</portNo>
<protocol>HTTP</protocol>
<macAddress>44:a6:42:57:42:77</macAddress>
<channelID>1</channelID>
<dateTime>2022-04-28T15:30:50+01:00</dateTime>
<activePostCount>0</activePostCount>
<channelName>PRO_PC_1080P_</channelName>
<eventType>videoloss</eventType>
<eventState>inactive</eventState>
<eventDescription>videoloss alarm</eventDescription>
</EventNotificationAlert>

I'd like to parse it with okhttp3.MultipartReader, which seems easy and obvious, but it is NOT! At first glance it works, but there are many problems with the processing. It seems okhttp ALWAYS buffers the response stream, which is not acceptable in my case. Why? In the above example a new part arrives every 10 seconds. Since each part is 521 bytes and okhttp's buffer size is 512 bytes, I never get the first okhttp3.MultipartReader.Part until AFTER the second arrives (and this is true for all other events too). So I'm always 10 seconds late.

The best solution would be to be able to turn off buffering. An even better solution would be that, if the Part has a Content-Length, the buffer size is set to match it.

So after many hours of debugging we decided to switch to another HTTP client, because with okhttp it's not possible to process a continuous streaming HTTP response in a timely manner.
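
A minimal sketch of the Content-Length idea described above, using only the JDK. It assumes every part carries a Content-Length header, as in the camera stream shown earlier. The class name ContentLengthPartReader and its methods are hypothetical and are not part of OkHttp. It reads the boundary line and the part headers, then exactly Content-Length bytes, so a part can be returned without waiting for the next boundary.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of OkHttp: reads multipart parts by their Content-Length. */
final class ContentLengthPartReader {
  private final InputStream in;
  private final String dashBoundary;

  ContentLengthPartReader(InputStream in, String boundary) {
    this.in = in;
    this.dashBoundary = "--" + boundary;
  }

  /** Returns the next part body, or null at end of stream or at the closing "--boundary--". */
  byte[] nextPart() throws IOException {
    String line = readLine();
    // Skip the line break(s) that separate the previous body from the next boundary.
    while (line != null && line.isEmpty()) line = readLine();
    if (line == null || line.equals(dashBoundary + "--")) return null;
    if (!line.equals(dashBoundary)) throw new IOException("expected boundary, got: " + line);

    // Read the part headers up to the empty line; only Content-Length is used here.
    int contentLength = -1;
    for (line = readLine(); line != null && !line.isEmpty(); line = readLine()) {
      int colon = line.indexOf(':');
      if (colon > 0 && line.substring(0, colon).trim().equalsIgnoreCase("Content-Length")) {
        contentLength = Integer.parseInt(line.substring(colon + 1).trim());
      }
    }
    if (contentLength < 0) throw new IOException("part has no Content-Length");

    // Read exactly contentLength bytes; no look-ahead for the next boundary is needed.
    byte[] body = new byte[contentLength];
    int off = 0;
    while (off < contentLength) {
      int n = in.read(body, off, contentLength - off);
      if (n == -1) throw new EOFException("stream ended inside a part");
      off += n;
    }
    return body;
  }

  /** Reads one line terminated by LF (CR is dropped); returns null at end of stream. */
  private String readLine() throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    int c;
    while ((c = in.read()) != -1) {
      if (c == '\n') break;
      if (c != '\r') buf.write(c);
    }
    if (c == -1 && buf.size() == 0) return null;
    return new String(buf.toByteArray(), StandardCharsets.ISO_8859_1);
  }

  public static void main(String[] args) throws IOException {
    byte[] wire = "--boundary\r\nContent-Length: 5\r\n\r\nhello".getBytes(StandardCharsets.US_ASCII);
    ContentLengthPartReader reader =
        new ContentLengthPartReader(new ByteArrayInputStream(wire), "boundary");
    // The part is returned even though no following boundary is present in the input.
    System.out.println(new String(reader.nextPart(), StandardCharsets.US_ASCII));
  }
}
```

With OkHttp, the input could presumably be response.body().byteStream(), with the boundary taken from the response's Content-Type header. The sketch trusts the Content-Length value, so a wrong length from the server would desynchronize the parser.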

lfarkas avatar Apr 28 '22 13:04 lfarkas

Executable test case?

swankjesse avatar Apr 29 '22 03:04 swankjesse

I attached a simple http server with a bug example: bug.zip

The server sends one multipart part every 10 seconds.

If you run both, then on the client side you'll see output like this:

Fri Apr 29 13:09:24 CEST 2022 ============= part: 1
Fri Apr 29 13:09:24 CEST 2022 read: 510 byte in part: 1
Fri Apr 29 13:09:34 CEST 2022 read: 519 byte in part: 1
Fri Apr 29 13:09:34 CEST 2022 after body close part: 1
Fri Apr 29 13:09:34 CEST 2022 ============= part: 2
Fri Apr 29 13:09:34 CEST 2022 read: 510 byte in part: 2
Fri Apr 29 13:09:44 CEST 2022 read: 519 byte in part: 2
Fri Apr 29 13:09:44 CEST 2022 after body close part: 2
Fri Apr 29 13:09:44 CEST 2022 ============= part: 3
Fri Apr 29 13:09:44 CEST 2022 read: 510 byte in part: 3

As you can see, the last few bytes of each part only arrive when the next part has already arrived. So we have to wait for the next part even though the current one has already been streamed and received.
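
One possible explanation for a withheld tail like this, offered as a generic sketch and not as a description of OkHttp's internals: a parser that looks for a delimiter such as "\r\n--boundary" cannot be sure that the last few buffered bytes are body bytes, because they might be the start of a delimiter. The class DelimiterScan and its method names are hypothetical. The conservative variant holds back the last delimiter.length - 1 bytes unconditionally. The prefix-aware variant holds back only a tail that really matches the start of the delimiter.

```java
/** Generic illustration of delimiter scanning; not taken from OkHttp's source. */
final class DelimiterScan {
  /** Index of the first full occurrence of delimiter in data[0, length), or -1. */
  static int indexOf(byte[] data, int length, byte[] delimiter) {
    outer:
    for (int i = 0; i + delimiter.length <= length; i++) {
      for (int j = 0; j < delimiter.length; j++) {
        if (data[i + j] != delimiter[j]) continue outer;
      }
      return i;
    }
    return -1;
  }

  /** Conservative: always withhold the last delimiter.length - 1 bytes if no delimiter is found. */
  static int releasableConservative(byte[] data, int length, byte[] delimiter) {
    int at = indexOf(data, length, delimiter);
    if (at >= 0) return at;
    return Math.max(0, length - (delimiter.length - 1));
  }

  /** Prefix-aware: withhold only a tail that is actually a prefix of the delimiter. */
  static int releasablePrefixAware(byte[] data, int length, byte[] delimiter) {
    int at = indexOf(data, length, delimiter);
    if (at >= 0) return at;
    int start = Math.max(0, length - (delimiter.length - 1));
    for (int i = start; i < length; i++) {
      if (tailIsDelimiterPrefix(data, i, length, delimiter)) return i;
    }
    return length;
  }

  /** True if data[from, length) equals the first (length - from) bytes of delimiter. */
  private static boolean tailIsDelimiterPrefix(byte[] data, int from, int length, byte[] delimiter) {
    for (int k = 0; from + k < length; k++) {
      if (data[from + k] != delimiter[k]) return false;
    }
    return true;
  }
}
```

Under these assumptions, a body that does not end in bytes resembling the delimiter could be released in full by the prefix-aware variant as soon as it is buffered, with or without a Content-Length header.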

lfarkas avatar Apr 29 '22 11:04 lfarkas

Executable test case?

i hope it's enough "Executable":-)

lfarkas avatar Apr 29 '22 11:04 lfarkas

I think our action is to not await the end of part delimiter before returning bytes. One further question is whether to do this if the content-length header is absent. I suspect doing it only when content-length is present is safe and simple, and we should enforce that the content-length is correct.

swankjesse avatar Apr 29 '22 11:04 swankjesse

Anyway, it'd be useful if I could implement the server for this demo with MockWebServer, but I can't find a way to build a streaming web server with it...

lfarkas avatar Apr 29 '22 11:04 lfarkas

I think our action is to not await the end of part delimiter before returning bytes. One further question is whether to do this if the content-length header is absent. I suspect doing it only when content-length is present is safe and simple, and we should enforce that the content-length is correct.

IMHO if you run the test you can simply see that you DO wait for it.

Anyway, there is no such thing as an "end of part delimiter"!

lfarkas avatar Apr 29 '22 12:04 lfarkas

The main problem is probably the buffered reader, since it always waits for the read buffer to be full... And to respond to your suggestion about the content-length: IMHO it's not the best way to rely on its correctness and existence...

lfarkas avatar Apr 29 '22 12:04 lfarkas

After further testing it seems there is only some kind of indexing problem, which may be easier to fix than I assumed. I attached new executable client code.

  • If you run the readFromResponse function, i.e. every time you get the next part you read the body from the original response's body, you can read the whole body part immediately, but you lose every second part :-)
  • If you read the part body from the parsed Part.body(), you read fewer bytes, and the number of missing bytes is exactly the size of the part's header. So it seems there must be some miscalculation somewhere. These remaining few bytes only arrive at the time of the next part...

import java.io.InputStream;
import java.util.Date;
import okhttp3.Headers;
import okhttp3.MultipartReader;
import okhttp3.MultipartReader.Part;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.BufferedSource;

public class Bug {
  private static int readFromResponse(ResponseBody body, Part part, byte[] bytes) throws Exception {
    int read = 0;
    InputStream inputStream = body.byteStream();
    // for (read += inputStream.read(bytes); read < bytes.length; read =
    // inputStream.read(bytes)) {
    // System.err.println(new Date() + " read: " + read);
    // }
    // System.err.print(new String(bytes));
    // System.err.println(new Date() + " total");
    int c = -1;
    while ((read < bytes.length) && ((c = inputStream.read()) != -1)) {
      ++read;
      System.err.print((char) c);
    }
    System.err.println(new Date() + " last: " + c);
    return read;
  }

  private static int readFromPart(Part part, byte[] bytes) throws Exception {
    BufferedSource partBody = part.body();
    // partBody.require(bytes.length);
    int read = partBody.read(bytes);
    System.err.print(new String(bytes));
    System.err.println(new Date() + " total");
    return read;
    // while (read < contentLength) {
    // for (String line = partBody.readUtf8Line(); line != null; line =
    // partBody.readUtf8Line()) {
    // System.err.println(new Date() + " read: " + line + " line: " + i);
    // read += partBody.read(bytes, read, contentLength - read);
    // System.err.println(new Date() + " read: " + read + " byte in part: " + i);
    // }
  }

  public static void main(String[] args) {
    final OkHttpClient client = new OkHttpClient.Builder()
        .hostnameVerifier((s, sslSession) -> true)
        .build();
    Request request = new Request.Builder()
        .url("http://127.0.0.1:8000/stream")
        .build();
    try (Response response = client.newCall(request).execute()) {
      ResponseBody body = response.body();
      try (MultipartReader reader = new MultipartReader(response.body())) {
        int i = 0;
        while (true) {
          try (Part part = reader.nextPart()) {
            if (part == null) {
              break;
            }
            System.err.println(new Date() + " ============= part: " + ++i);
            Headers partHeaders = part.headers();
            int contentLength = Integer.parseInt(partHeaders.get("Content-Length"));
            // String typeHeader = partHeaders.get("Content-Type");
            // String contentType = typeHeader.substring(0, typeHeader.indexOf(";"));
            // String charEncoding = typeHeader.substring(typeHeader.indexOf("charset=")
            // + 8).replaceAll("^\"|\"$", "");
            System.err.println(new Date() + " ======= Content-Length: " + contentLength);
            byte[] bytes = new byte[contentLength];
            int read = readFromResponse(body, part, bytes);
            // int read = readFromPart(part, bytes);
            System.err.println(new Date() + " --------------- after body close part: " + i);
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

lfarkas avatar May 02 '22 21:05 lfarkas

I dug into this and I couldn’t find anything that specifies Content-Length as a header in a multipart body. I even stripped out that behavior from OkHttp’s MultipartBody class ’cause it seemed to make our users grumpy.

@lfarkas – you wouldn’t happen to have a spec that recommends that by chance?

I think we could fix this for you if the server is sending --boundary with the preceding part. We’d need to split some of our code to read --boundary and then either \r\n or -- rather than reading the two things at once.
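
A rough sketch of what such a split might look like, assuming the delimiter is "\r\n--boundary" and ignoring the optional whitespace padding that multipart allows after a boundary. The class BoundarySplit and its names are hypothetical and are not OkHttp code. Step 1 only needs the delimiter itself, so the preceding part could be closed as soon as the delimiter is seen. Step 2 inspects the following two bytes separately, whenever they arrive.

```java
/** Hypothetical sketch of a two-step boundary parse; not OkHttp code. */
final class BoundarySplit {
  enum After { NEXT_PART, END_OF_STREAM, NEED_MORE_BYTES, MALFORMED }

  /** Step 1: true once buffered[0, length) begins with the delimiter, e.g. "\r\n--boundary". */
  static boolean startsWithDelimiter(byte[] buffered, int length, byte[] delimiter) {
    if (length < delimiter.length) return false;
    for (int i = 0; i < delimiter.length; i++) {
      if (buffered[i] != delimiter[i]) return false;
    }
    return true;
  }

  /** Step 2: classify the bytes that follow the delimiter, once two of them are available. */
  static After classify(byte[] afterDelimiter, int length) {
    if (length < 2) return After.NEED_MORE_BYTES;
    if (afterDelimiter[0] == '\r' && afterDelimiter[1] == '\n') return After.NEXT_PART;
    if (afterDelimiter[0] == '-' && afterDelimiter[1] == '-') return After.END_OF_STREAM;
    return After.MALFORMED;
  }
}
```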

swankjesse avatar Jun 13 '22 01:06 swankjesse

I dug into this and I couldn’t find anything that specifies Content-Length as a header in a multipart body. I even stripped out that behavior from OkHttp’s MultipartBody class ’cause it seemed to make our users grumpy.

@lfarkas – you wouldn’t happen to have a spec that recommends that by chance?

I think we could fix this for you if the server is sending --boundary with the preceding part. We’d need to split some of our code to read --boundary and then either \r\n or -- rather than reading the two things at once.

We don't have one. Actually, the alert stream of Hikvision cameras (Hikvision being the largest camera producer in the world) just works this way.

lfarkas avatar Jun 13 '22 05:06 lfarkas

Of course the spec usually talks about the case where we know the whole content and its length in advance, but in the case of a continuous event-driven stream the response's Content-Length is not known. And in this case the part's Content-Length is very useful.

lfarkas avatar Jun 13 '22 05:06 lfarkas

Would you consider forking MultipartReader to specialize it for your use case? I'm reluctant to add non-standard features.

swankjesse avatar Jun 13 '22 12:06 swankjesse

The problem here is not the non-standard feature. The real problem is that even if all bytes of a given part have already arrived, okhttp still does not hand back the part until the next part starts to arrive. This is because of the buffering. The Content-Length in the part is only a helper with which you can speed up the whole thing. If it's present, you can know the part size in advance. If it's not present, you're not allowed to buffer the input, since otherwise a fully arrived part is not handed back to the user! And IMHO it's a bug!

lfarkas avatar Jun 13 '22 12:06 lfarkas

Is there any progress? Or do you need any further info?

lfarkas avatar Aug 09 '22 12:08 lfarkas

Closing in favour of @swankjesse's suggestion:

Would you consider forking MultipartReader to specialize it for your use case

yschimke avatar May 21 '23 09:05 yschimke

Does this mean okhttp can't be used to parse a multipart event stream? Because the current client is only usable if all parts come at the same time, i.e. if the parts can arrive at different times then it's not usable.

lfarkas avatar May 21 '23 12:05 lfarkas

I thought the suggestion was to use the OkHttp Response stream, but with a customised implementation of multipart stream parsing.

OkHttp definitely streams responses generally. I thought the issue was with the specific multipart classes?

yschimke avatar May 21 '23 13:05 yschimke

I'll take a second look to confirm and make sure I haven't misunderstood.

yschimke avatar May 21 '23 13:05 yschimke

So, to describe the problem simply:

  • suppose we've got an event stream which is a multipart HTTP stream,
  • each event is one part (it can be XML or JSON or anything),
  • an event arrives every 5 minutes,
  • so each part starts with a multipart boundary element,
  • suppose the first element arrives at time 0 (which means the first boundary arrives at time 0),
  • this means the second part will arrive at 5 min, and this also means that the second boundary arrives at 5 min (and this is important!),
  • the current implementation does not parse (or report) the arrival of the first event until time 5 min, since the parser only finds the next boundary element at time 5 min,
  • even if the part has a Content-Length, so we know exactly how many bytes have to arrive,
  • this means all events reach the caller with a 5-minute delay, even though all their bytes have already arrived and been read from the stream.
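
The timeline above can be written out as a small model. This is only an illustration of the timing argument, not a measurement. The class DeliveryTimeline and its method are hypothetical, and -1 is used to mean "not before another boundary arrives or the stream ends".

```java
import java.util.Arrays;

/** Toy model of when each part becomes visible to the caller; times are in minutes. */
final class DeliveryTimeline {
  /**
   * arrivalMinutes[i] is when part i (boundary, headers, and body) reaches the client.
   * With boundary-based delivery a part is reported only when the next boundary arrives;
   * with Content-Length-based delivery it is reported as soon as its own bytes arrive.
   */
  static long[] deliveryMinutes(long[] arrivalMinutes, boolean useContentLength) {
    long[] out = new long[arrivalMinutes.length];
    for (int i = 0; i < arrivalMinutes.length; i++) {
      if (useContentLength) {
        out[i] = arrivalMinutes[i];
      } else {
        // The last part has no following boundary in this model, hence -1.
        out[i] = i + 1 < arrivalMinutes.length ? arrivalMinutes[i + 1] : -1;
      }
    }
    return out;
  }

  public static void main(String[] args) {
    long[] arrivals = {0, 5, 10, 15};
    System.out.println(Arrays.toString(deliveryMinutes(arrivals, false))); // [5, 10, 15, -1]
    System.out.println(Arrays.toString(deliveryMinutes(arrivals, true)));  // [0, 5, 10, 15]
  }
}
```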

lfarkas avatar May 22 '23 15:05 lfarkas