finch
finch copied to clipboard
[#1078] Streaming with Monix's Observable
Related to #1078
Observable doesn't have the right kind but I think I managed to find decent workaround using
type ObservableF[F[_], A] = Observable[A]
// coproducts were inferring `Observable[A]` instead of alias
implicit def aliasResponseToRealResponse[F[_], A, CT <: Application.Json](implicit
tr: ToResponse.Aux[F, ObservableF[F, A], CT]
): ToResponse.Aux[F, Observable[A], CT] = tr
Observable is built with Task internally and depends on Scheduler so it will probably stay this way. On the other hand, it provides many methods that can work with polymorphic effects.
It's still Task under the hood so it's not that nice but if you provide Scheduler you can use other F and I'm pretty sure it would still be more performant than alternatives.
Do you think this approach will work correctly?
Despite some flaws, it would be fantastic to have server-side support for streaming with Monix Observable for its users. :)
Codecov Report
Merging #1112 into master will decrease coverage by
2.19%. The diff coverage is34.61%.
@@ Coverage Diff @@
## master #1112 +/- ##
=========================================
- Coverage 80.25% 78.05% -2.2%
=========================================
Files 54 56 +2
Lines 1028 1080 +52
Branches 47 48 +1
=========================================
+ Hits 825 843 +18
- Misses 203 237 +34
| Impacted Files | Coverage Δ | |
|---|---|---|
| examples/src/main/scala/io/finch/monix/Main.scala | 0% <0%> (ø) |
|
| circe/src/main/scala/io/finch/circe/Decoders.scala | 70% <0%> (-30%) |
:arrow_down: |
| monix/src/main/scala/io/finch/monix/package.scala | 62.06% <62.06%> (ø) |
|
| core/src/main/scala/io/finch/Endpoint.scala | 72.57% <0%> (ø) |
:arrow_up: |
Continue to review full report at Codecov.
Legend - Click here to learn more
Δ = absolute <relative> (impact),ø = not affected,? = missing dataPowered by Codecov. Last update 32c6ac5...eb7f264. Read the comment docs.
Thanks, @Avasil!
There is also PR #1098 built on top of Iterant. I wonder if we can unite them under the same module.
@sergeykolbasov I don't see anything against it (the module will add monix-tail dependency but it's not that big)
I've noticed I missed test for Circe module, I'll add it tonight
@sergeykolbasov I have problem with CirceSpec - I added test for Observable but it fails with following error:
[info] CirceSpec:
Reporter completed abruptly with an exception after receiving event: TestFailed(Ordinal(0, 3),GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
(AbstractJsonSpec.scala:41)
Falsified after 1 successful property evaluations.
Location: (AbstractJsonSpec.scala:41)
Occurred when passed generated values (
arg0 = List("ExampleNestedCaseClass(xqIxgbG,4.3349875045684245E-243,-1,List(-2147483648),ExampleCaseClass(anyooanbgz,-2147483648,false))"), // 3 shrinks
arg1 = UTF-32
)
Label of failing property:
Expected: List("ExampleNestedCaseClass(xqIxgbG,4.3349875045684245E-243,-1,List(-2147483648),ExampleCaseClass(anyooanbgz,-2147483648,false))")
Received: List(),CirceSpec,io.finch.circe.test.CirceSpec,Some(io.finch.circe.test.CirceSpec),should monix-circe.enumerate.success.ExampleNestedCaseClass,should monix-circe.enumerate.success.ExampleNestedCaseClass,Vector(),Some(org.scalatest.exceptions.GeneratorDrivenPropertyCheckFailedException: GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
(AbstractJsonSpec.scala:41)
Falsified after 1 successful property evaluations.
Location: (AbstractJsonSpec.scala:41)
Occurred when passed generated values (
arg0 = List("ExampleNestedCaseClass(xqIxgbG,4.3349875045684245E-243,-1,List(-2147483648),ExampleCaseClass(anyooanbgz,-2147483648,false))"), // 3 shrinks
arg1 = UTF-32
)
Label of failing property:
Expected: List("ExampleNestedCaseClass(xqIxgbG,4.3349875045684245E-243,-1,List(-2147483648),ExampleCaseClass(anyooanbgz,-2147483648,false))")
Received: List()),Some(790),Some(IndentedText(- should monix-circe.enumerate.success.ExampleNestedCaseClass,should monix-circe.enumerate.success.ExampleNestedCaseClass,0)),Some(SeeStackDepthException),Some(io.finch.circe.test.CirceSpec),None,pool-1-thread-1-ScalaTest-running-CirceSpec,1558125731073).
java.io.NotSerializableException: sun.nio.cs.UTF_32
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:473)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.scalatest.tools.SocketReporter.apply(SocketReporter.scala:31)
at org.scalatest.DispatchReporter$Propagator.$anonfun$run$10(DispatchReporter.scala:249)
at org.scalatest.DispatchReporter$Propagator.$anonfun$run$10$adapted(DispatchReporter.scala:248)
at scala.collection.immutable.List.foreach(List.scala:388)
at org.scalatest.DispatchReporter$Propagator.run(DispatchReporter.scala:248)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "Thread-13" java.io.WriteAbortedException: writing aborted; java.io.NotSerializableException: sun.nio.cs.UTF_32
[info] - should monix-circe.enumerate.success.ExampleNestedCaseClass *** FAILED ***
Any tips on what to look for? I assume I violate some kind of protocol
I think this would have to wait for CE3 support in Monix: https://github.com/monix/monix/pull/1533