[#1078] Streaming with Monix's Observable

Open Avasil opened this issue 6 years ago • 5 comments

Related to #1078

Observable doesn't have the right kind, but I think I managed to find a decent workaround using a type alias:

type ObservableF[F[_], A] = Observable[A]

// coproducts were inferring `Observable[A]` instead of alias
implicit def aliasResponseToRealResponse[F[_], A, CT <: Application.Json](implicit
  tr: ToResponse.Aux[F, ObservableF[F, A], CT]
): ToResponse.Aux[F, Observable[A], CT] = tr
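The kind-widening alias above can be illustrated without Finch or Monix. This is a minimal sketch, not the code from this PR: `Stream1`, `Stream1F`, `Render` and `show` are hypothetical stand-ins for `Observable`, `ObservableF` and a type class that expects a stream of shape `S[F[_], A]`. Type arguments are passed explicitly at the call site, so it does not reproduce the inference problem the implicit conversion above works around.

```scala
// Stand-in for a stream with a single type parameter (like Observable[A]).
final case class Stream1[A](items: List[A])

// Hypothetical type class that expects a stream shaped like S[F[_], A].
trait Render[S[_[_], _]] {
  def render[F[_], A](s: S[F, A]): String
}

object AliasDemo {
  // The alias adds a phantom F so that Stream1 fits the S[F[_], A] shape.
  type Stream1F[F[_], A] = Stream1[A]

  implicit val renderStream1F: Render[Stream1F] = new Render[Stream1F] {
    def render[F[_], A](s: Stream1F[F, A]): String = s.items.mkString(",")
  }

  def show[S[_[_], _], F[_], A](s: S[F, A])(implicit r: Render[S]): String =
    r.render(s)

  def main(args: Array[String]): Unit = {
    // Stream1F[Option, Int] is just Stream1[Int]; F is never used.
    val s: Stream1F[Option, Int] = Stream1(List(1, 2, 3))
    println(show[Stream1F, Option, Int](s)) // prints "1,2,3"
  }
}
```

The phantom `F` carries no runtime meaning; it only lets the single-parameter type satisfy an interface written for two-parameter streams.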

Observable is built on Task internally and depends on a Scheduler, so it will probably stay this way. On the other hand, it provides many methods that work with polymorphic effects.

It's still Task under the hood, so it's not that nice, but if you provide a Scheduler you can use another F, and I'm pretty sure it would still be more performant than the alternatives.
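As a rough illustration of the polymorphic methods mentioned above, here is a sketch that assumes Monix 3.x and cats-effect 2.x are on the classpath. `PolymorphicEffectDemo` and `doubled` are hypothetical names; `mapEvalF` accepts any effect with a `TaskLike` instance (here `cats.effect.IO`), while running the stream still goes through `Task` and needs a `Scheduler`.

```scala
import cats.effect.IO
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object PolymorphicEffectDemo {
  // Each element is transformed inside IO rather than Task;
  // Monix converts the IO to Task internally via TaskLike.
  def doubled: List[Int] =
    Observable(1, 2, 3)
      .mapEvalF(n => IO(n * 2))
      .toListL          // Task[List[Int]]
      .runSyncUnsafe()  // blocks; uses the implicit Scheduler

  def main(args: Array[String]): Unit =
    println(doubled)
}
```

The user-facing code can stay in `IO`, but the Scheduler requirement does not go away, which is the trade-off described above.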

Do you think this approach will work correctly? Despite some flaws, it would be fantastic to have server-side support for streaming with Monix Observable for its users. :)

Avasil avatar May 15 '19 08:05 Avasil

Codecov Report

Merging #1112 into master will decrease coverage by 2.19%. The diff coverage is 34.61%.

@@            Coverage Diff            @@
##           master    #1112     +/-   ##
=========================================
- Coverage   80.25%   78.05%   -2.2%     
=========================================
  Files          54       56      +2     
  Lines        1028     1080     +52     
  Branches       47       48      +1     
=========================================
+ Hits          825      843     +18     
- Misses        203      237     +34
Impacted Files                                        Coverage Δ
examples/src/main/scala/io/finch/monix/Main.scala     0% <0%> (ø)
circe/src/main/scala/io/finch/circe/Decoders.scala    70% <0%> (-30%)
monix/src/main/scala/io/finch/monix/package.scala     62.06% <62.06%> (ø)
core/src/main/scala/io/finch/Endpoint.scala           72.57% <0%> (ø)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update 32c6ac5...eb7f264.

codecov-io avatar May 15 '19 08:05 codecov-io

Thanks, @Avasil!

There is also PR #1098 built on top of Iterant. I wonder if we can unite them under the same module.

sergeykolbasov avatar May 16 '19 09:05 sergeykolbasov

@sergeykolbasov I don't see anything against it (the module will add a monix-tail dependency, but it's not that big).

I've noticed I missed a test for the Circe module; I'll add it tonight.

Avasil avatar May 16 '19 10:05 Avasil

@sergeykolbasov I have a problem with CirceSpec: I added a test for Observable, but it fails with the following error:

[info] CirceSpec:
Reporter completed abruptly with an exception after receiving event: TestFailed(Ordinal(0, 3),GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
 (AbstractJsonSpec.scala:41)
  Falsified after 1 successful property evaluations.
  Location: (AbstractJsonSpec.scala:41)
  Occurred when passed generated values (
    arg0 = List("ExampleNestedCaseClass(xqIxgbG,4.3349875045684245E-243,-1,List(-2147483648),ExampleCaseClass(anyooanbgz,-2147483648,false))"), // 3 shrinks
    arg1 = UTF-32
  )
  Label of failing property:
    Expected: List("ExampleNestedCaseClass(xqIxgbG,4.3349875045684245E-243,-1,List(-2147483648),ExampleCaseClass(anyooanbgz,-2147483648,false))")
Received: List(),CirceSpec,io.finch.circe.test.CirceSpec,Some(io.finch.circe.test.CirceSpec),should monix-circe.enumerate.success.ExampleNestedCaseClass,should monix-circe.enumerate.success.ExampleNestedCaseClass,Vector(),Some(org.scalatest.exceptions.GeneratorDrivenPropertyCheckFailedException: GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
 (AbstractJsonSpec.scala:41)
  Falsified after 1 successful property evaluations.
  Location: (AbstractJsonSpec.scala:41)
  Occurred when passed generated values (
    arg0 = List("ExampleNestedCaseClass(xqIxgbG,4.3349875045684245E-243,-1,List(-2147483648),ExampleCaseClass(anyooanbgz,-2147483648,false))"), // 3 shrinks
    arg1 = UTF-32
  )
  Label of failing property:
    Expected: List("ExampleNestedCaseClass(xqIxgbG,4.3349875045684245E-243,-1,List(-2147483648),ExampleCaseClass(anyooanbgz,-2147483648,false))")
Received: List()),Some(790),Some(IndentedText(- should monix-circe.enumerate.success.ExampleNestedCaseClass,should monix-circe.enumerate.success.ExampleNestedCaseClass,0)),Some(SeeStackDepthException),Some(io.finch.circe.test.CirceSpec),None,pool-1-thread-1-ScalaTest-running-CirceSpec,1558125731073).
java.io.NotSerializableException: sun.nio.cs.UTF_32
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:473)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.scalatest.tools.SocketReporter.apply(SocketReporter.scala:31)
        at org.scalatest.DispatchReporter$Propagator.$anonfun$run$10(DispatchReporter.scala:249)
        at org.scalatest.DispatchReporter$Propagator.$anonfun$run$10$adapted(DispatchReporter.scala:248)
        at scala.collection.immutable.List.foreach(List.scala:388)
        at org.scalatest.DispatchReporter$Propagator.run(DispatchReporter.scala:248)
        at java.lang.Thread.run(Thread.java:745)
Exception in thread "Thread-13" java.io.WriteAbortedException: writing aborted; java.io.NotSerializableException: sun.nio.cs.UTF_32
[info] - should monix-circe.enumerate.success.ExampleNestedCaseClass *** FAILED ***

Any tips on what to look for? I assume I'm violating some kind of protocol.
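One way to read the log above: the stack trace comes from the ScalaTest reporter failing to serialize the `TestFailed` event, because one of the generated arguments is a `Charset`, and `java.nio.charset.Charset` does not implement `Serializable`. The underlying property failure is the `Expected: List(...)` versus `Received: List()` mismatch. The sketch below only demonstrates the serialization part with the JDK; `CharsetSerializationDemo` and `trySerialize` are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.nio.charset.StandardCharsets

object CharsetSerializationDemo {
  // Returns the serialized size, or the name of the offending class.
  def trySerialize(value: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(value)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally out.close()
  }

  def main(args: Array[String]): Unit = {
    println(trySerialize("plain string"))          // a Right: String is Serializable
    println(trySerialize(StandardCharsets.UTF_8))  // a Left naming the charset class
  }
}
```

If that reading is right, the trace is a secondary symptom, and the thing to investigate is why the decoded stream comes back empty for the UTF-32 input.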

Avasil avatar May 17 '19 22:05 Avasil

I think this would have to wait for CE3 support in Monix: https://github.com/monix/monix/pull/1533

joroKr21 avatar Aug 12 '22 11:08 joroKr21