pedestal icon indicating copy to clipboard operation
pedestal copied to clipboard

Clarify async usage in examples and/or docs?

Open ivarref opened this issue 4 years ago • 7 comments

Description

http://pedestal.io/reference/streaming says that [streaming responses] is entirely a function of the type of the body of the response map (inside the context map) and that Core.async channel is both streamed and async.

Expected Behavior

(defn respond-async [request]
  (let [c (async/chan)]
    (async/go
      (async/>! c "one")
      (async/>! c "two")
      (async/close! c))
    {:status 200
     :body   c}))

(def routes
  (route/expand-routes
    #{["/async" :get respond-async :route-name :async]}))

I figured this would return ["one", "two"] as either edn or json in the headers.

Actual Behavior

Instead I'm getting a long stacktrace:

[qtp2121742096-43] INFO io.pedestal.http - {:msg "GET /async", :line 80}
[qtp2121742096-43] ERROR io.pedestal.http.impl.servlet-interceptor - {:msg "error-stylobate triggered", :context {:response {:status 200, :body #object[clojure.core.async.impl.channels.ManyToManyChannel 0x224010a "clojure.core.async.impl.channels.ManyToManyChannel@224010a"], :headers {"Strict-Transport-Security" "max-age=31536000; includeSubdomains", "X-Frame-Options" "DENY", "X-Content-Type-Options" "nosniff", "X-XSS-Protection" "1; mode=block", "X-Download-Options" "noopen", "X-Permitted-Cross-Domain-Policies" "none", "Content-Security-Policy" "object-src 'none'; script-src 'unsafe-inline' 'unsafe-eval' 'strict-dynamic' https: http:;"}}, :io.pedestal.interceptor.chain/stack (#Interceptor{:name :io.pedestal.http.impl.servlet-interceptor/terminator-injector}), :request {:protocol "HTTP/1.1", :async-supported? true, :remote-addr "127.0.0.1", :servlet-response #object[org.eclipse.jetty.server.Response 0x61b8596 "HTTP/1.1 200 \nDate: Tue, 23 Jun 2020 11:27:26 GMT\r\n\r\n"], :servlet #object[io.pedestal.http.servlet.FnServlet 0x5fa419cf "io.pedestal.http.servlet.FnServlet@5fa419cf"], :headers {"user-agent" "curl/7.68.0", "accept" "*/*", "host" "localhost:8890"}, :server-port 8890, :servlet-request #object[org.eclipse.jetty.server.Request 0x43190b31 "Request(GET //localhost:8890/async)@43190b31"], :path-info "/async", :url-for #object[clojure.lang.Delay 0x501e496a {:status :pending, :val nil}], :uri "/async", :server-name "localhost", :query-string nil, :path-params [], :body #object[org.eclipse.jetty.server.HttpInputOverHTTP 0x72102e32 "HttpInputOverHTTP@72102e32[c=0,q=0,[0]=null,s=STREAM]"], :scheme :http, :request-method :get, :context-path ""}, :bindings {#'io.pedestal.http.route/*url-for* #object[clojure.lang.Delay 0x501e496a {:status :pending, :val nil}]}, :enter-async [#object[io.pedestal.http.impl.servlet_interceptor$start_servlet_async 0x4ea59b37 "io.pedestal.http.impl.servlet_interceptor$start_servlet_async@4ea59b37"]], :io.pedestal.interceptor.chain/terminators (#object[io.pedestal.http.impl.servlet_interceptor$terminator_inject$fn__17009 0x28fd32d3 "io.pedestal.http.impl.servlet_interceptor$terminator_inject$fn__17009@28fd32d3"]), :servlet-response #object[org.eclipse.jetty.server.Response 0x61b8596 "HTTP/1.1 200 \nDate: Tue, 23 Jun 2020 11:27:26 GMT\r\n\r\n"], :route {:path "/async", :method :get, :path-re #"/\Qasync\E", :path-parts ["async"], :interceptors [#Interceptor{:name }], :route-name :async, :path-params [], :io.pedestal.http.route.prefix-tree/satisfies-constraints? #object[clojure.core$constantly$fn__5672 0x2336f47e "clojure.core$constantly$fn__5672@2336f47e"]}, :servlet #object[io.pedestal.http.servlet.FnServlet 0x5fa419cf "io.pedestal.http.servlet.FnServlet@5fa419cf"], :servlet-request #object[org.eclipse.jetty.server.Request 0x43190b31 "Request(GET //localhost:8890/async)@43190b31"], :url-for #object[clojure.lang.Delay 0x501e496a {:status :pending, :val nil}], :io.pedestal.interceptor.chain/execution-id 2, :servlet-config #object[org.eclipse.jetty.servlet.ServletHolder$Config 0x141ac6b "org.eclipse.jetty.servlet.ServletHolder$Config@141ac6b"], :async? #object[io.pedestal.http.impl.servlet_interceptor$servlet_async_QMARK_ 0x3d8de3a4 "io.pedestal.http.impl.servlet_interceptor$servlet_async_QMARK_@3d8de3a4"]}, :line 242}
clojure.lang.ExceptionInfo: java.lang.IllegalArgumentException in Interceptor :io.pedestal.http.impl.servlet-interceptor/ring-response - No implementation of method: :default-content-type of protocol: #'io.pedestal.http.impl.servlet-interceptor/WriteableBody found for class: clojure.core.async.impl.channels.ManyToManyChannel {:execution-id 2, :stage :leave, :interceptor :io.pedestal.http.impl.servlet-interceptor/ring-response, :exception-type :java.lang.IllegalArgumentException, :exception #error {
 :cause "No implementation of method: :default-content-type of protocol: #'io.pedestal.http.impl.servlet-interceptor/WriteableBody found for class: clojure.core.async.impl.channels.ManyToManyChannel"
 :via
 [{:type java.lang.IllegalArgumentException
   :message "No implementation of method: :default-content-type of protocol: #'io.pedestal.http.impl.servlet-interceptor/WriteableBody found for class: clojure.core.async.impl.channels.ManyToManyChannel"
   :at [clojure.core$_cache_protocol_fn invokeStatic "core_deftype.clj" 583]}]
 :trace
 [[clojure.core$_cache_protocol_fn invokeStatic "core_deftype.clj" 583]
  [clojure.core$_cache_protocol_fn invoke "core_deftype.clj" 575]
  [io.pedestal.http.impl.servlet_interceptor$eval16631$fn__16645$G__16620__16650 invoke "servlet_interceptor.clj" 38]
  [io.pedestal.http.impl.servlet_interceptor$set_default_content_type invokeStatic "servlet_interceptor.clj" 153]
  [io.pedestal.http.impl.servlet_interceptor$set_default_content_type invoke "servlet_interceptor.clj" 149]
  [io.pedestal.http.impl.servlet_interceptor$set_response invokeStatic "servlet_interceptor.clj" 157]
  [io.pedestal.http.impl.servlet_interceptor$set_response invoke "servlet_interceptor.clj" 155]
  [io.pedestal.http.impl.servlet_interceptor$send_response invokeStatic "servlet_interceptor.clj" 165]
  [io.pedestal.http.impl.servlet_interceptor$send_response invoke "servlet_interceptor.clj" 162]
  [io.pedestal.http.impl.servlet_interceptor$leave_ring_response invokeStatic "servlet_interceptor.clj" 227]
  [io.pedestal.http.impl.servlet_interceptor$leave_ring_response invoke "servlet_interceptor.clj" 218]
  [io.pedestal.interceptor.chain$try_f invokeStatic "chain.clj" 54]
  [io.pedestal.interceptor.chain$try_f invoke "chain.clj" 44]
  [io.pedestal.interceptor.chain$leave_all_with_binding invokeStatic "chain.clj" 254]
  [io.pedestal.interceptor.chain$leave_all_with_binding invoke "chain.clj" 237]
  [io.pedestal.interceptor.chain$leave_all$fn__11907 invoke "chain.clj" 268]
  [clojure.lang.AFn applyToHelper "AFn.java" 152]
  [clojure.lang.AFn applyTo "AFn.java" 144]
  [clojure.core$apply invokeStatic "core.clj" 665]
  [clojure.core$with_bindings_STAR_ invokeStatic "core.clj" 1973]
  [clojure.core$with_bindings_STAR_ doInvoke "core.clj" 1973]
  [clojure.lang.RestFn invoke "RestFn.java" 425]
  [io.pedestal.interceptor.chain$leave_all invokeStatic "chain.clj" 266]
  [io.pedestal.interceptor.chain$leave_all invoke "chain.clj" 260]
  [io.pedestal.interceptor.chain$execute invokeStatic "chain.clj" 379]
  [io.pedestal.interceptor.chain$execute invoke "chain.clj" 352]
  [io.pedestal.interceptor.chain$execute invokeStatic "chain.clj" 389]
  [io.pedestal.interceptor.chain$execute invoke "chain.clj" 352]
  [io.pedestal.http.impl.servlet_interceptor$interceptor_service_fn$fn__17034 invoke "servlet_interceptor.clj" 351]
  [io.pedestal.http.servlet.FnServlet service "servlet.clj" 28]
  [org.eclipse.jetty.servlet.ServletHolder handle "ServletHolder.java" 873]
  [org.eclipse.jetty.servlet.ServletHandler doHandle "ServletHandler.java" 542]
  [org.eclipse.jetty.server.handler.ScopedHandler nextHandle "ScopedHandler.java" 255]
  [org.eclipse.jetty.server.handler.ContextHandler doHandle "ContextHandler.java" 1345]
  [org.eclipse.jetty.server.handler.ScopedHandler nextScope "ScopedHandler.java" 203]
  [org.eclipse.jetty.servlet.ServletHandler doScope "ServletHandler.java" 480]
  [org.eclipse.jetty.server.handler.ScopedHandler nextScope "ScopedHandler.java" 201]
  [org.eclipse.jetty.server.handler.ContextHandler doScope "ContextHandler.java" 1247]
  [org.eclipse.jetty.server.handler.ScopedHandler handle "ScopedHandler.java" 144]
  [org.eclipse.jetty.server.handler.HandlerWrapper handle "HandlerWrapper.java" 132]
  [org.eclipse.jetty.server.Server handle "Server.java" 505]
  [org.eclipse.jetty.server.HttpChannel handle "HttpChannel.java" 370]
  [org.eclipse.jetty.server.HttpConnection onFillable "HttpConnection.java" 267]
  [org.eclipse.jetty.io.AbstractConnection$ReadCallback succeeded "AbstractConnection.java" 305]
  [org.eclipse.jetty.io.FillInterest fillable "FillInterest.java" 103]
  [org.eclipse.jetty.io.ChannelEndPoint$2 run "ChannelEndPoint.java" 117]
  [org.eclipse.jetty.util.thread.QueuedThreadPool runJob "QueuedThreadPool.java" 698]
  [org.eclipse.jetty.util.thread.QueuedThreadPool$Runner run "QueuedThreadPool.java" 804]
  [java.lang.Thread run "Thread.java" 834]]}}
	at io.pedestal.interceptor.chain$throwable__GT_ex_info.invokeStatic(chain.clj:35)
	at io.pedestal.interceptor.chain$throwable__GT_ex_info.invoke(chain.clj:32)
	at io.pedestal.interceptor.chain$try_f.invokeStatic(chain.clj:57)
	at io.pedestal.interceptor.chain$try_f.invoke(chain.clj:44)
	at io.pedestal.interceptor.chain$leave_all_with_binding.invokeStatic(chain.clj:254)
	at io.pedestal.interceptor.chain$leave_all_with_binding.invoke(chain.clj:237)
	at io.pedestal.interceptor.chain$leave_all$fn__11907.invoke(chain.clj:268)
	at clojure.lang.AFn.applyToHelper(AFn.java:152)
	at clojure.lang.AFn.applyTo(AFn.java:144)
	at clojure.core$apply.invokeStatic(core.clj:665)
	at clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1973)
	at clojure.core$with_bindings_STAR_.doInvoke(core.clj:1973)
	at clojure.lang.RestFn.invoke(RestFn.java:425)
	at io.pedestal.interceptor.chain$leave_all.invokeStatic(chain.clj:266)
	at io.pedestal.interceptor.chain$leave_all.invoke(chain.clj:260)
	at io.pedestal.interceptor.chain$execute.invokeStatic(chain.clj:379)
	at io.pedestal.interceptor.chain$execute.invoke(chain.clj:352)
	at io.pedestal.interceptor.chain$execute.invokeStatic(chain.clj:389)
	at io.pedestal.interceptor.chain$execute.invoke(chain.clj:352)
	at io.pedestal.http.impl.servlet_interceptor$interceptor_service_fn$fn__17034.invoke(servlet_interceptor.clj:351)
	at io.pedestal.http.servlet.FnServlet.service(servlet.clj:28)
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:873)
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:542)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
	at org.eclipse.jetty.server.Server.handle(Server.java:505)
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
	at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:698)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:804)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: No implementation of method: :default-content-type of protocol: #'io.pedestal.http.impl.servlet-interceptor/WriteableBody found for class: clojure.core.async.impl.channels.ManyToManyChannel
	at clojure.core$_cache_protocol_fn.invokeStatic(core_deftype.clj:583)
	at clojure.core$_cache_protocol_fn.invoke(core_deftype.clj:575)
	at io.pedestal.http.impl.servlet_interceptor$eval16631$fn__16645$G__16620__16650.invoke(servlet_interceptor.clj:38)
	at io.pedestal.http.impl.servlet_interceptor$set_default_content_type.invokeStatic(servlet_interceptor.clj:153)
	at io.pedestal.http.impl.servlet_interceptor$set_default_content_type.invoke(servlet_interceptor.clj:149)
	at io.pedestal.http.impl.servlet_interceptor$set_response.invokeStatic(servlet_interceptor.clj:157)
	at io.pedestal.http.impl.servlet_interceptor$set_response.invoke(servlet_interceptor.clj:155)
	at io.pedestal.http.impl.servlet_interceptor$send_response.invokeStatic(servlet_interceptor.clj:165)
	at io.pedestal.http.impl.servlet_interceptor$send_response.invoke(servlet_interceptor.clj:162)
	at io.pedestal.http.impl.servlet_interceptor$leave_ring_response.invokeStatic(servlet_interceptor.clj:227)
	at io.pedestal.http.impl.servlet_interceptor$leave_ring_response.invoke(servlet_interceptor.clj:218)
	at io.pedestal.interceptor.chain$try_f.invokeStatic(chain.clj:54)
	... 37 more

