How do we listen to Server Sent Event(SSE) Stream using Dio package

Open dewbambs opened this issue 4 years ago • 10 comments

Issue Info

| Info             | Value   |
| ---------------- | ------- |
| Platform Name    | flutter |
| Platform Version | 2.5     |
| Dio Version      | 4.0.0   |
| Android Studio   | 4.1     |

Issue Description

I was trying to subscribe to a server-sent events (SSE) stream. I was able to connect to the stream using the code below, which uses the http package. It works as expected and returns a stream of data. However, I generally use the Dio package, so I wanted to implement the same thing with Dio in Flutter. Unfortunately, I have been unable to do so.

subscribe() async {
  print("Subscribing..");
  try {
    _client = http.Client();

    // Build a GET request that asks the server for an event stream.
    var request = http.Request("GET", Uri.parse("http://192.168.1.11:8080/myserver/events"));
    request.headers["Cache-Control"] = "no-cache";
    request.headers["Accept"] = "text/event-stream";

    Future<http.StreamedResponse> response = _client.send(request);
    print("Subscribed!");

    response.asStream().listen((streamedResponse) {
      print("Received streamedResponse.statusCode:${streamedResponse.statusCode}");
      // Each chunk arrives as raw bytes; decode it as UTF-8 text.
      streamedResponse.stream.listen((data) {
        print("Received data:${utf8.decode(data)}");
      });
    });
  } catch (e) {
    print("Caught $e");
  }
}

This is what I tried with the Dio package, but it returns a 404 (not found) error. Mainly, I can't figure out how to get a StreamedResponse using Dio.

var response = dio.request(url, options: Options(headers: {"Accept": "*/*", "Cache-Control": "no-cache"}));
    print("Subscribed!");

    response.asStream().listen((streamedResponse) {
      print("Received streamedResponse.statusCode:${streamedResponse.statusCode}");
      print("data:----------------------> ${streamedResponse.data}");
    });

I searched for an example that implements SSE streams using Dio, but I was unable to find one.

dewbambs avatar Sep 20 '21 05:09 dewbambs

I don't think dio allows for this use case at the moment

jgoyvaerts avatar Sep 20 '21 09:09 jgoyvaerts

That's very unfortunate. It's an important feature, as many apps use SSE nowadays, e.g. stock-broking apps. It would be helpful if Dio could add this feature.

dewbambs avatar Sep 20 '21 15:09 dewbambs

Do you have a free example api which offers this kind of stream?

jgoyvaerts avatar Sep 20 '21 15:09 jgoyvaerts

I don't have a free example. I'm testing against my own server's stream.

dewbambs avatar Sep 21 '21 11:09 dewbambs

> That's very unfortunate. It's an important feature, as many apps use SSE nowadays, e.g. stock-broking apps. It would be helpful if Dio could add this feature.

I agree with it. SSE is a very important and popular feature, haha.

acn001 avatar Sep 23 '21 03:09 acn001

You can use Socket.IO for server-side events for now.

alphamanorg avatar Oct 03 '21 09:10 alphamanorg

Yes, I'm using the http package itself for now, as it gives the expected result.

dewbambs avatar Oct 07 '21 19:10 dewbambs

You can use SSE with ResponseType.stream in dio:

  Response<ResponseBody>  rs = await Dio().get<ResponseBody>(
      "https://server/stream",
      options: Options(headers: {
        "Authorization":
            'vhdrjb token"',
        "Accept": "text/event-stream",
        "Cache-Control": "no-cache",
      }, responseType: ResponseType.stream), // set responseType to `stream`
    );

It returns the result as a stream of Uint8List chunks, which you should first transform to List<int>:

    StreamTransformer<Uint8List, List<int>> unit8Transformer =
        StreamTransformer.fromHandlers(
      handleData: (data, sink) {
        sink.add(List<int>.from(data));
      },
    );

and then you can decode it as UTF-8 and split it into lines:

    rs.data?.stream
        .transform(unit8Transformer)
        .transform(const Utf8Decoder())
        .transform(const LineSplitter())
        .listen((event) {
      log(event);
    });

This worked for me, and now I'm getting data from the server.

vhdrjb avatar Jun 09 '22 03:06 vhdrjb
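
As a possible shortcut to the steps above: Uint8List already implements List<int>, so the hand-written transformer can likely be replaced by a cast. The following is a minimal sketch, not part of the original comment; the function name decodeLines is made up for illustration, and it only assumes a Stream<Uint8List> such as the one exposed by ResponseBody.stream.

```dart
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

/// Turns raw byte chunks into text lines.
/// `decodeLines` is a hypothetical helper name used only in this sketch.
Stream<String> decodeLines(Stream<Uint8List> bytes) {
  return bytes
      // Uint8List is a List<int>, so a cast is enough to satisfy utf8.decoder.
      .cast<List<int>>()
      // Decodes UTF-8, carrying incomplete sequences over to the next chunk.
      .transform(utf8.decoder)
      // Emits one string per line, also across chunk boundaries.
      .transform(const LineSplitter());
}
```

With the response from the snippet above, this would be used as decodeLines(rs.data!.stream).listen(log).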

Wow, this would be amazing to have built into Dio. The other solutions to this problem in Dart don't handle errors well, like retrying disconnections. For testing, there's a free SSE endpoint at https://hacker-news.firebaseio.com/v0/topstories.json. The documentation is at https://github.com/HackerNews/API

blopker avatar Jun 09 '22 18:06 blopker

Yup, that would be great. We need to include all the functionality that the eventsource package provides for JS.

> Wow, this would be amazing to have built into Dio. The other solutions to this problem in Dart don't handle errors well, like retrying disconnections. For testing, there's a free SSE endpoint at https://hacker-news.firebaseio.com/v0/topstories.json. The documentation is at https://github.com/HackerNews/API

dewbambs avatar Jun 10 '22 07:06 dewbambs

SSE messages must be split by a double newline. Has anyone implemented such a splitter?

hurelhuyag avatar Nov 24 '22 07:11 hurelhuyag

Based on @vhdrjb's code, I have implemented one more transformer to extract the correct SSE message. I believe there must be a more efficient transformer, but with my current Dart/Flutter knowledge, this is what I can do.

rs.data?.stream
        .transform(unit8Transformer)
        .transform(const Utf8Decoder())
        .transform(const LineSplitter())
        .transform(const SseTransformer())
        .listen((event) {
          debugPrint("Event: ${event.id}, ${event.event}, ${event.retry}, ${event.data}");
        });

class SseTransformer extends StreamTransformerBase<String, SseMessage> {
  const SseTransformer();
  @override
  Stream<SseMessage> bind(Stream<String> stream) {
    return Stream.eventTransformed(stream, (sink) => SseEventSink(sink));
  }
}

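// `implements` is used instead of `extends` because EventSink is an interface.
class SseEventSink implements EventSink<String> {

  final EventSink<SseMessage> _eventSink;

  String? _id;
  String _event = "message";
  // An event may carry several data lines; they are joined with "\n".
  final List<String> _dataLines = [];
  int? _retry;

  SseEventSink(this._eventSink);

  // Returns the field value after the prefix, dropping one optional leading space.
  String _value(String line, int prefixLength) {
    final value = line.substring(prefixLength);
    return value.startsWith(" ") ? value.substring(1) : value;
  }

  @override
  void add(String event) {
    if (event.startsWith("id:")) {
      _id = _value(event, 3);
      return;
    }
    if (event.startsWith("event:")) {
      _event = _value(event, 6);
      return;
    }
    if (event.startsWith("data:")) {
      _dataLines.add(_value(event, 5));
      return;
    }
    if (event.startsWith("retry:")) {
      _retry = int.tryParse(_value(event, 6));
      return;
    }
    if (event.isEmpty) {
      // A blank line ends the current message, so emit it and reset the state.
      _eventSink.add(SseMessage(id: _id, event: _event, data: _dataLines.join("\n"), retry: _retry));
      _id = null;
      _event = "message";
      _dataLines.clear();
      _retry = null;
    }
  }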

  @override
  void addError(Object error, [StackTrace? stackTrace]) {
    _eventSink.addError(error, stackTrace);
  }

  @override
  void close() {
    _eventSink.close();
  }
}

class SseMessage {
  final String? id;
  final String event;
  final String data;
  final int? retry;

  const SseMessage({
    this.id,
    required this.event,
    required this.data,
    this.retry,
  });
}

hurelhuyag avatar Nov 24 '22 08:11 hurelhuyag
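
On the wish for a simpler transformer: one option might be an async* generator that consumes the already-split lines. This is only a sketch under assumptions. The names SseEvent and parseSse are invented here, the retry field is left out for brevity, comment lines starting with ":" are skipped, blocks without any data line are dropped, and the id is reset after each event to mirror the code above.

```dart
import 'dart:async';

/// One parsed event; a hypothetical type used only in this sketch.
class SseEvent {
  final String? id;
  final String event;
  final String data;
  const SseEvent({this.id, required this.event, required this.data});
}

/// Groups already-split lines into events. A blank line ends an event.
Stream<SseEvent> parseSse(Stream<String> lines) async* {
  String? id;
  var event = 'message';
  final data = <String>[];

  await for (final line in lines) {
    if (line.isEmpty) {
      // Emit only when at least one data line was seen.
      if (data.isNotEmpty) {
        yield SseEvent(id: id, event: event, data: data.join('\n'));
      }
      id = null;
      event = 'message';
      data.clear();
      continue;
    }
    if (line.startsWith(':')) continue; // comment or keep-alive line

    final colon = line.indexOf(':');
    final field = colon == -1 ? line : line.substring(0, colon);
    var value = colon == -1 ? '' : line.substring(colon + 1);
    if (value.startsWith(' ')) value = value.substring(1); // one optional space

    switch (field) {
      case 'id':
        id = value;
        break;
      case 'event':
        event = value;
        break;
      case 'data':
        data.add(value); // several data lines are joined with "\n"
        break;
    }
  }
}
```

It would slot in after the LineSplitter step, for example parseSse(lineStream).listen((e) => debugPrint(e.data)), where lineStream stands for the decoded line stream.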

Can you share how to disconnect the stream connection?

quangtuyen1993 avatar Feb 21 '23 04:02 quangtuyen1993
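
Regarding the question about disconnecting: one approach that may work is to keep the StreamSubscription returned by listen and cancel it, optionally together with a dio CancelToken passed to the request. The sketch below is an assumption-laden illustration rather than a confirmed answer from the thread; the function name connect and its parameters are hypothetical, and how quickly the underlying connection closes may depend on the dio version and adapter.

```dart
import 'dart:async';
import 'dart:convert';

import 'package:dio/dio.dart';

/// Opens the stream and returns a function that disconnects it.
/// `connect`, `url` and `onLine` are placeholder names for this sketch.
Future<Future<void> Function()> connect(
  Dio dio,
  String url,
  void Function(String line) onLine,
) async {
  final cancelToken = CancelToken();
  final rs = await dio.get<ResponseBody>(
    url,
    options: Options(
      headers: {'Accept': 'text/event-stream', 'Cache-Control': 'no-cache'},
      responseType: ResponseType.stream,
    ),
    cancelToken: cancelToken,
  );

  final StreamSubscription<String> subscription = rs.data!.stream
      .cast<List<int>>()
      .transform(utf8.decoder)
      .transform(const LineSplitter())
      .listen(onLine);

  return () async {
    // Stop listening; this also stops reading from the response stream.
    await subscription.cancel();
    // Additionally cancel the request token if that has not happened yet.
    if (!cancelToken.isCancelled) cancelToken.cancel('closed by client');
  };
}
```

A caller would keep the returned function, for example final disconnect = await connect(Dio(), url, print);, and call await disconnect() when the stream is no longer needed.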

I am on Flutter web, and when I use this code, the stream only starts once the whole response has been received. I get all the events, but I would like to receive them live. Is web support missing, do I need to configure something, or is this expected?

clotodex avatar Apr 19 '23 08:04 clotodex

Stream reading is not available with XHR. As a workaround, see https://github.com/dart-lang/http/issues/595.

AlexV525 avatar Apr 19 '23 09:04 AlexV525