Add Ox integration: SSE & WebSockets?
The default backend is already fully usable for direct-style Scala, including streaming (via `InputStream` bodies) and web sockets (using the `WebSocket`'s blocking `.send()` and `.receive()` methods).
However, what we are missing is integration with SSE, streaming websockets (sometimes it's more convenient to work this way), and high-level streaming (mapping over byte chunks or lines, for example).
That's why it might be reasonable to have an Ox integration module. In fact, this is a joint Ox + sttp-client issue, as it requires changes in both. Going from the end of that list:
- to support high-level streaming operations (on the byte-chunk or line level), we could add some I/O capabilities to Ox's `Source`, such as two-way conversions between an `InputStream` and a `Source[byte chunk]`; between an `InputStream` and a `Source[String]` (lines); and finally writing such sources to files / reading them from files. What remains to be determined here is a good representation of a byte chunk. A simple `Array[Byte]`? `ByteBuffer`? Pekko's `ByteString`?
- for web sockets, we could provide a fairly simple conversion between a `WebSocket` and a `(Source[WebSocketFrame], Sink[WebSocketFrame])` pair (a rough sketch of possible signatures for both of these points follows the usage example below)
- finally, for SSE, I do have some code, but it might need polishing. And it would be great to include it in the integration module:
```scala
import ox.*
import ox.channels.*
import sttp.model.sse.ServerSentEvent
import java.io.InputStream
import scala.annotation.tailrec

def parseSse(is: InputStream)(using Ox): Source[ServerSentEvent] =
  val chunks = StageCapacity.newChannel[Array[Byte]]
  // read the input stream in a background fork, emitting byte chunks to the channel
  fork {
    try
      repeatWhile {
        val a = new Array[Byte](1024)
        val r = is.read(a)
        if r == -1 then
          chunks.done()
          false
        else
          chunks.send(a.take(r))
          true
      }
    catch case t: Throwable => chunks.errorSafe(t)
  }
  chunks
    // re-chunk the byte stream at newline boundaries, carrying over any incomplete line in the buffer
    .mapStatefulConcat(() => Array.empty[Byte]) { case (buffer, nextChunk) =>
      @tailrec
      def splitChunksAtNewLine(buf: Array[Byte], chunk: Array[Byte], acc: Vector[Array[Byte]])
          : (Array[Byte], Vector[Array[Byte]]) =
        val newlineIdx = chunk.indexOf('\n')
        if newlineIdx == -1 then (buf ++ chunk, acc)
        else
          val (chunk1, chunk2) = chunk.splitAt(newlineIdx + 1)
          splitChunksAtNewLine(Array.empty[Byte], chunk2, acc :+ (buf ++ chunk1))
      val (newBuffer, toEmit) = splitChunksAtNewLine(buffer, nextChunk, Vector.empty)
      (newBuffer, toEmit)
    }
    // decode each line's bytes into a String
    .mapAsView(new String(_))
    // group lines into events: a blank line ends the current event
    .mapStatefulConcat(() => Vector.empty[String]) { case (acc, el) =>
      if el.isBlank then (Vector.empty, Some(acc)) else (acc :+ el.dropRight(1), Nil)
    }
    .map(lines => ServerSentEvent.parse(lines.asInstanceOf[Vector[String]].toList))
```
Example usage:
```scala
import ox.*
import sttp.client4.*

@main def sseClient(): Unit =
  supervised {
    basicRequest
      .post(uri"http://localhost:51823/sse/echo3")
      .body("1234567890")
      .response(asInputStreamAlways { is =>
        parseSse(is).foreach(el => println(s"Got: $el"))
        ()
      })
      .send(DefaultSyncBackend()) // uses the default synchronous backend
      .body
  }
```
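To make the first two bullet points above more concrete, here's a rough sketch of the API surface the integration module could expose. All names, signatures and the `Chunk` alias are hypothetical (nothing like this exists in Ox or sttp yet), and the bodies are deliberately left as `???`:

```scala
import java.io.InputStream
import ox.*
import ox.channels.*
import sttp.ws.{WebSocket, WebSocketFrame}

// placeholder for whichever byte-chunk representation we settle on
// (Array[Byte], ByteBuffer, or a ByteString-like type)
type Chunk = Array[Byte]

// the blocking WebSocket uses the identity effect
type Id[A] = A

// InputStream <-> Source conversions
def toSource(is: InputStream, chunkSize: Int = 1024)(using Ox): Source[Chunk] = ???
def toInputStream(source: Source[Chunk])(using Ox): InputStream = ???

// line-level streaming on top of byte chunks
def toLines(source: Source[Chunk]): Source[String] = ???

// writing a source to a file / reading a file as a source
def toFile(source: Source[Chunk], path: java.nio.file.Path): Unit = ???
def fromFile(path: java.nio.file.Path, chunkSize: Int = 1024)(using Ox): Source[Chunk] = ???

// WebSocket <-> (Source, Sink) pair
def asSourceAndSink(ws: WebSocket[Id])(using Ox): (Source[WebSocketFrame], Sink[WebSocketFrame]) = ???
```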
Wouldn't that be an `asSSEAlways`?
For all other streaming approaches we simply offer an SSE-parsing stream stage, e.g.: https://sttp.softwaremill.com/en/stable/backends/akka.html#server-sent-events
`asSSEAlways` is hard to do generically, as you don't know what the underlying stream type is. So it could be done, but it would need to be backend-specific.
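For the synchronous/Ox case specifically, a minimal sketch of what such a backend-specific handler could look like, built on top of `parseSse` from the issue description (the `asSseAlways` name is hypothetical, and this assumes client4's `ResponseAs` and `asInputStreamAlways`):

```scala
import ox.Ox
import ox.channels.Source
import sttp.client4.*
import sttp.model.sse.ServerSentEvent

// Hypothetical: an SSE response handler for the synchronous backend only,
// delegating the actual parsing to parseSse defined above.
def asSseAlways[T](f: Source[ServerSentEvent] => T)(using Ox): ResponseAs[T] =
  asInputStreamAlways(is => f(parseSse(is)))
```

The usage example above would then shrink to `.response(asSseAlways(_.foreach(el => println(s"Got: $el"))))`.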