Steps to reproduce

Use the code above or this gist: https://gist.github.com/ivarref/b57cf40ea4739dacc4bbe7bbcaa8aec0/d449ef39b35e00e9d3ae2b45313d4ec360a3303f

Environment

Linux / REPL.

Operating System (including version).

Ubuntu 18.04.4 LTS

Your current Leiningen or Boot version (lein --version or boot --version)

tools.deps and clojure 1.10.1

Pedestal version

0.5.8

Thanks for a fine library. Kind regards.

ivarref avatar Jun 23 '20 11:06 ivarref

@ivarref, sorry you ran into an issue while exploring Pedestal's async capabilities. The reason you are seeing an error is because the response does not include a Content-Type header and Pedestal is trying to resolve the default content type for this response body type (clojure.core.async.impl.protocols.Channel) of which there is none. If you add aContent-Type header, you will be able to stream data from the channel.

While Pedestal does implement the notion of default content type for a variety of body types, I'm not convinced that it should do so for core.async channels. I do think the error could be clearer, though. Perhaps a default implementation should be provided which throws an error with better messaging.

ddeaguiar avatar Jun 26 '20 17:06 ddeaguiar

Thinking about this further and looking at the existing Response Bodies docs, I'm inclined to set the default content-type for core.async channels to nil.

