Subscription returns null

Open dataO1 opened this issue 5 years ago • 28 comments

I am trying to create a subscription endpoint like this:

resolveJoinedLobby :: Maybe User.SessionE
                   -> Pipe
                   -> JoinedLobbyArgs
                   -> ResolveS USEREVENT IO UserJoined
resolveJoinedLobby session pipe args =
  SubResolver { subChannels = [USER], subResolver = subResolver }
  where
    subResolver (Event _ content) = lift (resolveJoinedLobby' content)

    resolveJoinedLobby' :: Content -> IO (UserJoined (IORes USEREVENT))
    resolveJoinedLobby' content = return
      UserJoined { userJoinedUsername =
                     constRes $ pack $ show $ contentID content
                 }

When I perform the request, I get back:

{
  "data": null
} 

Why is that? I also don't understand why we need an IORes for the lifted type instead of an IOSubRes. I took this from the examples folder; it's pretty much the same code, but with different data types.

dataO1 avatar Dec 31 '19 15:12 dataO1

Oh, I will investigate it. @diskroma thanks for using Morpheus. Because of you I can cover lots of bugs.

nalchevanidze avatar Dec 31 '19 15:12 nalchevanidze

Actually, it does work in my example. What is different in your code? Can you share a simple example that reproduces your issue?

nalchevanidze avatar Dec 31 '19 19:12 nalchevanidze

IOSubRes is a resolver with a callback function. A nested IOSubRes causes more trouble than benefit; that is why it is allowed only at the root level of Subscription.

nalchevanidze avatar Dec 31 '19 19:12 nalchevanidze

I'm using the Stack version, 0.8.0, so not the current checkout of master. I couldn't find out how to get that compiling; I'm very new to Haskell and Stack. Maybe that's a reason? That's also why I defined the root Query, Mutation and Subscription types in native Haskell, like this:

...

data Subscription (m :: * -> *) =
  Subscription { joined :: JoinedLobbyArgs -> m (UserJoined (IORes USEREVENT))
               }
  deriving (Generic, GQLType)

data JoinedLobbyArgs = JoinedLobbyArgs { _id :: Text }
  deriving (Generic)

The UserJoined result is a type defined in the gql file, which is just a wrapper around a Text (the username). I'm using the above resolveJoinedLobby function in the root resolver, like this:


lobbyApi :: Pipe -> Maybe User.SessionE -> ByteString -> IO ByteString
lobbyApi pipe session = interpreter $ lobbyGqlRoot pipe session

lobbyGqlRoot :: Pipe
             -> Maybe User.SessionE
             -> GQLRootResolver IO USEREVENT Query Mutation Subscription
lobbyGqlRoot pipe session =
  GQLRootResolver { queryResolver, mutationResolver, subscriptionResolver }
  where
    queryResolver = 

...

    subscriptionResolver =
      Subscription { joined = resolveJoinedLobby session pipe }

And for the query:

subscription {
  joined (_id:"5e09fd02b39b1a2258000001"){
    username
  }
}

dataO1 avatar Jan 01 '20 15:01 dataO1

"oh, i will investigate it. @diskroma thanks for using morpheus. because of you i can cover lots of bugs."

As I said, I just started learning Haskell, so I'm sorry if I sometimes ask trivial things, but I like the library so far. Keep up the good work :)

dataO1 avatar Jan 01 '20 15:01 dataO1

Can you try with morpheus-graphql-0.9.0?

nalchevanidze avatar Jan 01 '20 19:01 nalchevanidze

I can't test it yet, since I get a kind mismatch in 0.9.0. See #360.

dataO1 avatar Jan 01 '20 22:01 dataO1

@nalchevanidze Okay, got it to compile, same problem persists with 0.9.0.

dataO1 avatar Jan 02 '20 09:01 dataO1

@diskroma Hi, could you provide the whole problem in one simple file or gist where I can reproduce it?

nalchevanidze avatar Jan 04 '20 12:01 nalchevanidze

Here's a minimal reproduction of the bug.

bug.zip

dataO1 avatar Jan 05 '20 00:01 dataO1

Hi. There are these problems in your project:

  • the query must not be undefined
  • a subscription needs some event before it returns a response; the initial response is always empty
  • for subscriptions you should use the WebSocket port and URL
  • for the client, please use Apollo. If you use Prisma Playground, it will work

nalchevanidze avatar Jan 06 '20 13:01 nalchevanidze

@nalchevanidze Okay, I finally got it compiling as a websocket app. Since I wanted to have several endpoints for different query features, the event gets triggered in a different resolver than the one where the subscription is registered.

The websocket is registered as follows:

...
        let wsApp = gqlSocketApp subscriptionRoot
        state   <- initGQLState
        httpApp <- sapp state pipe
        Warp.runSettings settings $ WaiWs.websocketsOr
                defaultConnectionOptions
                (wsApp state)
                httpApp
...

This subscriptionRoot is also exposed as an endpoint via scotty for registering subscriptions. But there is also the lobbyApi endpoint, which is responsible for mutations, that should trigger the events for firing the subscriptions that got registered before via subscriptionRoot:

 post "/lobby" $ do
                session <- getSession
                raw =<< (liftIO . lobbyApi pipe session =<< body)
 post "/sub" $ raw =<< (liftIO . subscriptionApi =<< body)

The problem is that subscriptions don't get triggered this way. Is there a way to make that work, or do I need a single endpoint for everything? The problem with a single endpoint is that I need the session parameter in the lobbyResolver, which is parsed from a JWT token in the headers. Since the session is only available inside the scotty post request handler, I wouldn't be able to supply it when creating the wsApp.

dataO1 avatar Jan 18 '20 15:01 dataO1

@dataO1 Hi, please update to 0.14.0 and try again.

nalchevanidze avatar Aug 15 '20 13:08 nalchevanidze

I did upgrade to 0.14.0. Now the subscription doesn't show any errors, but it returns nothing either; it's just waiting for events (which I of course trigger via a mutation). To make sure I didn't make any mistakes, I built a dummy project that has just a very simple query, mutation and subscription. Same thing here. Here's the code: minimal.zip

After looking into the above changes made by @RobinKrom, the fact that I have to interpret the same API twice in order to match the types for webSocketsApp seems suspicious to me. Like here:

scottyServer :: IO ()
scottyServer  = do
        (wsApp, publish) <- webSocketsApp $ subApi
        startServer wsApp $ httpEndpoint "/" api 

where api and subApi are essentially the same thing but with different types:

api :: ByteString -> IO ByteString
api = interpreter gqlRoot

subApi :: Input api -> Stream api UserEvent IO
subApi = interpreter gqlRoot

Is there any way around this? Is this even correct usage of webSocketsApp?

dataO1 avatar Aug 17 '20 21:08 dataO1

In my case, I found after some debugging that the httpPubApp and the webSocketsApp were using two separate stores. As a consequence, subscriptions never received events from mutations received over the httpPubApp. I also think that interpreting the same API twice is suspicious. After patching morpheus so that I can specify and initialize the used Store, everything works as expected. Here's my current code using the Snap webserver for comparison:

main :: IO ()
main = bracket
  initialize
  shutDown
  (quickHttpServe . site)

initialize :: IO (AcidState State, Store APIEvent IO)
initialize = do
  st <- openLocalState emptyState
  store <- initDefaultStore
  pure (st, store)

shutDown :: (AcidState State, Store APIEvent IO) -> IO ()
shutDown (st, _store) = closeAcidState st

site :: (AcidState State, Store APIEvent IO) -> Snap ()
site (st, store) = do
  uId <- getUserId
  (wsApp, publish) <- liftIO $ webSocketsApp store $ gqlApi st uId
  CORS.applyCORS CORS.defaultOptions $ route
    [ ("query", gqlQueryHandler st publish)
    , ("subscribe", gqlSubHandler st wsApp)
    ]

gqlQueryHandler :: AcidState State -> (APIEvent -> IO ()) -> Snap ()
gqlQueryHandler st publish = do
  rqBody <- readRequestBody 10000
  rq <- getRequest
  uId <- getUserId
  r <- liftIO $ httpPubApp (gqlApi st uId) publish rqBody
  writeLBS r

gqlSubHandler :: AcidState State -> ServerApp -> Snap ()
gqlSubHandler st wsApp = do
  r <- getRequest
  runWebSocketsSnap wsApp

Let me know if I can help somehow, it's been a joy to use morpheus so far!

ghost avatar Aug 18 '20 09:08 ghost

Hi @dataO1,

you only need:

subApi :: Input api -> Stream api UserEvent IO
subApi = interpreter gqlRoot

After that, use (wsApp, publish) <- webSocketsApp api and httpPubApp api publish.

nalchevanidze avatar Aug 18 '20 20:08 nalchevanidze

Hi @robin-da,

httpPubApp

does not need any store.

It receives the callback function publish, with signature (e -> m ()). On every mutation it calls publish with the corresponding event, so it does not need to be aware of the store state.

webSocketsApp:

returns the websocket app and the callback function (e -> m ()). Every time you call publish, it triggers the subscriptions that match the published event.

(wsApp, publish) <- webSocketsApp api

Why this way?

A centralized store is bad. Imagine we have a million subscriptions: on publish, the server must check every node and notify it (very slow). That is really hard to scale up.

With this method we can scale horizontally. Subscription nodes do not know each other; only the publisher node must know them in order to notify them.

  • subscription nodes do not share store.
  • they can be deployed on different servers.

The publisher server calls them with an HTTP client.

httpApp ::  ByteString -> IO ByteString
httpApp = httpPubApp api publish

-- publish takes only the event; the target URLs are listed below
publish :: e -> IO ()
publish event 
    =  traverse_   
       (publishEventOn event)  
       ["<subscription_1 server url> ","<subscription_2 server url> ",...,"<subscription_n server url> "]

publishEventOn :: e -> String ->IO ()
publishEventOn event url = runReq defaultHttpConfig $ do
  ...
  -- encode (not decode): the event is serialized into the request body
  req  POST (https url) (ReqBodyLbs (encode event))
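
The fan-out idea above can be tried without any HTTP at all. This is only a rough sketch using base: each subscription node is modelled as a plain callback, and the names Notify, publishToAll and demoFanOut are made up for illustration and are not part of Morpheus.

```haskell
import Data.Foldable (traverse_)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- A subscription node is modelled by the callback the publisher uses to
-- reach it (hypothetical; a real node would be reached over HTTP).
type Notify e = e -> IO ()

-- Fan one event out to every node the publisher knows about.
publishToAll :: [Notify e] -> e -> IO ()
publishToAll nodes event = traverse_ ($ event) nodes

-- Two toy nodes append what they receive to a shared log.
demoFanOut :: IO [String]
demoFanOut = do
  logRef <- newIORef []
  let node name event = modifyIORef' logRef (++ [name ++ " got " ++ event])
  publishToAll [node "node-1", node "node-2"] "user joined"
  readIORef logRef

main :: IO ()
main = demoFanOut >>= mapM_ putStrLn
```

The nodes never talk to each other; only the publisher holds the list, which is the property the horizontal-scaling argument relies on.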

nalchevanidze avatar Aug 18 '20 21:08 nalchevanidze

@robin-da,

or in your case.

api :: Input api -> Stream api UserEvent IO
api = interpreter gqlRoot

buildApp:: IO  (ServerApp, ByteString -> IO ByteString)
buildApp = do
  (wsApp, publish) <- webSocketsApp api
  pure (wsApp,httpPubApp api publish)

nalchevanidze avatar Aug 18 '20 21:08 nalchevanidze

Hi @nalchevanidze, I tried the recommended solution and I ended up with a subscription that never receives any events.

I agree with your argument that you don't want a centralized store because of scaling. I think the problem that I'm experiencing is that the store the httpApi publishes to is different from the store the webSocketsApi reads events from. I arrived at this conclusion when I traced the publish function and can see that it always traverses over an empty store, and I also traced the writeStore method of the Store handle and can see that events are written to a store, but probably a different one. By forcing the webSocketsApi and the httpApp to use the same store, the bug disappeared. I have yet to figure out, how they end up with two different stores.
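
To illustrate what I think is happening, here is a toy model in plain Haskell. It is not Morpheus code: toyWebSocketsApp is a made-up stand-in that, by assumption, allocates a fresh store on every call, the way I suspect webSocketsApp behaves when it is called more than once.

```haskell
import Data.Foldable (traverse_)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Hypothetical analogue of webSocketsApp: every call allocates a fresh
-- list of subscriber callbacks and returns a (subscribe, publish) pair
-- bound to that list only.
toyWebSocketsApp :: IO ((e -> IO ()) -> IO (), e -> IO ())
toyWebSocketsApp = do
  ref <- newIORef []
  let subscribe cb = modifyIORef' ref (cb :)
      publish ev = readIORef ref >>= traverse_ ($ ev)
  pure (subscribe, publish)

-- Create two instances, subscribe on both, publish on the first only.
-- Returns what each subscriber received.
demoSeparateStores :: IO ([String], [String])
demoSeparateStores = do
  inboxA <- newIORef []
  inboxB <- newIORef []
  (subscribeA, publishA) <- toyWebSocketsApp
  (subscribeB, _publishB) <- toyWebSocketsApp
  subscribeA (\ev -> modifyIORef' inboxA (ev :))
  subscribeB (\ev -> modifyIORef' inboxB (ev :))
  publishA "user joined"
  (,) <$> readIORef inboxA <*> readIORef inboxB

main :: IO ()
main = demoSeparateStores >>= print
```

The subscriber registered on the second instance never sees the event, because publishA only walks the first instance's list. That matches the symptom of publish traversing an empty store.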

drsk0 avatar Aug 19 '20 09:08 drsk0

@RobinKrom I think you initiate two different websocket apps: one that you actually use, and a second one that you pass to the httpApp. That is why the second one is never updated.

Can you show me the actual code?

nalchevanidze avatar Aug 19 '20 09:08 nalchevanidze

Here's the code with an unpatched morpheus that doesn't work. It connects to the websocket but never receives any events.

main :: IO ()
main = bracket
  initialize
  shutDown
  (quickHttpServe . site)

initialize :: IO (AcidState State)
initialize = do
  st <- openLocalState emptyState
  pure st

shutDown :: AcidState State -> IO ()
shutDown st = closeAcidState st

site :: AcidState State -> Snap ()
site st = do
  uId <- getUserId
  (wsApp, publish) <- liftIO $ webSocketsApp $ gqlApi st uId
  CORS.applyCORS CORS.defaultOptions $ route
    [ ("query", gqlQueryHandler st publish)
    , ("subscribe", gqlSubHandler st wsApp)
    ]

gqlQueryHandler :: AcidState State -> (APIEvent -> IO ()) -> Snap ()
gqlQueryHandler st publish = do
  rqBody <- readRequestBody 10000
  rq <- getRequest
  liftIO $ putStrLn $ show rq
  uId <- getUserId
  r <- liftIO $ httpPubApp (gqlApi st uId) publish rqBody
  liftIO $ putStrLn $ show r
  writeLBS r

gqlSubHandler :: AcidState State -> ServerApp -> Snap ()
gqlSubHandler st wsApp = do
  r <- getRequest
  liftIO $ putStrLn $ "Subscription request:" <> show r
  runWebSocketsSnap wsApp

gqlApi :: AcidState State -> UserId -> (Input api -> Stream api APIEvent IO)
gqlApi st uId = M.interpreter $ rootResolver st uId

drsk0 avatar Aug 19 '20 10:08 drsk0

main :: IO ()
main =
  bracket
    initialize
    shutDown
    (quickHttpServe . site)

initialize :: IO (AcidState State, ServerApp, ByteString -> IO ByteString)
initialize = do
  st <- openLocalState emptyState
  uId <- getUserId
  let api = gqlApi st uId
  (wsApp, publish) <- webSocketsApp  api
  pure (st, wsApp,  httpPubApp api publish)

shutDown :: ( AcidState State, ServerApp, ByteString -> IO ByteString)-> IO ()
shutDown (st,_,_) = closeAcidState st

site :: (AcidState State, ServerApp, ByteString -> IO ByteString) -> Snap ()
site (_, wsApp, httpApp) = do
  CORS.applyCORS CORS.defaultOptions $ route
    [ ("query", gqlQueryHandler httpApp)
    , ("subscribe", gqlSubHandler wsApp)
    ]

gqlQueryHandler :: (ByteString -> IO ByteString) -> Snap ()
gqlQueryHandler  app = do
  rqBody <- readRequestBody 10000
  rq <- getRequest
  liftIO $ putStrLn $ show rq
  r <- liftIO $ app rqBody
  liftIO $ putStrLn $ show r
  writeLBS r

gqlSubHandler :: ServerApp -> Snap ()
gqlSubHandler wsApp = do
  r <- getRequest
  liftIO $ putStrLn $ "Subscription request:" <> show r
  runWebSocketsSnap wsApp

gqlApi :: AcidState State -> UserId -> (Input api -> Stream api APIEvent IO)
gqlApi st uId = M.interpreter $ rootResolver st uId

What if you try this? I think bracket initiates them in different threads.

nalchevanidze avatar Aug 19 '20 10:08 nalchevanidze

I tried your code with some modifications:

main :: IO ()
main =
  bracket
    initialize
    shutDown
    (quickHttpServe . site)

initialize :: IO (AcidState State, ServerApp, BS.ByteString -> IO BS.ByteString)
initialize = do
  st <- openLocalState emptyState
  let uId = "this is a problem: can't use snap here!!!" -- <- getUserId
  (wsApp, publish) <- webSocketsApp (gqlApi st uId)
  pure (st, wsApp,  httpPubApp (gqlApi st uId) publish)

shutDown :: ( AcidState State, ServerApp, BS.ByteString -> IO BS.ByteString)-> IO ()
shutDown (st,_,_) = closeAcidState st

site :: (AcidState State, ServerApp, BS.ByteString -> IO BS.ByteString) -> Snap ()
site (st, wsApp,httpApp) = do
  CORS.applyCORS CORS.defaultOptions $ route
    [ ("query", gqlQueryHandler httpApp)
    , ("subscribe", gqlSubHandler st wsApp)
    ]

gqlQueryHandler :: (BS.ByteString -> IO BS.ByteString) -> Snap ()
gqlQueryHandler app = do
  rqBody <- readRequestBody 10000
  rq <- getRequest
  liftIO $ putStrLn $ show rq
  r <- liftIO $ app $ BSL.toStrict rqBody
  liftIO $ putStrLn $ show r
  writeBS r

gqlSubHandler :: AcidState state -> ServerApp -> Snap ()
gqlSubHandler st wsApp = do
  r <- getRequest
  liftIO $ putStrLn $ "Subscription request:" <> show r
  runWebSocketsSnap wsApp

This does work, yay! However, there are two major problems with this API that I hope we can change:

  1. this is very subtle. If you're not running a single-threaded application, you easily hit this problem.
  2. because you need to run webSocketsApp in the initialization now, you're outside of the webserver monad and it's impossible to parse any headers of incoming requests (for example for authentication) that your API depends on.

My suggestion would still be to move any initialization of state outside the webSocketsApp function and make it explicit. This doesn't force you to use a centralized store, you can change the api such that you can have different stores for each subscription node and publish to many stores from one node. This way you let the user decide what kind of topology they would like to use to inform their subscription nodes and how to scale up.
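
As a rough sketch of the shape I have in mind (a toy model in plain Haskell, not a proposal for the exact Morpheus API; ToyStore, newToyStore, subscribeTo, publishTo and the two handlers are all made-up names): the store is created once, explicitly, and then handed to per-request handlers that can still take request-specific data such as a user id.

```haskell
import Data.Foldable (traverse_)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Toy store: a mutable list of subscriber callbacks (hypothetical).
newtype ToyStore e = ToyStore (IORef [e -> IO ()])

-- Created once, outside any request handler.
newToyStore :: IO (ToyStore e)
newToyStore = ToyStore <$> newIORef []

subscribeTo :: ToyStore e -> (e -> IO ()) -> IO ()
subscribeTo (ToyStore ref) cb = modifyIORef' ref (cb :)

publishTo :: ToyStore e -> e -> IO ()
publishTo (ToyStore ref) ev = readIORef ref >>= traverse_ ($ ev)

-- Per-request handlers receive the shared store plus request data
-- (here a user id that would come from parsed headers).
handleSubscribe :: ToyStore String -> String -> IORef [String] -> IO ()
handleSubscribe store userId inbox =
  subscribeTo store (\ev -> modifyIORef' inbox ((userId ++ ": " ++ ev) :))

handleMutation :: ToyStore String -> String -> IO ()
handleMutation store userId = publishTo store (userId ++ " joined")

demoSharedStore :: IO [String]
demoSharedStore = do
  store <- newToyStore
  inbox <- newIORef []
  handleSubscribe store "alice" inbox
  handleMutation store "bob"
  readIORef inbox

main :: IO ()
main = demoSharedStore >>= mapM_ putStrLn
```

Nothing here forces a single central store: a publisher could hold several ToyStore values and publish to each of them.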

But maybe you have a better suggestion?

drsk0 avatar Aug 19 '20 11:08 drsk0

My API also depends on some headers (an Authorization token) that I need to parse within the ActionM monad from Scotty. That's why I didn't use httpPubApp, since I wouldn't be able to access the headers there. I don't know how you would change that, though, since I'm not familiar with the internals.

dataO1 avatar Aug 19 '20 13:08 dataO1

@RobinKrom Please create a small GitHub example project that I can check out (to debug Morpheus).

nalchevanidze avatar Aug 25 '20 14:08 nalchevanidze

One suggestion from me is to use a custom monad instead of passing arguments to Morpheus GraphQL.

e.g.

type MyMonad = ReaderT (AcidState State) Snap

main :: IO ()
main = bracket
  initialize
  shutDown
  (quickHttpServe . runReaderT site)

initialize :: IO (AcidState State)
initialize = openLocalState emptyState

shutDown :: AcidState State -> IO ()
shutDown = closeAcidState 

site ::  MyMonad ()
site = do
  (wsApp, publish) <- webSocketsApp gqlApp
  CORS.applyCORS CORS.defaultOptions $ route
    [ ("query", gqlQueryHandler [publish] )
    , ("subscribe", gqlSubHandler  wsApp)
    ]

gqlQueryHandler :: [APIEvent -> MyMonad ()] -> MyMonad ()
gqlQueryHandler  publish = do
  rqBody <- readRequestBody 10000
  rq <- getRequest
  liftIO $ putStrLn $ show rq
  r <- httpPubApp publish gqlApp rqBody
  liftIO $ putStrLn $ show r
  writeLBS r

gqlSubHandler ::  ServerApp -> MyMonad ()
gqlSubHandler wsApp = do
  r <- getRequest
  liftIO $ putStrLn $ "Subscription request:" <> show r
  runWebSocketsSnap wsApp

gqlApp :: App APIEvent MyMonad
gqlApp = deriveApp rootResolver 
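
For readers unfamiliar with the pattern, here is a minimal standalone sketch of the ReaderT idea, independent of Snap and Morpheus. It assumes only base and the transformers package; AppM, bump, current and demoReader are illustrative names, and an IORef Int stands in for the AcidState environment.

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Reader (ReaderT, ask, runReaderT)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- The shared environment is carried by the monad instead of being
-- passed to every function as an argument.
type AppM = ReaderT (IORef Int) IO

-- Increment the shared counter.
bump :: AppM ()
bump = do
  ref <- ask
  liftIO (modifyIORef' ref (+ 1))

-- Read the shared counter.
current :: AppM Int
current = ask >>= liftIO . readIORef

-- Supply the environment once, at the edge of the program.
demoReader :: IO Int
demoReader = do
  ref <- newIORef 0
  runReaderT (bump >> bump >> current) ref

main :: IO ()
main = demoReader >>= print
```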

nalchevanidze avatar Aug 26 '20 08:08 nalchevanidze

I think using a monad stack here is a matter of taste, it doesn't solve any of the two problems listed above. I'll see if I can create a small example project to experiment over the weekend, but it essentially will be the above code.

drsk0 avatar Aug 28 '20 09:08 drsk0

@nalchevanidze , it took me some time but I finally created a smallish test project for you to experiment with this issue: https://github.com/RobinKrom/morpheus-subscription-example. If you have nix and direnv you should be able to just cd into the directory and run ghci. If not, you should be able to build via cabal. It basically is the above example using Snap and an IORef to store the state.

drsk0 avatar Sep 15 '20 20:09 drsk0

Hi @drsk0, sorry for not responding for a long time. Could you please recreate the case, ideally in morpheus-graphql, so I can check it whenever I can and could also automate testing?

nalchevanidze avatar Oct 15 '22 10:10 nalchevanidze

Hi @drsk0, I will close it now; you can reopen it with an example file.

nalchevanidze avatar Nov 08 '22 21:11 nalchevanidze