Questions: best way to monitor stream consumption

Open bhurlow opened this issue 8 years ago • 2 comments

Hey Zach,

I have a somewhat simple manifold question: how can I monitor the progress of a stream being consumed over time? I have an incoming HTTP request input-stream which I'm "piping" to a sink (in this case S3). I know the total content length, so I want to create another stream which calculates the progress of the stream being drained and publishes it elsewhere, say to a websocket. Bonus points if I can get that monitoring stream to emit events less frequently than the input one.

Is there a simple way to handle this in manifold?

Thank you!

bhurlow, Dec 12 '17 14:12

There are a few ways you could do this, but probably the easiest is to just do something like:

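(require '[manifold.stream :as s])
(import 'java.util.concurrent.atomic.AtomicLong)
(let [bytes (AtomicLong. 0)]
  ;; `stream` is the incoming source stream
  [(s/map #(do (.addAndGet bytes (num-bytes %)) %) stream)
   (s/periodically 1000 #(.getAndSet bytes 0))])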

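This will return a 2-tuple of an instrumented stream, and a stream that will emit, once per second, the number of bytes that have passed through in the last second (the counter is reset on every tick). Pipe the instrumented stream, not the original one, to your sink, since bytes are only counted as messages flow through it. If your incoming stream is just a manifold stream of byte-arrays, replace num-bytes with count.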
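
Since the question was about progress against a known content length, here is a minimal sketch of that variant, built on the same idea. It assumes the source is a manifold stream of byte-arrays and that the total is positive. The names `with-progress`, `total-bytes` and `period-ms` are made up for illustration and are not part of manifold.

```clojure
(require '[manifold.stream :as s])
(import 'java.util.concurrent.atomic.AtomicLong)

(defn with-progress
  "Hypothetical helper (not part of manifold). Returns a 2-tuple of
   [instrumented progress]: `instrumented` passes every chunk of `source`
   through unchanged while counting its bytes, and `progress` emits the
   fraction of `total-bytes` seen so far, once every `period-ms`.
   Assumes `source` yields byte-arrays and `total-bytes` is positive."
  [source total-bytes period-ms]
  (let [seen         (AtomicLong. 0)
        instrumented (s/map (fn [^bytes chunk]
                              ;; running total, never reset
                              (.addAndGet seen (alength chunk))
                              chunk)
                            source)
        progress     (s/periodically period-ms
                                     #(double (/ (.get seen) total-bytes)))]
    ;; fires once `instrumented` is closed and fully consumed,
    ;; so the progress stream does not keep ticking forever
    (s/on-drained instrumented #(s/close! progress))
    [instrumented progress]))
```

Connect `instrumented` to the sink and consume `progress` wherever the updates should go (for example with `s/consume`). A larger `period-ms` gives fewer progress events, independent of how often chunks arrive. One caveat of this sketch: `progress` is closed as soon as the upload drains, so a final 1.0 is not guaranteed to be emitted.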

ztellman, Dec 12 '17 20:12

Awesome, thank you!

bhurlow, Dec 13 '17 17:12