tink_io
tink_io copied to clipboard
Notify when sink ended
Trying to rewrite this code to use OutputSink + BytesOutput.
The idea is to return a sink from the function for the user to pipeTo
, when the sink is ended I will collect the bytes and send them to an api which does not support streaming.
But the problem is after handing the sink to outside, there seems no way to know that the sink as ended.
I think I can extend BytesOutput
and add a Future
that gets triggered when its close()
is called. Or it is better to add something to Sink itself?
Also, how do I incorporate the API call result into the sink itself so that the user knows if the operation is a success or failure?
I think this all boils down to a problem: we lack a way to define a sink that produces a result only when it seals.
@back2dos any idea on this one?
Here is the code extracted from my implementation for AWS S3:
But I couldn't think of a good name for the class. It is basically a Sink
that will produce a result if and only if it is end
ed. (that's why I am not willing to make a PR yet)
As an example, a simple byte collector (equivalent to source.all()
) can be implemented by create a BytesBuffer
, then add data to it during progress()
and then getBytes()
from it in end()
class UsefulSink<Result> extends SinkBase<Error, Result> {
var ended = false;
var result:PromiseTrigger<Result> = Promise.trigger();
override function get_sealed() return ended;
override function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, Error, Result>> {
return
if(ended)
result.asPromise().map(function(o) return switch o {
case Success(result): SinkEnded(result, source);
case Failure(e): SinkFailed(e, source);
});
else
source.forEach(function(chunk) {
return progress(chunk).map(function(o) return switch o {
case Success(_): Resume;
case Failure(e): Clog(e);
});
}).flatMap(function(o):Future<PipeResult<EIn, Error, Result>> return switch o {
case Depleted:
if(options.end) {
ended = true;
end().map(function(o) {
result.trigger(o);
return switch o {
case Success(result): SinkEnded(result, Source.EMPTY);
case Failure(e): SinkFailed(e, Source.EMPTY);
}
});
} else {
Future.sync(AllWritten);
}
case Clogged(e, rest):
Future.sync(SinkFailed(e, rest));
case Failed(e):
Future.sync(cast SourceFailed(e));
case Halted(rest):
throw 'unreachable';
});
}
function progress(chunk:Chunk):Promise<Noise> {
throw 'abstract';
}
function end():Promise<Result> {
throw 'abstract';
}
}