akka-stream-json
The stream gets stuck in the parser when a large number of files are pushed through.
Hi,
I managed to get file deletion into the stream processing by defining a runnable graph as below:
def g(source: Source[Path, Unit]) = RunnableGraph.fromGraph(GraphDSL.create() {
  implicit builder =>
    import GraphDSL.Implicits._

    val broadcast = builder.add(Broadcast[Path](2))
    val A: FlowShape[Path, JsValue] = builder.add(flow)
    val B: FlowShape[Path, Path]    = builder.add(id)
    val C: FlowShape[Path, Boolean] = builder.add(deleteFlow)
    val zip   = builder.add(Zip[JsValue, Path]())
    val unzip = builder.add(Unzip[JsValue, Path]())
    val out = Sink.foreach(println)

    source ~> broadcast ~> A ~> zip.in0
              broadcast ~> B ~> zip.in1
    zip.out ~> unzip.in
    unzip.out0 ~> out
    unzip.out1 ~> C ~> Sink.ignore

    ClosedShape
})
However, if I have the flow as

val flow = Flow[Path]
  .filter(_.getFileName.toString.trim.endsWith(".json"))
  .throttle(1, 1 second, 1, p => 2, ThrottleMode.Shaping)
  .flatMapConcat(p => FileIO.fromFile(p.toFile))
  .via(JsonStreamParser[JsValue])
the stream gets stuck when around 50 files are pushed through in succession. But it works, albeit slowly, if I have the following code instead:
val flow = Flow[Path]
  .filter(_.getFileName.toString.trim.endsWith(".json"))
  .map(p => ByteBuffer.wrap(scala.io.Source.fromFile(p.toString).toArray.map(_.toByte)))
  .map(Parser.parseFromByteBuffer)
  .map(_.get)
The error I got in the stuck case:

java.lang.RuntimeException: Nonzero exit code returned from runner: 255
  at scala.sys.package$.error(package.scala:27)
I know the _.get is unsafe, but I am just testing, and the two versions turn out to behave differently. I am not sure whether this has anything to do with the parser. Any hint or idea on how to investigate this further is appreciated. Thanks.
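One variant I could try to narrow this down (a sketch only; it reuses FileIO.fromFile and the jawn Parser call from above, and assumes the same implicit parser facade is in scope): keep the files inside the stream, but buffer each file into a single ByteString and parse it in one call, bypassing the incremental JsonStreamParser. If this variant also stalls, the parser is probably not the culprit.

```scala
import java.nio.file.Path
import akka.stream.scaladsl.{ FileIO, Flow }
import akka.util.ByteString

// Sketch: stream each file's chunks, fold them into one ByteString,
// then parse the complete document in a single call. FileIO.fromFile
// and Parser.parseFromByteBuffer are the same calls used above; the
// unsafe .get is kept only for parity with the test code.
val bufferedFlow = Flow[Path]
  .filter(_.getFileName.toString.trim.endsWith(".json"))
  .flatMapConcat { p =>
    FileIO.fromFile(p.toFile)
      .fold(ByteString.empty)(_ ++ _)  // whole file in memory, one element
      .map(bs => Parser.parseFromByteBuffer(bs.asByteBuffer).get)
  }
```

This sits between the two flows above: it still uses Akka's file IO and backpressure, but parses whole documents like the second version, which should isolate the incremental parsing step as a variable.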