ddeaguiar avatar Jun 26 '20 18:06 ddeaguiar

Hi @ddeaguiar

And thanks for your helpful response!

It helped to set the Content-Type:

(defn respond-async [request]
  (let [c (async/chan)]
    (async/go
      (async/>! c {:foo 1})
      (async/>! c {:bar 2})
      (async/close! c))
    {:status  200
     :headers {"Content-Type" "application/json"}
     :body    c}))

However it is not producing an array, nor valid JSON maps:

$ curl http://localhost:8890/async
{:foo 1}{:bar 2}

Is there something I'm missing, or am I expected to produce [, , and ] myself, as well as to encode each object to string?

Thanks again and kind regards.

ivarref avatar Jun 29 '20 11:06 ivarref

@ivarref once you start streaming responses you are working at a lower level and are in control over how the information is streamed. Pedestal does not provide any additional control or make decisions based on content type when streaming data.

ddeaguiar avatar Jun 29 '20 20:06 ddeaguiar

OK thanks @ddeaguiar

ivarref avatar Jun 30 '20 08:06 ivarref

For others reading this thread, I ended up with the following:


; require [cheshire.core :as json]

(defn json-array-chan [c]
  (let [out (async/chan)]
    (async/go
      (async/>! out "[")
      (loop [first true
             v (async/<! c)]
        (when v
          (when-not first
            (async/>! out ","))
          (async/>! out (json/generate-string v))
          (recur false (async/<! c))))
      (async/>! out "]")
      (async/close! out))
    out))

(defn respond-async [request]
  (let [c (async/chan)]
    (async/go
      (doseq [e (range 10)] ; also works with for example 1e9
        (async/>! c {:foo e}))
      (async/close! c))
    {:status  200
     :headers {"Content-Type" "application/json;charset=UTF-8"}
     :body    (json-array-chan c)}))

Is this style considered decent? I'd like that (an improved?) json-array-chan be in the examples and/or pedestal. To me this seems like a common use case.

Gist here: https://gist.github.com/ivarref/b57cf40ea4739dacc4bbe7bbcaa8aec0/501d6f904b82d2a5e1e5761a246bb3beba228bf7

ivarref avatar Jun 30 '20 08:06 ivarref

Feels like there needs to be a bit more documentation about streaming responses, overlapping with core.async.

hlship avatar Apr 28 '23 21:04 hlship