msgpack_dart
feat: serializing / deserializing over streams
Currently, this library provides no way to perform:
- Serializing many Dart objects into msgpack binary output stream
- Deserializing msgpack binary stream into Dart objects
- Deserializing Dart object(s) from one or many `Uint8List`
This PR contains:
- Depend on `package:async/async.dart`
- Introducing `StreamSerializer`, which implements `StreamTransformer<dynamic, Uint8List>`
- Introducing `StreamDeserializer`, which implements `StreamTransformer<List<int>, dynamic>`
- Introducing tests covering serializing and deserializing using the new `StreamTransformer`
The two classes implement `StreamTransformer` with different type parameters because:
- Serializing arbitrary Dart objects into msgpack format already yields `Uint8List`, which implements `List<int>`; it does not make sense to downgrade the type information here.
- Msgpack byte streams to deserialize can come from various sources, some exposing their interface directly as `Stream<Uint8List>` and some as `Stream<List<int>>`. Loosening the type constraint here should increase compatibility and avoid scenarios such as `Stream<List<int>>.map((e) => Uint8List.fromList(e))`.
- ~`StreamTransformer.cast()` can properly recast both `StreamSerializer` and `StreamDeserializer` between `Uint8List` and `List<int>` for compatibility with both `Stream<List<int>>` and `Stream<Uint8List>` if the underlying stream elements have the correct types at runtime.~ Using `StreamTransformer.bind` directly rather than `Stream.transform` bypasses `Stream.cast` and `StreamTransformer.cast`, which have a performance cost.
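The `bind` versus `transform` point can be reproduced without this library. The sketch below uses `utf8.decoder` from `dart:convert` as a stand-in, since it is also a transformer whose input type is `List<int>`; the function name `decodeUtf8Chunks` is made up for illustration and is not part of this PR.

```dart
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

/// Decodes a stream of Uint8List chunks with a transformer whose input
/// type is List<int> (utf8.decoder stands in for StreamDeserializer here).
Future<String> decodeUtf8Chunks(Stream<Uint8List> bytes) {
  // `bytes.transform(utf8.decoder)` is rejected by static analysis:
  // Stream<Uint8List>.transform expects StreamTransformer<Uint8List, S>,
  // and a StreamTransformer<List<int>, String> is not a subtype of that.
  //
  // `bind` takes a Stream<List<int>>, and Stream<Uint8List> is a subtype
  // of it, so no cast is required.
  final Stream<String> text = utf8.decoder.bind(bytes);
  return text.join();
}

Future<void> main() async {
  final bytes = Stream<Uint8List>.fromIterable([
    Uint8List.fromList(utf8.encode('hello ')),
    Uint8List.fromList(utf8.encode('world')),
  ]);
  print(await decodeUtf8Chunks(bytes)); // hello world
}
```

The same reasoning is why the usage section calls `StreamDeserializer().bind(byteStream)` when the source is a `Stream<Uint8List>`.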
Noteworthy stream behaviors:
- `StreamDeserializer` will wait for more bytes from upstream if it requires more bytes in order to complete deserialization
- `StreamDeserializer` will emit `UpstreamClosedError` if upstream closes unexpectedly (in the midst of deserializing a Dart object that spans many bytes)
- Both `StreamSerializer` and `StreamDeserializer` have similar constructor arguments to their non-stream counterparts.
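To make the buffering and early-close behaviors concrete, here is a minimal sketch of the same pattern on a toy format. It is not the PR's implementation and does not parse msgpack: it assumes a made-up framing of one length byte followed by that many payload bytes. `FrameDecoder` and `TruncatedInputError` are hypothetical names, the latter playing the role that `UpstreamClosedError` plays in this PR.

```dart
import 'dart:async';
import 'dart:typed_data';

/// Hypothetical error, standing in for the PR's UpstreamClosedError.
class TruncatedInputError extends Error {
  final int buffered;
  TruncatedInputError(this.buffered);

  @override
  String toString() => 'TruncatedInputError: $buffered byte(s) left over';
}

/// Toy framing (an assumption, not msgpack): 1 length byte, then payload.
class FrameDecoder extends StreamTransformerBase<List<int>, Uint8List> {
  const FrameDecoder();

  @override
  Stream<Uint8List> bind(Stream<List<int>> stream) async* {
    final buffer = <int>[];
    await for (final chunk in stream) {
      buffer.addAll(chunk);
      // Emit every complete frame in the buffer; an incomplete frame
      // stays buffered until more bytes arrive from upstream.
      while (buffer.isNotEmpty && buffer.length > buffer[0]) {
        final length = buffer[0];
        yield Uint8List.fromList(buffer.sublist(1, 1 + length));
        buffer.removeRange(0, 1 + length);
      }
    }
    // Upstream closed in the middle of a frame: surface it as an error.
    if (buffer.isNotEmpty) {
      throw TruncatedInputError(buffer.length);
    }
  }
}

Future<void> main() async {
  // The frame [2, 10, 20] is split across two chunks.
  final chunks = Stream<List<int>>.fromIterable([
    [2, 10],
    [20, 1, 30],
  ]);
  print(await const FrameDecoder().bind(chunks).toList()); // [[10, 20], [30]]

  // Declares 3 payload bytes but only 1 arrives before the stream closes.
  final truncated = Stream<List<int>>.fromIterable([
    [3, 1],
  ]);
  try {
    await const FrameDecoder().bind(truncated).toList();
  } on TruncatedInputError catch (e) {
    print(e);
  }
}
```

Emitting the error at close rather than silently dropping the leftover bytes lets the consumer tell a clean end of stream from a truncated one.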
This PR should close #6, closely relates to #10 and should be generally a better solution than exposing `Deserializer.offset`.
Serializing Dart objects into msgpack binary and outputting into a stream is already something any user can implement on their own with the existing `Serializer` class, but deserializing from a byte stream or multiple `Uint8List` into multiple Dart objects is not possible with the current `Deserializer` interface.
Usage:
// Deserializing msgpack bytes from another process
Stream<List<int>> byteStream = process.stdout;
Stream<dynamic> objectStream = byteStream.transform(StreamDeserializer());
await for(final object in objectStream) {
print(object);
}
// Deserializing chunks of Uint8List
List<Uint8List> chunks = ...;
Stream<Uint8List> byteStream = Stream.fromIterable(chunks);
// If the byte stream is of type `Stream<Uint8List>`, it is almost certain you have to use `StreamTransformer.bind` directly
Stream<dynamic> objectStream = StreamDeserializer().bind(byteStream);
List<dynamic> objectsList = await objectStream.toList();
// Serializing objects stream into msgpack bytes stream
Stream<dynamic> objectStream = ...;
Stream<Uint8List> bytesStream = objectStream.transform(StreamSerializer());
await for(final bytesChunk in bytesStream) {
print(bytesChunk);
}
// Serializing many objects at once
List<dynamic> objects = ...;
Stream<dynamic> objectStream = Stream.fromIterable(objects);
Stream<Uint8List> bytesStream = objectStream.transform(StreamSerializer());
// To get 1-1 correspondence between input and output
List<Uint8List> bytesChunks = await bytesStream.toList(); // bytesChunks.length == objects.length
// To get M-1 correspondence between input and output
Uint8List bytes = await collectBytes(bytesStream); // collectBytes from package:async/async.dart
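For readers who want to see what the M-1 case amounts to, the following is a rough dependency-free equivalent built on `BytesBuilder` from `dart:typed_data`. The helper name `concatChunks` is hypothetical and the real `collectBytes` may differ in its details; this is only a sketch of the idea.

```dart
import 'dart:async';
import 'dart:typed_data';

/// Concatenates every chunk of a byte stream into a single Uint8List.
Future<Uint8List> concatChunks(Stream<List<int>> chunks) async {
  final builder = BytesBuilder(); // copies each chunk as it is added
  await for (final chunk in chunks) {
    builder.add(chunk);
  }
  return builder.takeBytes();
}

Future<void> main() async {
  final chunks = Stream<Uint8List>.fromIterable([
    Uint8List.fromList([1, 2]),
    Uint8List.fromList([3]),
  ]);
  print(await concatChunks(chunks)); // [1, 2, 3]
}
```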