How do we listen to a Server-Sent Events (SSE) stream using the Dio package?
Issue Info
| Info | Value |
|---|---|
| Platform Name | flutter |
| Platform Version | 2.5 |
| Dio Version | 4.0.0 |
| Android Studio | 4.1 |
Issue Description
I was trying to subscribe to a server-sent events (SSE) stream. I was able to connect to the stream using the code below, which uses the http package. It works exactly as expected and returns a stream of data. However, I generally use the Dio package, so I wanted to implement the same thing with Dio in Flutter. Unfortunately, I'm unable to do that.
subscribe() async {
  print("Subscribing..");
  try {
    _client = http.Client();
    var request = http.Request("GET", Uri.parse("http://192.168.1.11:8080/myserver/events"));
    request.headers["Cache-Control"] = "no-cache";
    request.headers["Accept"] = "text/event-stream";
    Future<http.StreamedResponse> response = _client.send(request);
    print("Subscribed!");
    response.asStream().listen((streamedResponse) {
      print("Received streamedResponse.statusCode:${streamedResponse.statusCode}");
      streamedResponse.stream.listen((data) {
        print("Received data:${utf8.decode(data)}");
      });
    });
  } catch (e) {
    print("Caught $e");
  }
}
This is what I tried with the Dio package, but it returns a 404 (request not found). Mainly, I can't figure out how to get a StreamedResponse using the Dio package.
var response = dio.request(url, options: Options(headers: {"Accept": "*/*", "Cache-Control": "no-cache"}));
print("Subscribed!");
response.asStream().listen((streamedResponse) {
  print("Received streamedResponse.statusCode:${streamedResponse.statusCode}");
  print("data:----------------------> ${streamedResponse.data}");
});
I searched for an example of implementing SSE streams with Dio, but I was unable to find one.
I don't think Dio allows for this use case at the moment.
That's very unfortunate. It's an important feature, as many apps use SSE nowadays, e.g. stock broking apps. So it would be helpful if Dio could add it.
Do you have a free example api which offers this kind of stream?
I don't have a free example. I'm testing against my own server's stream.
I agree with it. SSE is a very important and popular feature, haha.
You can use Socket.IO for server-side events for now.
Yes, I'm using the http package itself for now, as it gives the expected result.
You can use SSE with ResponseType.stream in Dio:
Response<ResponseBody> rs = await Dio().get<ResponseBody>(
  "https://server/stream",
  options: Options(headers: {
    "Authorization": 'vhdrjb token',
    "Accept": "text/event-stream",
    "Cache-Control": "no-cache",
  }, responseType: ResponseType.stream), // set responseType to `stream`
);
The result arrives as a stream of Uint8List chunks, which you should first convert to List<int>:
StreamTransformer<Uint8List, List<int>> unit8Transformer =
    StreamTransformer.fromHandlers(
  handleData: (data, sink) {
    sink.add(List<int>.from(data));
  },
);
Then you can decode it as UTF-8 text and split it into lines (any JSON payload can be parsed from the `data:` lines afterwards):
rs.data?.stream
    .transform(unit8Transformer)
    .transform(const Utf8Decoder())
    .transform(const LineSplitter())
    .listen((event) {
  log(event);
});
This worked for me, and now I'm getting data from the server.
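As a possibly simpler alternative to the hand-written `unit8Transformer`, `Stream.cast<List<int>>()` should give the decoder the element type it expects. The following is a minimal sketch, not something confirmed in this thread: `decodeLines` is a hypothetical helper, and an in-memory stream stands in for `rs.data!.stream`. The sample input deliberately splits a two-byte character across chunks to show that the chunked UTF-8 decoder copes with arbitrary chunk boundaries.

```dart
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

/// Hypothetical helper: turns raw response chunks into text lines.
/// `cast<List<int>>()` only changes the static element type of the stream,
/// which is what `utf8.decoder` needs; the bytes themselves are not copied.
Stream<String> decodeLines(Stream<Uint8List> bytes) {
  return bytes
      .cast<List<int>>()
      .transform(utf8.decoder)
      .transform(const LineSplitter());
}

Future<void> main() async {
  // Stand-in for `rs.data!.stream`: the text "data: é\n\n" in two chunks,
  // with the two bytes of "é" (0xC3 0xA9) split across the chunk boundary.
  final chunks = Stream<Uint8List>.fromIterable([
    Uint8List.fromList([0x64, 0x61, 0x74, 0x61, 0x3A, 0x20, 0xC3]),
    Uint8List.fromList([0xA9, 0x0A, 0x0A]),
  ]);

  // Prints each decoded line; the empty line marks the end of an SSE message.
  await for (final line in decodeLines(chunks)) {
    print('line: "$line"');
  }
}
```

With Dio, the same helper would be applied as `decodeLines(rs.data!.stream)`, assuming the request was made with `ResponseType.stream` as shown above.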
Wow, this would be amazing to have built into Dio. The other solutions to this problem in Dart don't handle errors well, such as retrying after disconnections. For testing, there's a free SSE endpoint at https://hacker-news.firebaseio.com/v0/topstories.json. The documentation is at https://github.com/HackerNews/API
Yup, that would be great. We need to include all the functionality that the eventsource package provides for JS.
SSE messages must be split by a double newline. Has anyone implemented such a splitter?
Based on @vhdrjb's code, I have implemented one more transformer to extract complete SSE messages. I believe there must be a more efficient transformer, but with my current Dart/Flutter knowledge, this is what I can do.
rs.data?.stream
    .transform(unit8Transformer)
    .transform(const Utf8Decoder())
    .transform(const LineSplitter())
    .transform(const SseTransformer())
    .listen((event) {
  debugPrint("Event: ${event.id}, ${event.event}, ${event.retry}, ${event.data}");
});

class SseTransformer extends StreamTransformerBase<String, SseMessage> {
  const SseTransformer();

  @override
  Stream<SseMessage> bind(Stream<String> stream) {
    return Stream.eventTransformed(stream, (sink) => SseEventSink(sink));
  }
}

// `implements` rather than `extends`: EventSink is an interface class in
// Dart 3 and cannot be extended outside its own library.
class SseEventSink implements EventSink<String> {
  final EventSink<SseMessage> _eventSink;
  String? _id;
  String _event = "message";
  String _data = "";
  int? _retry;

  SseEventSink(this._eventSink);

  @override
  void add(String event) {
    if (event.startsWith("id:")) {
      _id = event.substring(3);
      return;
    }
    if (event.startsWith("event:")) {
      _event = event.substring(6);
      return;
    }
    if (event.startsWith("data:")) {
      // Consecutive data: lines belong to one message, joined by "\n".
      final value = event.substring(5);
      _data = _data.isEmpty ? value : "$_data\n$value";
      return;
    }
    if (event.startsWith("retry:")) {
      _retry = int.tryParse(event.substring(6));
      return;
    }
    // A blank line ends the message: emit it and reset the fields.
    if (event.isEmpty) {
      _eventSink.add(SseMessage(id: _id, event: _event, data: _data, retry: _retry));
      _id = null;
      _event = "message";
      _data = "";
      _retry = null;
    }
  }

  @override
  void addError(Object error, [StackTrace? stackTrace]) {
    _eventSink.addError(error, stackTrace);
  }

  @override
  void close() {
    _eventSink.close();
  }
}

class SseMessage {
  final String? id;
  final String event;
  final String data;
  final int? retry;

  const SseMessage({
    this.id,
    required this.event,
    required this.data,
    this.retry,
  });
}
Can you share how to disconnect the stream connection?
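One approach that should work is to keep the `StreamSubscription` returned by `listen` and call `cancel()` on it when the events are no longer needed. The sketch below shows the mechanism with plain `dart:async`; `listenToLines` is a hypothetical helper, and a `StreamController` stands in for `rs.data!.stream`, so this is an assumption about how it maps onto Dio rather than a confirmed answer. Dio requests also accept a `cancelToken` argument (a `CancelToken`), which may be worth passing as well; how either form of cancellation reaches the underlying connection is not confirmed in this thread.

```dart
import 'dart:async';
import 'dart:convert';

/// Hypothetical helper: listens to [bytes] line by line and returns the
/// subscription so the caller can stop listening later.
StreamSubscription<String> listenToLines(
  Stream<List<int>> bytes,
  void Function(String line) onLine,
) {
  return bytes
      .transform(utf8.decoder)
      .transform(const LineSplitter())
      .listen(onLine);
}

Future<void> main() async {
  var sourceCancelled = false;

  // Stand-in for the response body stream; onCancel fires when the
  // listener at the end of the transformer chain cancels.
  final source = StreamController<List<int>>(
    onCancel: () {
      sourceCancelled = true;
    },
  );

  final subscription = listenToLines(source.stream, print);
  source.add(utf8.encode('data: first\n\n'));

  // Give the event a chance to be delivered before disconnecting.
  await Future<void>.delayed(Duration.zero);

  // Cancelling the outer subscription propagates back to the source.
  await subscription.cancel();
  print('source cancelled: $sourceCancelled');
}
```

In a Flutter widget, the subscription would typically be stored in a field and cancelled in `dispose()`.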
I am on Flutter web, and when I use this code, the stream only starts once the whole response has been received. I get all the events, but I would like to receive them live. Is support for web missing, do I need to configure something, or is this expected?
Reading the response as a live stream is not available on the web, because requests there are based on XHR. As a workaround, see https://github.com/dart-lang/http/issues/595.