morpheus-graphql
morpheus-graphql copied to clipboard
Subscription returns null
I am trying to create a subscription endpoint like this:
resolveJoinedLobby :: Maybe User.SessionE
-> Pipe
-> JoinedLobbyArgs
-> ResolveS USEREVENT IO UserJoined
resolveJoinedLobby session pipe args =
SubResolver { subChannels = [USER], subResolver = subResolver }
where
subResolver (Event _ content) = lift (resolveJoinedLobby' content)
resolveJoinedLobby' :: Content -> IO (UserJoined (IORes USEREVENT))
resolveJoinedLobby' content = return
UserJoined { userJoinedUsername =
constRes $ pack $ show $ contentID content
}
When i perform the request i get back
{
"data": null
}
Why is that ? I also dont get, why we need a IORes for the lifted type instead of IOSubRes. I took this from the examples folder, its pretty much the same code, but with different data types.
oh, i will investigate it. @diskroma thanks for using morpheus. because of you i can cover lots of bugs.
actually it does work in my exmple. what is different in your code? can you share some simple example that reproduces your issue?
IOSubRes
is resolver with callback function. nested IOSubRes
has more trouble than benefit. that why it is allowed only root level of Subscription.
Im using the stack version 0.8.0. So not the current checkout of master, I couldnt find out how to get that compiling, im very new to haskell and stack. Maybe thats a reason ? Thats also the reason, why I defined the root Query, Mutation and Subscription types in native haskell, like this:
...
data Subscription (m :: * -> *) =
Subscription { joined :: JoinedLobbyArgs -> m (UserJoined (IORes USEREVENT))
}
deriving (Generic, GQLType)
data JoinedLobbyArgs = JoinedLobbyArgs { _id :: Text }
deriving (Generic)
The UserJoined result is a type defined in the gql file, which is just a wrapper around a Text (the username).
Im using the above resolveJoinedLobby
function in the root resolver, just like this:
lobbyApi :: Pipe -> Maybe User.SessionE -> ByteString -> IO ByteString
lobbyApi pipe session = interpreter $ lobbyGqlRoot pipe session
lobbyGqlRoot :: Pipe
-> Maybe User.SessionE
-> GQLRootResolver IO USEREVENT Query Mutation Subscription
lobbyGqlRoot pipe session =
GQLRootResolver { queryResolver, mutationResolver, subscriptionResolver }
where
queryResolver =
...
subscriptionResolver =
Subscription { joined = resolveJoinedLobby session pipe }
And for the query:
subscription {
joined (_id:"5e09fd02b39b1a2258000001"){
username
}
}
oh, i will investigate it. @diskroma thanks for using morpheus. because of you i can cover lots of bugs.
As I said, I just started learning haskell, so im sorry if i am asking some trivial things sometimes, but I like the library so far. Keep up the good work :)
can you try with morpheus-graphql-0.9.0
?
I cant test it yet, since i get a kind mismatch in 0.9.0. See #360
@nalchevanidze Okay, got it to compile, same problem persists with 0.9.0.
@diskroma hi could you provide whole problem in one simple file
or gist
where i could reproduce it?
Hi. There is this problems in your project.
- query must be not undefined
- subscription needs some event to return response. Initial response is alleays empty
- for subscription you should use ws socket port and url.
- for client use apollo please. If you use prisma playground it will work
@nalchevanidze Okay, i now finally got it compiling as a websocket app. Since i wanted to have several endpoints for different query features, the event gets triggerend in a different resolver than the one where the subscription is registered at.
The websocket is registered as follows:
...
let wsApp = gqlSocketApp subscriptionRoot
state <- initGQLState
httpApp <- sapp state pipe
Warp.runSettings settings $ WaiWs.websocketsOr
defaultConnectionOptions
(wsApp state)
httpApp
...
This subscriptionRoot
is also exposed as an endpoint via scotty for registering subscriptions.
But there is also the lobbyApi
endpoint, which is responsible for mutations, that should trigger the events for firing the subscriptions that got registered before via subscriptionRoot
:
post "/lobby" $ do
session <- getSession
raw =<< (liftIO . lobbyApi pipe session =<< body)
post "/sub" $ raw =<< (liftIO . subscriptionApi =<< body)
The problem is that subscriptions dont get triggered like this. Is there a way to realize that ? Or do i need to have one single endpoint for everything ? The problem with that would be, that i need the session parameter in the lobbyResolver, which is parsed from a JWT token from the headers. Since this is just served in the scotty post request handler i wouldnt be able to serve it during creation of the wsApp.
@dataO1 hi please update to 0.14.0
and try again.
I did upgrade to 0.14.0
Now the subscription doesn't show any errors, but it returns nothing either, its just waiting for events (which i of course trigger via a mutation).
To make sure i didn't make any mistakes i built a dummy project, that just has a very simple query, mutation and subscription.
Same thing here. Here's the code:
minimal.zip
After looking into the above changes made by @RobinKrom, the fact that i have to interpret the same api twice, in order to match the types for webSocketApp
seems suspicious to me. Like here:
scottyServer :: IO ()
scottyServer = do
(wsApp, publish) <- webSocketsApp $ subApi
startServer wsApp $ httpEndpoint "/" api
where api and subApi are the essentially the same thing but with another type:
api :: ByteString -> IO ByteString
api = interpreter gqlRoot
subApi :: Input api -> Stream api UserEvent IO
subApi = interpreter gqlRoot
Is there any way around this? Is this even correct usage of webSocketApp
?
In my case, I found after some debugging that the httpPubApp
and the webSocketsApp
were using two separate stores. As a consequence, subscriptions
never received events from mutations
received over the httpPubApp
. I also think that interpreting the same API twice is suspicious. After patching morpheus
so that I can specify and initialize the used Store
, everything works as expected.
Here's my current code using the Snap
webserver for comparison:
main :: IO ()
main = bracket
initialize
shutDown
(quickHttpServe . site)
initialize :: IO (AcidState State, Store APIEvent IO)
initialize = do
st <- openLocalState emptyState
store <- initDefaultStore
pure (st, store)
shutDown :: (AcidState State, Store APIEvent IO) -> IO ()
shutDown (st, _store) = closeAcidState st
site :: (AcidState State, Store APIEvent IO) -> Snap ()
site (st, store) = do
uId <- getUserId
(wsApp, publish) <- liftIO $ webSocketsApp store $ gqlApi st uId
CORS.applyCORS CORS.defaultOptions $ route
[ ("query", gqlQueryHandler st publish)
, ("subscribe", gqlSubHandler st wsApp)
]
gqlQueryHandler :: AcidState State -> (APIEvent -> IO ()) -> Snap ()
gqlQueryHandler st publish = do
rqBody <- readRequestBody 10000
rq <- getRequest
uId <- getUserId
r <- liftIO $ httpPubApp (gqlApi st uId) publish rqBody
writeLBS r
gqlSubHandler :: AcidState State -> ServerApp -> Snap ()
gqlSubHandler st wsApp = do
r <- getRequest
runWebSocketsSnap wsApp
Let me know if I can help somehow, it's been a joy to use morpheus
so far!
hi. @dataO1
you only need.
subApi :: Input api -> Stream api UserEvent IO
subApi = interpreter gqlRoot
after that you use : (wsApp, publish) <- webSocketsApp api
and httpPubApp api publish
hi @robin-da,
httpPubApp
does not need any store.
it receives callback function publish
with signature (e -> m ())
. so on every mutation it calls the publish with corresponding event. it does not be to aware of store state.
httpPubApp:
returns websocket App and callback function (e -> m ())
. every time you call publish
it triggers subscriptions that satisfies that subscriptions.
(wsApp, publish) <- webSocketsApp api
why this way?
the cenrralized store is bad. imagine if we have milion of subscripions. on publish server must check every node and notify them(very slow). it is really hard to scale it up.
with this method we can scale it horizontally. subscription nodes do not know each other. only publisher node must know them to notify.
- subscription nodes do not share store.
- they can be deployed on different servers.
publisher server calls them with httpClient.
httpApp :: ByteString -> IO ByteString
httpApp = httpPubApp api publish
publish :: e -> String ->IO ()
publish event
= traverse_
(publishEventOn event)
["<subscription_1 server url> ","<subscription_2 server url> ",...,"<subscription_n server url> "]
publishEventOn :: e -> String ->IO ()
publishEventOn event url = runReq defaultHttpConfig $ do
...
req POST (https url) (ReqBodyLbs (decode event))
@robin-da,
or in your case.
api :: Input api -> Stream api UserEvent IO
api = interpreter gqlRoot
buildApp:: IO (ServerApp, ByteString -> IO ByteString)
buildApp = do
(wsApp, publish) <- webSocketsApp api
pure (wsApp,httpPubApp api publish)
Hi @nalchevanidze, I tried the recommended solution and I ended up with a subscription that never receives any events.
I agree with your argument that you don't want a centralized store because of scaling. I think the problem that I'm experiencing is that the store the httpApi
publishes to is different from the store the webSocketsApi
reads events from. I arrived at this conclusion when I traced the publish
function and can see that it always traverses over an empty store, and I also traced the writeStore
method of the Store
handle and can see that events are written to a store, but probably a different one.
By forcing the webSocketsApi
and the httpApp
to use the same store, the bug disappeared. I have yet to figure out, how they end up with two different stores.
@RobinKrom I think you initiate 2 different web socket apps. one that you actually use. and second that you pass to the httpApp. that why is second one never updated.
can you show me actual code?
Here's the code with an unpatched morpheus that doesn't work. It connects to the websocket but never receives any events.
main :: IO ()
main = bracket
initialize
shutDown
(quickHttpServe . site)
initialize :: IO (AcidState State)
initialize = do
st <- openLocalState emptyState
pure st
shutDown :: AcidState State -> IO ()
shutDown st = closeAcidState st
site :: AcidState State -> Snap ()
site st = do
uId <- getUserId
(wsApp, publish) <- liftIO $ webSocketsApp $ gqlApi st uId
CORS.applyCORS CORS.defaultOptions $ route
[ ("query", gqlQueryHandler st publish)
, ("subscribe", gqlSubHandler st wsApp)
]
gqlQueryHandler :: AcidState State -> (APIEvent -> IO ()) -> Snap ()
gqlQueryHandler st publish = do
rqBody <- readRequestBody 10000
rq <- getRequest
liftIO $ putStrLn $ show rq
uId <- getUserId
r <- liftIO $ httpPubApp (gqlApi st uId) publish rqBody
liftIO $ putStrLn $ show r
writeLBS r
gqlSubHandler :: AcidState State -> ServerApp -> Snap ()
gqlSubHandler st wsApp = do
r <- getRequest
liftIO $ putStrLn $ "Subscription request:" <> show r
runWebSocketsSnap wsApp
gqlApi :: AcidState State -> UserId -> (Input api -> Stream api APIEvent IO)
gqlApi st uId = M.interpreter $ rootResolver st uId
main :: IO ()
main =
bracket
initialize
shutDown
(quickHttpServe . site app)
initialize :: IO (AcidState State, ServerApp, ByteString -> IO ByteString)
initialize = do
st <- openLocalState emptyState
uId <- getUserId
let api = gqlApi st uid
(wsApp, publish) <- webSocketsApp api
pure (st, wsApp, httpPubApp api publish)
shutDown :: ( AcidState State, ServerApp, ByteString -> IO ByteString)-> IO ()
shutDown (st,_,_) = closeAcidState st
site :: (ServerApp, ByteString -> IO ByteString) -> AcidState State -> Snap ()
site (_, wsApp,httpApp) st = do
CORS.applyCORS CORS.defaultOptions $ route
[ ("query", gqlQueryHandler httpApp)
, ("subscribe", gqlSubHandler wsApp)
]
gqlQueryHandler :: (ByteString -> IO ByteString) -> Snap ()
gqlQueryHandler app = do
rqBody <- readRequestBody 10000
rq <- getRequest
liftIO $ putStrLn $ show rq
r <- liftIO $ app rqBody
liftIO $ putStrLn $ show r
writeLBS r
gqlSubHandler :: ServerApp -> Snap ()
gqlSubHandler st wsApp = do
r <- getRequest
liftIO $ putStrLn $ "Subscription request:" <> show r
runWebSocketsSnap wsApp
gqlApi :: AcidState State -> UserId -> (Input api -> Stream api APIEvent IO)
gqlApi st uId = M.interpreter $ rootResolver st uId
what if you try this. i think bracket initiate them in different threads.
I tried your code with some modifications:
main :: IO ()
main =
bracket
initialize
shutDown
(quickHttpServe . site)
initialize :: IO (AcidState State, ServerApp, BS.ByteString -> IO BS.ByteString)
initialize = do
st <- openLocalState emptyState
let uId = "this is a problem: can't use snap here!!!" -- <- getUserId
(wsApp, publish) <- webSocketsApp (gqlApi st uId)
pure (st, wsApp, httpPubApp (gqlApi st uId) publish)
shutDown :: ( AcidState State, ServerApp, BS.ByteString -> IO BS.ByteString)-> IO ()
shutDown (st,_,_) = closeAcidState st
site :: (AcidState State, ServerApp, BS.ByteString -> IO BS.ByteString) -> Snap ()
site (st, wsApp,httpApp) = do
CORS.applyCORS CORS.defaultOptions $ route
[ ("query", gqlQueryHandler httpApp)
, ("subscribe", gqlSubHandler st wsApp)
]
gqlQueryHandler :: (BS.ByteString -> IO BS.ByteString) -> Snap ()
gqlQueryHandler app = do
rqBody <- readRequestBody 10000
rq <- getRequest
liftIO $ putStrLn $ show rq
r <- liftIO $ app $ BSL.toStrict rqBody
liftIO $ putStrLn $ show r
writeBS r
gqlSubHandler :: AcidState state -> ServerApp -> Snap ()
gqlSubHandler st wsApp = do
r <- getRequest
liftIO $ putStrLn $ "Subscription request:" <> show r
runWebSocketsSnap wsApp
This does work, yay! However, there are two major problems with this API that I hope we can change:
- this is very subtle. If you're not running a single-threaded application, you easily hit this problem.
- because you need to run
webSocketsApp
in the initialization now, you're outside of the webserver monad and it's impossible to parse any headers of incoming requests (for example for authentication) that your API depends on.
My suggestion would still be to move any initialization of state outside the webSocketsApp
function and make it explicit. This doesn't force you to use a centralized store, you can change the api such that you can have different stores for each subscription node and publish to many stores from one node. This way you let the user decide what kind of topology they would like to use to inform their subscription nodes and how to scale up.
But maybe you have a better suggestion?
My api also depends on some headers (Authorization token) that i need to parse within the ActionM
Monad from Scotty. That's why i didnt use httpPubApp
, since i would'nt be able to access the headers there.
I dont know how you would change that though, since im not familiar with all the internal stuff going on underneath.
@RobinKrom please create small github example project that i can checkout (to debug morpheus).
one suggestion from me is to use custom monad instead of passing argument to morpheus graphql.
e.g
type MyMonad = ReaderT (AcidState State) Snap
main :: IO ()
main = bracket
initialize
shutDown
(quickHttpServe . runReaderT site)
initialize :: IO (AcidState State)
initialize = openLocalState emptyState
shutDown :: AcidState State -> IO ()
shutDown = closeAcidState
site :: MyMonad ()
site = do
(wsApp, publish) <- webSocketsApp gqlApp
CORS.applyCORS CORS.defaultOptions $ route
[ ("query", gqlQueryHandler [publish] )
, ("subscribe", gqlSubHandler wsApp)
]
gqlQueryHandler :: [APIEvent -> MyMonad ()] -> MyMonad ()
gqlQueryHandler publish = do
rqBody <- readRequestBody 10000
rq <- getRequest
liftIO $ putStrLn $ show rq
r <- httpPubApp publish gqlApp rqBody
liftIO $ putStrLn $ show r
writeLBS r
gqlSubHandler :: ServerApp -> MyMonad ()
gqlSubHandler wsApp = do
r <- getRequest
liftIO $ putStrLn $ "Subscription request:" <> show r
runWebSocketsSnap wsApp
gqlApp :: App APIEvent MyMonad
gqlApp = deriveApp rootResolver
I think using a monad stack here is a matter of taste, it doesn't solve any of the two problems listed above. I'll see if I can create a small example project to experiment over the weekend, but it essentially will be the above code.
@nalchevanidze , it took me some time but I finally created a smallish test project for you to experiment with this issue: https://github.com/RobinKrom/morpheus-subscription-example. If you have nix
and direnv
you should be able to just cd
into the directory and run ghci
. If not, you should be able to build via cabal
. It basically is the above example using Snap
and an IORef
to store the state.
hi @drsk0 , sorry for not responding for long time. could you pleasse recreate the case. in best case in morpheus-graphql so i can check it whenever i can and could also automate testing.
hi @drsk0 , i will close it now and you can reopen it with example file again