tink_io icon indicating copy to clipboard operation
tink_io copied to clipboard

Notify when sink ended

Open kevinresol opened this issue 6 years ago • 4 comments

Trying to rewrite this code to use OutputSink + BytesOutput.

The idea is to return a sink from the function for the user to pipeTo, when the sink is ended I will collect the bytes and send them to an api which does not support streaming.

But the problem is after handing the sink to outside, there seems no way to know that the sink as ended. I think I can extend BytesOutput and add a Future that gets triggered when its close() is called. Or it is better to add something to Sink itself?

kevinresol avatar Oct 06 '18 02:10 kevinresol

Also, how do I incorporate the API call result into the sink itself so that the user knows if the operation is a success or failure?

kevinresol avatar Oct 06 '18 02:10 kevinresol

I think this all boils down to a problem: we lack a way to define a sink that produces a result only when it seals.

kevinresol avatar Oct 06 '18 10:10 kevinresol

@back2dos any idea on this one?

kevinresol avatar Jan 15 '19 09:01 kevinresol

Here is the code extracted from my implementation for AWS S3:

But I couldn't think of a good name for the class. It is basically a Sink that will produce a result if and only if it is ended. (that's why I am not willing to make a PR yet)

As an example, a simple byte collector (equivalent to source.all()) can be implemented by create a BytesBuffer, then add data to it during progress() and then getBytes() from it in end()

class UsefulSink<Result> extends SinkBase<Error, Result> {
  var ended = false;
  var result:PromiseTrigger<Result> = Promise.trigger();
  
  override function get_sealed() return ended;
  
  override function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, Error, Result>> {
    return
      if(ended)
        result.asPromise().map(function(o) return switch o {
          case Success(result): SinkEnded(result, source);
          case Failure(e): SinkFailed(e, source);
        });
      else
        source.forEach(function(chunk) {
          return progress(chunk).map(function(o) return switch o {
            case Success(_): Resume;
            case Failure(e): Clog(e);
          });
        }).flatMap(function(o):Future<PipeResult<EIn, Error, Result>> return switch o {
          case Depleted:
            if(options.end) {
              ended = true;
              end().map(function(o) {
                result.trigger(o);
                return switch o {
                  case Success(result): SinkEnded(result, Source.EMPTY);
                  case Failure(e): SinkFailed(e, Source.EMPTY);
                }
              });
            } else {
              Future.sync(AllWritten);
            }
          case Clogged(e, rest):
            Future.sync(SinkFailed(e, rest));
          case Failed(e):
            Future.sync(cast SourceFailed(e));
          case Halted(rest):
            throw 'unreachable';
        });
  }
  
  function progress(chunk:Chunk):Promise<Noise> {
    throw 'abstract';
  }
  
  function end():Promise<Result> {
    throw 'abstract';
  }
}

kevinresol avatar Mar 01 '19 15:03 kevinresol