
Semantics of `fold` apparently broken by `observe` in case of failure

Open • epellizzer opened this issue on Sep 12 '19 • 8 comments

Hi.

The following piece of code shows that, in version 1.0.5, when the pipe given to observe fails, a folded stream fails to comply with its guarantee of a single emission.

import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import fs2.Stream

object TestApp extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    stream.compile.drain.as(ExitCode.Success)

  private def stream =
    for {
      count <- subStream
      _ <- Stream.eval(IO(println(s"Count: $count")))
    } yield ()

  private def subStream =
    Stream.range[IO](1, 10)
      .observe(_ => Stream.raiseError[IO](new RuntimeException("Just for fun"))) // the observed pipe fails
      .fold(0)((c, _) => c + 1) // fold should emit a single value once the input completes
}

The result is the following:

Count: 0
java.lang.RuntimeException: Just for fun
	at com.softwaymedical.itodirectory.extraction.TestApp$.$anonfun$subStream$1(TestApp.scala:19)
	at com.softwaymedical.itodirectory.extraction.TestApp$.$anonfun$subStream$1$adapted(TestApp.scala:19)
	at fs2.Stream$.through$extension(Stream.scala:2521)
	at fs2.Stream$InvariantOps$.sinkStream$1(Stream.scala:3493)
	...

Since fold should emit a single value, I expect the stream to fail without emitting anything. This is indeed the observed behavior when the implementation of subStream is replaced with:

Stream.raiseError[IO](new RuntimeException("Just for fun")).fold(0)((c, _) => c + 1)

Instead, a value is emitted, and then the stream fails.

As a result, the later effects (materialized here by Stream.eval(IO(println(s"Count: $count")))) take place despite the failure, which in my case has severe consequences.

I am aware of the concerns expressed in #1206 and #1208 and I was wondering whether they could be the cause.

I use observe a lot, mainly because I often need fold to gather information at the end of an ingestion process and use it afterwards. This forces me to move my main logic into a "secondary" pipe given to observe, with the reporting becoming the main stream, so that I can preserve the result.

By the way, if there exists a better way to implement this kind of behavior, I would be glad to know about it.

epellizzer • Sep 12 '19 15:09

> By the way, if there exists a better way to implement this kind of behavior, I would be glad to know about it.

I didn't 100% understand the use case, but iiuc you need to carry extra state through a process, which you can do with a variety of combinators like mapAccumulate/scan/etc, or through the use of a Ref. It's not as modular as having two separate streams like in observe, but then it's not as complex either, because you don't have any of the coordination concerns between the main and observed streams. broadcast sits in between: you duplicate the stream, and depending on what you do, it might be easier than observe.
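
For the counting case specifically, something like this might be enough (a rough, untested sketch assuming fs2 1.x / cats-effect 2.x; process here is just a hypothetical stand-in for your per-element effect):

import cats.effect.IO
import cats.effect.concurrent.Ref
import fs2.Stream

object CountSketch {
  // hypothetical per-element processing effect (stands in for the real sink logic)
  def process(i: Int): IO[Unit] = IO(println(s"processing $i"))

  // Option 1: keep the processing in the main stream and fold over it.
  // If process fails, the stream fails before fold emits anything.
  val counted: Stream[IO, Long] =
    Stream.range[IO](1, 10)
      .evalTap(process)
      .fold(0L)((c, _) => c + 1)

  // Option 2: carry the count in a Ref and emit it only after the
  // processing stream has completed successfully.
  val countedWithRef: Stream[IO, Long] =
    Stream.eval(Ref.of[IO, Long](0L)).flatMap { counter =>
      Stream.range[IO](1, 10)
        .evalTap(process)
        .evalTap(_ => counter.update(_ + 1))
        .drain ++ Stream.eval(counter.get)
    }
}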

I will also look at the specifics of the report once I get some time :)

SystemFw • Sep 12 '19 16:09

Ah, partially related: if you have a minimised example of the sort of behaviour you are trying to express with observe, I can have a look at whether there is an easier way to implement it

SystemFw • Sep 12 '19 16:09

Thank you for your answer.

I don't have time to give you a minimised example right now, but I can explain what I do in the specific case where I stumbled upon this issue.

I have to ingest data into several Elasticsearch indices. There are three steps in my process, each involving sending data to its own index, and each more or less depending on the processing of the previous step. I must remember how many elements were actually processed in each step, because when all the steps are done, some final effects must be executed, like sending a report detailing each step.

This is only one case, but I very often need to count the number of processed elements, regardless of how they are processed.

For example, in the case I mention here, the ingestion into Elasticsearch is done by rechunking the elements by a fixed, configured amount and performing a bulk request for each chunk. My Elasticsearch client is thus implemented as a sink, but since I must preserve the number of processed elements and emit it at the end, I can only give the sink to observe.
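
Schematically, one ingestion step looks more or less like this (a simplified sketch, not my actual code; bulkIndex stands in for the real Elasticsearch bulk call):

import cats.effect.{ContextShift, IO}
import fs2.{Chunk, Pipe, Stream}

object IngestionSketch {
  // hypothetical stand-in for the bulk request against the target index
  def bulkIndex(docs: Chunk[String]): IO[Unit] =
    IO(println(s"bulk indexing ${docs.size} documents"))

  // the Elasticsearch client exposed as a sink: rechunk and send one bulk request per chunk
  def elasticSink(bulkSize: Int): Pipe[IO, String, Unit] =
    _.chunkN(bulkSize).evalMap(bulkIndex)

  // the ingestion step: run the sink via observe while the main stream
  // folds to the number of processed elements
  def ingestStep(docs: Stream[IO, String], bulkSize: Int)(
      implicit cs: ContextShift[IO]
  ): Stream[IO, Long] =
    docs
      .observe(elasticSink(bulkSize))
      .fold(0L)((count, _) => count + 1)
}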

And I just realized that any failure while talking to Elasticsearch causes my main stream to emit 0 and happily execute the later effects (including swapping some aliases in ES, which is a disaster here, since the ingestion failed), as if they were put in a finally. It also means I have many other pieces of code in other programs that are affected by this.

epellizzer • Sep 12 '19 16:09

Damn, now even when my pipes succeed, I randomly get the following error from fs2:

java.lang.Throwable: Fail to find scope for next step: current: Token(266e42e1), step: Step(FreeC.Bind(FreeC.Pure(()), fs2.internal.FreeC$ViewL$$$Lambda$86/0x000000080132e040@4b974aae),Some(Token(2ea58561)))
	at fs2.internal.Algebra$.$anonfun$compileLoop$5(Algebra.scala:272)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:139)
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
	at cats.effect.internals.IORunLoop$RestartCallback.run(IORunLoop.scala:366)
	at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:70)
	at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:36)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:93)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:93)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:93)
	at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
	at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:44)
	at cats.effect.internals.Callback$AsyncIdempotentCallback.apply(Callback.scala:133)
	at cats.effect.internals.Callback$AsyncIdempotentCallback.apply(Callback.scala:120)
	at com.softwaymedical.itodirectory.common.elastic.ElasticClient$$anon$1.onResponse(ElasticClient.scala:194)
	at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1598)
	at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:556)
	at org.elasticsearch.client.RestClient$1.completed(RestClient.java:300)
	at org.elasticsearch.client.RestClient$1.completed(RestClient.java:294)
	at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
	at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
	at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
	at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
	at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
	at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
	at java.base/java.lang.Thread.run(Thread.java:835)

Sometimes it works, sometimes it fails. It happened after I added a pretty complicated pipe to compute a diff between two sorted streams. I probably did something wrong, but I guess fs2 shouldn't react this way. The pipe is also used in observe.

Note that when it fails with this error, once again the folded stream emits a value and then fails. It is even more obvious when I add attempt: a Right and a Left are emitted, in that order.
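
To illustrate, with the small reproduction from my first message (sketch):

subStream.attempt.compile.toList
// currently yields something like List(Right(0), Left(java.lang.RuntimeException: Just for fun)),
// whereas I would expect only List(Left(java.lang.RuntimeException: Just for fun))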

I will try to come up with small examples in the next days, and as a last resort, I will share some of my actual code.

epellizzer • Sep 13 '19 12:09

Ok, the last error I mentioned is unrelated to observe. Something is wrong with my complex pipe, but I haven't figured it out yet. It seems to work, since it produces consistent output even with thousands of elements, but the error occurs once it completes. Anyway, this is a different matter.

epellizzer • Sep 17 '19 07:09

But the Scope one is more worrying tbh (for that case I'd check with the newest version though)

SystemFw • Sep 17 '19 08:09

Indeed, and I'm going to try with 2.0.0.

epellizzer • Sep 17 '19 11:09

No luck with 2.0.0.

In addition to the scope issue, profiling shows high GC pressure, and when my diff pipe is used with identical streams (that is, when it should yield no differences), the heap blows up.

I'm going to investigate, but in the meantime I will file two new issues: one for the scope problem and one for the memory leak.

epellizzer • Sep 18 '19 15:09