
Streaming support

n3bul4 opened this issue

Utilize retrofit2.http.Streaming and retrofit2.Call<ResponseBody> in additional OpenAIApi methods to enable a streamable ResponseBody.

Utilize retrofit2.Callback to get the streamable ResponseBody, parse Server Sent Events (SSE) and emit them using io.reactivex.FlowableEmitter.
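
For illustration, a rough sketch of the two pieces (simplified, not the literal PR code; the method name createChatCompletionStream and the variable apiCall are made up here):

@Streaming
@POST("/v1/chat/completions")
Call<ResponseBody> createChatCompletionStream(@Body ChatCompletionRequest request);

and, on the consuming side, parsing the SSE lines and emitting them through a FlowableEmitter:

import java.io.BufferedReader;
import java.io.IOException;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import okhttp3.ResponseBody;
import retrofit2.Call;
import retrofit2.Callback;
import retrofit2.Response;

Flowable<String> events = Flowable.create(emitter ->
        apiCall.enqueue(new Callback<ResponseBody>() {
            @Override
            public void onResponse(Call<ResponseBody> call, Response<ResponseBody> response) {
                // Read the body line by line as it streams in; each SSE event is a "data: ..." line.
                try (BufferedReader reader = new BufferedReader(response.body().charStream())) {
                    String line;
                    while ((line = reader.readLine()) != null && !emitter.isCancelled()) {
                        if (line.startsWith("data: ")) {
                            String data = line.substring("data: ".length());
                            if ("[DONE]".equals(data)) {
                                break; // OpenAI's end-of-stream marker
                            }
                            emitter.onNext(data);
                        }
                    }
                    emitter.onComplete();
                } catch (IOException e) {
                    emitter.onError(e);
                }
            }

            @Override
            public void onFailure(Call<ResponseBody> call, Throwable t) {
                emitter.onError(t);
            }
        }), BackpressureStrategy.BUFFER);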

Enable:

  • Streaming of raw bytes
  • Streaming of Java objects
  • Shutdown of OkHttp ExecutorService

Fixes: #51, #83, #182, #184

n3bul4 avatar Mar 17 '23 01:03 n3bul4

Looking forward to the release.

kubecutle avatar Mar 21 '23 01:03 kubecutle

Is there a way to ensure the stability of pushing the stream to the front end? Streaming to the front end always feels laggy.

Ruanandxian avatar Mar 21 '23 02:03 Ruanandxian

@Ruanandxian Hello, unfortunately I don't understand Chinese, but Google Translate gives me the following:

"Is there a way to ensure the stability of pushing the stream to the front end, the process of pushing the stream to the front end always has a sense of lag ."

I'm not sure if I understand you correctly, but it may be that you don't flush the OutputStream. Note that ServletOutputStream and the like buffer the bytes written to them and don't write them out immediately. To push the received data out immediately, you must flush the output stream after each write call.

Something like this could help:

service
    .streamChatCompletionBytes(request)
    .doOnError(e -> {
        e.printStackTrace();
    })
    .blockingForEach(bytes -> {
        response.write(bytes);
        response.flush(); // flush to write out immediately
    });

Otherwise, please provide some more information. A code example would help us understand what exactly you would like to achieve.

n3bul4 avatar Mar 21 '23 12:03 n3bul4

Thank you for providing the code; it allows me to output results in streaming mode in my project.

DOOB-B avatar Mar 22 '23 02:03 DOOB-B


You're welcome! I'm glad to hear that the code I provided was useful for you in your project.

n3bul4 avatar Mar 22 '23 12:03 n3bul4

Thank you! I'm going to add some tests to this and clean up a few things, but it'll be in the next release

TheoKanning avatar Mar 28 '23 01:03 TheoKanning

I use spring-boot-starter-webflux to receive the stream, then extract the response text and return a Flux to the interface caller, but the response does not feel async. Is something wrong? Here is my code:

webClient.post()
    .uri("https://api.openai.com/v1/chat/completions")
    .contentType(MediaType.APPLICATION_JSON)
    .accept(MediaType.TEXT_EVENT_STREAM)
    .header(HttpHeaders.CONNECTION, "keep-alive")
    .header("Authorization", String.format("Bearer %s", properties.getToken()))
    .bodyValue(body)
    .retrieve()
    .bodyToFlux(String.class)
    .map(response -> buildResult(response, composeResult, request))

Mrblw avatar Mar 28 '23 04:03 Mrblw

If you look at @n3bul4's comment above, that issue came down to not flushing the stream. Could it be the same here?

cryptoapebot avatar Mar 28 '23 13:03 cryptoapebot

Hey @Mrblw,

I haven't worked with WebFlux yet, but I am pretty sure

.bodyValue(body)
.retrieve()
.bodyToFlux(String.class)

will read the whole response body at once into a single String instance. If my assumption is wrong and you do get the response body in multiple String chunks, it could be that you are not flushing each chunk after retrieval, as @cryptoapebot already pointed out.
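
I haven't verified this, but as far as I understand the usual WebFlux pattern is not to flush manually at all: you return the Flux from an endpoint that produces text/event-stream, and Spring writes each element out as it arrives. A sketch (webClient, properties, and body as in your snippet):

@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> chat() {
    return webClient.post()
            .uri("https://api.openai.com/v1/chat/completions")
            .contentType(MediaType.APPLICATION_JSON)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .header("Authorization", String.format("Bearer %s", properties.getToken()))
            .bodyValue(body)
            .retrieve()
            .bodyToFlux(String.class); // with text/event-stream, each SSE event arrives as its own element
}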

n3bul4 avatar Mar 28 '23 18:03 n3bul4

Could someone please provide an example of how to utilize this?

phazei avatar Mar 28 '23 23:03 phazei


https://github.com/TheoKanning/openai-java/blob/main/example/src/main/java/example/OpenAiApiStreamExample.java

You might have to add: import java.time.Duration;

And when you create the service:

OpenAiService service = new OpenAiService(token, Duration.ofSeconds(35));

If you are using Gradle, then from the top-level directory just run: ./gradlew example:run

Also make sure the OPENAI_TOKEN environment variable is set to your OpenAI API key.
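
Putting it together, a minimal sketch could look like this (method names like streamChatCompletion and shutdownExecutor assume the streaming API from this PR; the prompt and timeout are arbitrary):

import java.time.Duration;
import java.util.List;
// plus the openai-java imports for OpenAiService, ChatCompletionRequest,
// and ChatMessage (package names depend on the library version)

String token = System.getenv("OPENAI_TOKEN");
OpenAiService service = new OpenAiService(token, Duration.ofSeconds(35));

ChatCompletionRequest request = ChatCompletionRequest.builder()
        .model("gpt-3.5-turbo")
        .messages(List.of(new ChatMessage("user", "Say hello")))
        .stream(true)
        .build();

// Print each chunk's content as it arrives; the first chunk may carry only the role.
service.streamChatCompletion(request)
        .doOnError(Throwable::printStackTrace)
        .blockingForEach(chunk -> {
            String content = chunk.getChoices().get(0).getMessage().getContent();
            if (content != null) {
                System.out.print(content);
            }
        });

service.shutdownExecutor(); // lets the JVM exit cleanly after streaming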

cryptoapebot avatar Mar 28 '23 23:03 cryptoapebot

I haven't tried it yet, but here are some examples of how streaming responses are normally handled (going to test when the version gets released): https://www.baeldung.com/spring-mvc-sse-streams
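
The article centers on Spring's SseEmitter; a hedged sketch of wiring that to this library's streaming call (service, request, and executor assumed to exist, as in the examples above):

@GetMapping("/sse")
public SseEmitter sse() {
    SseEmitter emitter = new SseEmitter();
    // Stream on a separate thread so the request thread can return the emitter immediately.
    executor.execute(() -> {
        try {
            service.streamChatCompletion(request)
                   .blockingForEach(emitter::send);
            emitter.complete();
        } catch (Exception e) {
            emitter.completeWithError(e);
        }
    });
    return emitter;
}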

Hope this helps :-)

And thanks for the PR! 👍 🥇

TrautmannP avatar Mar 29 '23 00:03 TrautmannP

@phazei Another way to use this in spring looks like this:

@GetMapping("/")
@Streaming
public ResponseEntity<StreamingResponseBody> stream() {
  StreamingResponseBody responseBody = response -> {
	ChatCompletionRequest request = ChatCompletionRequest.builder()
		.messages(messages)
		.stream(true)
		.model("gpt-3.5-turbo")
		.maxTokens(MAX_TOKENS)
		.temperature(1.0)
		.frequencyPenalty(0.3)
		.presencePenalty(0.3)
		.build();

	service
		.streamChatCompletionBytes(request)
		.doOnError(e -> {
			e.printStackTrace();
		})
		.doOnComplete(new Action() {
			@Override
			public void run() throws Exception {
				//do something on completion
			}
		})
		.blockingForEach(bytes -> {
			response.write(bytes);
			response.flush(); //immediately write out buffered bytes
		});
  };

  return ResponseEntity.ok()
           .contentType(MediaType.TEXT_EVENT_STREAM)
           .body(responseBody);
}

n3bul4 avatar Mar 31 '23 13:03 n3bul4


@n3bul4 hi, I used this code, but there is a difference compared to calling OpenAI directly: the first response from OpenAI arrives after about 3 seconds, while with the above code the first response takes about 17 seconds. I am confused about why there is such a difference.

an9xyz avatar Apr 25 '23 07:04 an9xyz


@an9xyz hi, what do you mean by "calling OpenAI directly"? The code is directly calling the streaming part of the OpenAI API. I am using roughly the same code in a project and have not encountered any abnormal delays. Note that the OpenAI API is sometimes overloaded (especially on a trial account) and response times can vary strongly at peak times, but that should not be related to the code above.

n3bul4 avatar Apr 25 '23 10:04 n3bul4

My point is based on the OpenAI cookbook example "3. How much time is saved by streaming a chat completion" (link). An example using the Python requests library produces the following output:

Message received 2.10 seconds after request: {
  "role": "assistant"
}
Message received 2.10 seconds after request: {
  "content": "\n\n"
}
Message received 2.10 seconds after request: {
  "content": "1"
}
Message received 2.11 seconds after request: {
  "content": ","
}
....

Modifying openai.api_base to http://127.0.0.1:10008/stream so that it points to the Java server endpoint gives this result:

Message received 10.10 seconds after request: {
  "role": "assistant"
}
Message received 10.10 seconds after request: {
  "content": "\n\n"
}
Message received 10.10 seconds after request: {
  "content": "1"
}
Message received 10.11 seconds after request: {
  "content": ","
}
....

@n3bul4 I noticed that your code uses @GetMapping("/"), should I be using POST instead? Thank you for your reply.

an9xyz avatar Apr 25 '23 13:04 an9xyz

Just a note: I don't think streaming is meant to be a time-saving feature. It can start delivering partial results to the user sooner for better UX, so there is less wait for the first response, but any request will take longer overall. It's meant as a usability tradeoff.

cryptoapebot avatar Apr 25 '23 14:04 cryptoapebot


@an9xyz The example code I provided uses the GetMapping annotation because the EventSource browser API only supports GET requests, and I am using the servlet endpoint with JavaScript's EventSource.

You actually must not use POST for the example servlet, because it is annotated with GetMapping. What I find a bit strange is that you should actually see a Spring error when POSTing to a GetMapping, as long as there is no PostMapping annotation for the same path (i.e. "/").

What happens if you simply enter the URL (http://127.0.0.1:10008/stream) into the browser? Do you experience any delays? That test would at least perform an HTTP GET request.

I think the problem is that you are POSTing to a GET mapping (openai-python uses POST), although I wonder why Spring is not complaining about it.

I am not sure what exactly you would like to achieve. If you want to use an EventSource with JavaScript to read the response, then the code I provided is one way to go. In that case you should not test the servlet with the openai-python library, because it uses POST.

If you don't have to use EventSource with JavaScript, you can change the GetMapping annotation to PostMapping. I would try it out and look at the results, see the sketch below.
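
For example, the POST variant could look roughly like this (untested sketch, same service as above):

@PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<StreamingResponseBody> stream(@RequestBody ChatCompletionRequest request) {
    StreamingResponseBody responseBody = response ->
        service.streamChatCompletionBytes(request)
               .blockingForEach(bytes -> {
                   response.write(bytes);
                   response.flush(); // write out each chunk immediately
               });
    return ResponseEntity.ok()
             .contentType(MediaType.TEXT_EVENT_STREAM)
             .body(responseBody);
}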

I hope this helps.

n3bul4 avatar Apr 25 '23 14:04 n3bul4


Sorry, I didn't explain myself clearly; your response was really helpful 👍. @n3bul4 What I actually meant was how to achieve the streaming effect of the OpenAI API without using EventSource. Thanks again.

an9xyz avatar Apr 25 '23 15:04 an9xyz

@an9xyz you can use the example code to achieve streaming. EventSource is the JavaScript way of handling the content type text/event-stream, and it requires GET to work. Which HTTP method and content type you choose is basically up to you; it depends on the use case.

If your goal is something like ChatGPT, use the code I provided, as ChatGPT uses EventSource.

Otherwise, please try to describe what you would like to achieve. Where should the data of the stream go? If you just want to put it into a database or a file, I wouldn't use streaming at all.
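
For that case, the plain blocking call is enough, something like this (sketch; service and request as in the earlier examples):

// Non-streaming: fetch the whole completion, then persist it.
ChatCompletionResult result = service.createChatCompletion(request);
String text = result.getChoices().get(0).getMessage().getContent();
// write `text` to your database or file here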

n3bul4 avatar Apr 25 '23 16:04 n3bul4

@n3bul4 what I want to achieve is to wrap the OpenAI interface and provide a public service internally. Your example code works well for local debugging with a streaming response, but when I deploy it to the test environment the request blocks and there is no streaming effect; the result comes back all at once, like a regular API request. I don't know where the blocking occurs. We are not considering a ChatGPT web client here, only the server side.

UPDATE: It is likely a configuration problem with Nginx, and I am still trying to solve it.
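
For reference, the settings usually suggested for SSE behind an Nginx proxy look roughly like this (still verifying on my side; adjust the location and upstream to your setup):

location /stream {
    proxy_pass http://127.0.0.1:10008;
    proxy_http_version 1.1;
    proxy_set_header Connection "";  # keep the upstream connection open
    proxy_buffering off;             # forward each chunk immediately instead of buffering
    proxy_cache off;
}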

an9xyz avatar Apr 26 '23 15:04 an9xyz


I encountered the same issue and suspected it was caused by Nginx buffering. I changed the configuration according to "For Server-Sent Events (SSE) what Nginx proxy configuration is appropriate?", but the problem still persists. May I ask how you resolved it?

h2cone avatar Jul 14 '23 02:07 h2cone