
The stream gets stuck at the parser when a large number of files are pushed through.

Open wpoosanguansit opened this issue 8 years ago • 0 comments

Hi,

I managed to add file deletion to the stream processing by defining a runnable graph as follows:

  def g(source: Source[Path, Unit]) = RunnableGraph.fromGraph(GraphDSL.create() {
    implicit builder =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[Path](2))

      val A: FlowShape[Path, JsValue] = builder.add(flow)
      val B: FlowShape[Path, Path] = builder.add(id)
      val C: FlowShape[Path, Boolean] = builder.add(deleteFlow)
      val zip = builder.add(Zip[JsValue, Path]())
      val unzip = builder.add(Unzip[JsValue, Path]())

      val out = Sink.foreach(println)
      source ~> broadcast ~> A ~> zip.in0
                broadcast ~> B ~> zip.in1
                zip.out ~> unzip.in
                unzip.out0 ~> out
                unzip.out1 ~> C ~> Sink.ignore
      ClosedShape
  })

However, if I define the flow as:

val flow = Flow[Path]
    .filter(_.getFileName.toString.trim.endsWith(".json"))
    .throttle(1, 1 second, 1, p => 2, ThrottleMode.Shaping)
    .flatMapConcat(p => FileIO.fromFile(p.toFile))
    .via(JsonStreamParser[JsValue])

The stream gets stuck if I push around 50 files through in succession. It does work, although slowly, if I use the following code instead:

val flow = Flow[Path]
    .filter(_.getFileName.toString.trim.endsWith(".json"))
    .map(p => ByteBuffer.wrap(scala.io.Source.fromFile(p.toString).toArray.map(_.toByte)))
    .map(Parser.parseFromByteBuffer)
    .map(_.get)

The error that I got: java.lang.RuntimeException: Nonzero exit code returned from runner: 255 at scala.sys.package$.error(package.scala:27)

I know the _.get is unsafe, but I am just testing, and the two versions turn out to behave differently. Again, I am not sure whether this has anything to do with the parser. Any hints or ideas on how to investigate this further would be appreciated. Thanks.
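
On the unsafe _.get: a minimal sketch of handling a Try without calling .get, so that a bad input is reported and skipped rather than thrown. The names SafeParse, parse and parseAll are hypothetical, and parse is only a stand-in for any function that returns a Try (as Parser.parseFromByteBuffer does in the second flow); it is not the library's API.

```scala
import scala.util.{Failure, Success, Try}

object SafeParse {
  // Hypothetical stand-in for a parser call: any function that returns a Try.
  def parse(raw: String): Try[Int] = Try(raw.trim.toInt)

  // Keep the successful results and report failures,
  // instead of throwing on the first bad input as .get would.
  def parseAll(inputs: List[String]): List[Int] =
    inputs.map(parse).flatMap {
      case Success(value) => Some(value)
      case Failure(error) =>
        println(s"skipping input: ${error.getMessage}")
        None
    }
}
```

Inside the flow, the same idea would presumably be to replace .map(_.get) with .collect { case Success(js) => js }, which drops unparsable files rather than failing the stream. That only removes the exception path; it does not by itself explain the stall seen with JsonStreamParser.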

wpoosanguansit, Jun 13 '16 18:06