distributed-process-platform
GenServer
See issue #1 and https://gist.github.com/4025934
Continuing over here since it is purely genserver related...
How do I register a process name given I have a SendPort? Is there an easy way to extract the ProcessId from it?
I also could not find a timeout version of receiveChan.
How do I register a process name given I have a SendPort? Is there an easy way to extract the ProcessId from it?
Yes, sendPortProcessId . sendPortId
I also could not find a timeout version of receiveChan.
This is available in HEAD.
I am not 100% sure whether this is an issue with the prototype code I have or an issue with spawnChannelLocal. Basically, spawnChannelLocal returns a SendPort a, but spawnChannelLocal is called from GenServer, which is a step removed from the concrete types in Counter. Is this something we will have to defer to the Server callbacks too to make sure it is fully typed?
This is the compilation error in line 132 of GenServer.hs (https://github.com/rodlogic/distributed-process-platform/blob/master/src/Control/Distributed/Platform/GenServer.hs).
Couldn't match type `rq' with `(ProcessId, rq)'
  `rq' is a rigid type variable bound by
       the type signature for
         serverStart :: (Serializable rq, Serializable rs) =>
                        Name -> Process (Server rq rs) -> Process (SendPort rq)
       at src/Control/Distributed/Platform/GenServer.hs:96:1
Expected type: SendPort rq
  Actual type: SendPort (ProcessId, rq)
In the first argument of `return', namely `sreq'
In a stmt of a 'do' block: return sreq
@hyperthunk I am committing my experiments in the above fork, btw. Still putting my fingers on the different areas of distributed-process.
@edsko Is there a way to register the SendPort by name instead? If the server is using channels, the pid is a bit useless.
@edsko Is there a way to register the SendPort by name instead? If the server is using channels, the pid is a bit useless.
No. You could add your own registry for sendports if you wish.
Is this something we will have to defer to the Server callbacks too to make sure it is fully typed?
No, that's not the issue; the logic in your code doesn't quite add up. On line 123 you ask for a message from the channel that contains both the request and a process ID, and yet you claim in the type of the function that the channel only carries requests.
@rodlogic - I like these ideas and think we should start to collaboratively work on GenServer now. I have some feedback I'd like to make so once you've got this compiling, please send me a pull request so I can comment on individual lines of code in the diff. Once we're merged I'll start writing some HUnit tests for it and we can split up some more fine grained tasks to work on individually.
I quite like the setup you've got now, bar some stylistic questions, especially the use of temporary channels which is really nice. Most of the things I'd like to suggest are renaming, and a bit of splitting up 'where' into separate functions. Some things we can add quite quickly once we've merged this (apart from tests!) include:
Tagging internal messages
OTP's gen_server differentiates between messages sent via call/cast and 'info' messages by sending the former in a tagged tuple {'$gen', Msg} and we can do the same easily. That way the server 'loop' can use receiveWait to drain the message queue and match on these easily, with something like
type ReplyTo = ReceivePort
data CallRequest = CallReq Serializable ReplyTo
data CastRequest = CastReq Serializable
and instead of expect we can use
receiveWait [ match (\(CallReq m replyTo) -> handleCall server m replyTo)
            , match (\(CastReq m) -> handleCast server m)
            , match (\(Serializable m) -> handleInfo server m)
            ]
When the callbacks return a Timeout in the reply/noreply data, we can defer to receiveTimeout instead.
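To make the tagging idea concrete, here is a minimal sketch that uses only base and does not touch distributed-process. A raw mailbox message is modelled as a Dynamic, and the names route, Routed and the String payloads are assumptions made for illustration. It shows only the routing decision: tagged calls and casts are recognised by their wrapper type, and everything else falls through to the 'info' path.

```haskell
import Control.Applicative ((<|>))
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Maybe (fromMaybe)

-- Hypothetical wrappers that tag internal traffic, in the spirit of OTP's
-- '$gen' tuples. Payloads are plain Strings to keep the sketch small.
data CallReq = CallReq String String   -- payload, name of the caller to reply to
data CastReq = CastReq String          -- payload only, no reply expected

-- Which callback a raw mailbox message would be routed to.
data Routed
  = ToCall String String
  | ToCast String
  | ToInfo
  deriving (Eq, Show)

-- Try each tagged shape in order; anything untagged becomes an 'info' message.
route :: Dynamic -> Routed
route d = fromMaybe ToInfo (asCall <|> asCast)
  where
    asCall = (\(CallReq m r) -> ToCall m r) <$> fromDynamic d
    asCast = (\(CastReq m) -> ToCast m) <$> fromDynamic d
```

A client would wrap its request with toDyn (CallReq ...) before sending; a monitor notification or any other untagged value ends up as ToInfo without the server having to know its type.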
An explicit asynchronous call API
As well as providing a call implementation, we might consider beginCall and endCall as well, with beginCall returning the ReceivePort and endCall doing the blocking receive (with an optional timeout). That way clients can choose to defer getting the result until they want it, whilst call can be defined in terms of endCall . beginCall or some such with a bit of glue around it.
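A rough sketch of how beginCall and endCall could fit together, written against plain IO with an MVar standing in for the reply's ReceivePort. Server, Pending and slowEcho are made-up names for this illustration, and the timeout is in microseconds because that is what System.Timeout uses.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar)
import System.Timeout (timeout)

-- A hypothetical stand-in for a server: any action that computes a reply.
type Server req rep = req -> IO rep

-- Handle to a reply that may not have arrived yet (the ReceivePort's role).
newtype Pending rep = Pending (MVar rep)

-- Fire the request and return immediately with a handle to the future reply.
beginCall :: Server req rep -> req -> IO (Pending rep)
beginCall server req = do
  box <- newEmptyMVar
  _ <- forkIO (server req >>= putMVar box)
  return (Pending box)

-- Block for the reply; Nothing means the optional timeout (microseconds) expired.
endCall :: Maybe Int -> Pending rep -> IO (Maybe rep)
endCall Nothing   (Pending box) = Just <$> readMVar box
endCall (Just us) (Pending box) = timeout us (readMVar box)

-- The synchronous call is just the two halves glued together.
call :: Server req rep -> req -> IO (Maybe rep)
call server req = beginCall server req >>= endCall Nothing

-- Example server: replies with the given message after a delay in microseconds.
slowEcho :: Server (Int, String) String
slowEcho (us, msg) = threadDelay us >> return msg
```

A client that wants to overlap work would call beginCall, carry on, and only later pass the handle to endCall with whatever timeout suits it.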
@hyperthunk Sounds good. Let me get it into a running state again and then I'll send a pull request. I have paid no attention to the code structure so far and was more focused on the actual call interaction with expect/channels/etc. I am/was expecting we'll iterate this and refactor the code as necessary.
Tagging internal messages
If we go the typed-channels route, which imo we should unless we have specific reason not to, we will have to use receiveChan instead of expect/receiveWait and to handle multiple types of messages we will have to "merge" the different ports with mergePort{RR,Biased}. Aside from the order in which the ports are selected, I am assuming that the underlying semantics is the same as receiveWait's with multiple matches. Is that so?
Now, what about info messages? They seem to be used for 'all the other kinds of messages', but what kind of messages are expected there in Erlang? The gen_server.erl has simple examples like:
handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
And from the documentation:
If the gen_server should be able to receive other messages than requests, the callback function handle_info(Info, State) must be implemented to handle them. Examples of other messages are exit messages, if the gen_server is linked to other processes (than the supervisor) and trapping exit signals.
handle_info({'EXIT', Pid, Reason}, State) ->
    ..code to handle exits here..
    {noreply, State1}.
Are there other specific examples of how info messages are used in Erlang?
An explicit asynchronous call API
Definitely. In the Java world it is common to see APIs like:
public interface Counter {
    int count();
    Future<Integer> countAsync();
}
And the Future interface, which I think is more or less what you are referring to, with the difference that it hides the internals from the client process:
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
I am sure Haskell has similar patterns. In its simplest form, this Future value (the reply) will completely wrap the reply's receive port, hiding it from the caller. The only limitation here is that it won't be possible to send future values to other processes, but unless we have a clear and valuable use case it is not worth solving that.
Another point for us to consider, somewhat related to the async API for calls, is the handling of deferred replies by the server process, which in Erlang I think happens when you return {noreply, State} from the handle_call callback. It would be nice to have a dual of Future that would represent the promise to send a typed reply back, by any process and only once, since there is a caller waiting for this reply. The server process's implementation could decide to send this promise to another process, which would then finally reply to the caller. For instance, instead of something like:
handleCall :: req -> Process (CallResult reply)
it would look like:
handleCall :: req -> Promise reply -> Process (CallResult)
where Promise would be a simple function/API such as:
serverReply :: Promise reply -> reply -> Process ()
The server implementation would have the option of simply calling serverReply and returning CallOk, or sending the Promise to another process and returning CallDeferred. The promise wrapper will guarantee that serverReply can be called only once.
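One way the "only once" guarantee could be enforced is with a write-once cell. The sketch below uses an MVar in plain IO rather than the Process monad, and it departs from the signature above in one respect that is purely an assumption of this sketch: serverReply returns a Bool so that a second reply is reported instead of being silently dropped. newPromise, awaitReply and handleCount are illustrative names.

```haskell
import Control.Concurrent.MVar (MVar, newEmptyMVar, readMVar, tryPutMVar)

-- Hypothetical write-once handle the server uses to answer a waiting caller.
newtype Promise reply = Promise (MVar reply)

-- What the handler tells the server loop: it replied now, or will reply later.
data CallResult = CallOk | CallDeferred
  deriving (Eq, Show)

newPromise :: IO (Promise reply)
newPromise = Promise <$> newEmptyMVar

-- Deliver the reply. Returns False if a reply was already sent, so the first
-- reply is never overwritten.
serverReply :: Promise reply -> reply -> IO Bool
serverReply (Promise box) = tryPutMVar box

-- Caller side: block until the single reply arrives (the value stays in place).
awaitReply :: Promise reply -> IO reply
awaitReply (Promise box) = readMVar box

-- A handler that replies on the spot and reports CallOk.
handleCount :: () -> Promise Int -> IO CallResult
handleCount () p = serverReply p 42 >> return CallOk
```

A handler that wants to defer would instead hand the Promise to another thread or process and return CallDeferred; whoever calls serverReply first wins.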
Created a pull request: https://github.com/hyperthunk/distributed-process-platform/pull/5
@rodlogic good, I think we're on the same track here. Some thoughts...
If we go the typed-channels route, which imo we should unless we have specific reason not to, we will have to use receiveChan instead of expect/receiveWait and to handle multiple types of messages we will have to "merge" the different ports with mergePort{RR,Biased}. Aside from the order in which the ports are selected, I am assuming that the underlying semantics is the same as receiveWait's with multiple matches. Is that so?
I do like the typed channels route, but it kind of kills off the handle_info concept, which is for handling 'unexpected' messages or (in other words) messages that aren't a primary part of this particular server's protocol, but are possible such as system messages (for dynamic debugging) and monitor signals and so on.
Also, we can't monitor other processes very easily in this way, because monitor signals come via expect/receiveWait rather than on channels. Basically the typical use of handle_info in OTP is to deal with processes that are interested in other processes, though perhaps not supervising them. Consider this example from RabbitMQ:
handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
            State = #state{monitors = Monitors}) ->
    rabbit_log:info("rabbit on node ~p down~n", [Node]),
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
    ok = handle_dead_rabbit(Node),
    {noreply, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)}};
This is a classic example of receiving a monitor notification and doing some hidden internal state change in response without replying.
Personally, I think we should actually have two kinds of gen-server: one using channels and one using bare messages. They can probably share a lot of infrastructure, and you'll pick the bare messages one only if you need to do things like monitor and/or link handling.
Killing Processes (also needed for supervision)
Unless I'm really missing something, there doesn't appear to be a counterpart to erlang:exit(Pid, Term) in the API, which means that in order to kill a process you have to dream up some protocol to allow (for example) supervisors to instruct their children to terminate and brutally kill them if they don't respond in a timely fashion by exiting.
One way to do this is to pass some Exit reason structure to the process, another approach would be to spawn a 'live' process to which all the children link, and then kill it when you need to terminate them abruptly. Anyway, we need to be able to kill processes in various ways.
The point is partly that we should prioritize 'shutdown' messages. If the Channel based GenServer is going to support this, then we need to check we understand the semantics of mergePort_ properly.
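To pin down what "prioritize shutdown" would mean, here is a pure model of a left-biased receive over two lanes. It assumes that a biased merge always prefers the earliest port that has a message waiting, which is exactly the property that would need confirming against the real mergePort functions. Mailbox, receiveBiased and drain are names invented for this sketch.

```haskell
import Data.List (unfoldr)

-- A mailbox with a hypothetical high-priority lane for shutdown requests.
data Mailbox ctl msg = Mailbox { control :: [ctl], normal :: [msg] }

-- Left-biased receive: take from the control lane whenever it is non-empty,
-- and only then look at normal traffic.
receiveBiased :: Mailbox ctl msg -> Maybe (Either ctl msg, Mailbox ctl msg)
receiveBiased (Mailbox (c : cs) ms) = Just (Left c, Mailbox cs ms)
receiveBiased (Mailbox [] (m : ms)) = Just (Right m, Mailbox [] ms)
receiveBiased (Mailbox [] [])       = Nothing

-- The order in which a server loop would see the queued messages.
drain :: Mailbox ctl msg -> [Either ctl msg]
drain = unfoldr receiveBiased
```

Under a round-robin merge the shutdown request could instead be seen after one or more ordinary messages, which is the behaviour a supervisor would want to rule out.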
Futures
I like the things you've said about this: There is a similar set of concepts in the .NET world (BeginInvoke, EndInvoke).
Another point for us to consider, somewhat related to the async API for calls, is the handling of deferred replies by the server process, which in Erlang I think happens when you return {noreply, State} from the handle_call callback. It would be nice to have a dual of Future that would represent the promise to send a typed reply back, by any process and only once, since there is a caller waiting for this reply. The server process's implementation could decide to send this promise to another process, which would then finally reply to the caller. [snip]
Yes the 'reply later' option is very useful. I do wonder whether this will work more cleanly for a Channels based server API wise though.
Another thing Promises/Futures enable is the ability to construct higher order channels, dealing with things like delegation and proxying, which is very very useful (and important!) in building fault tolerant systems. One of the key things about OTP's gen_server is that the implementation is no longer in control of its own mailbox. This can be vitally important. Consider this wrapping code, for example, which deals with the common situation where you need fine grained control over a process that might get into a blocking state very easily - in this case we're managing an external resource (like an OS process) that could take forever to respond, but we must be able to interact with the wrapper in a timely fashion if we need to shut it down: https://github.com/nebularis/systest/blob/resource2/src/systest_resource.erl
Insulating yourself and providing middle-man processes to make sure that you can avoid deadlocks is vital in a complex actor based system, and I suspect doing that purely with Channels might be hard, though I'm willing to be persuaded otherwise.
Created a pull request: hyperthunk/distributed-process-platform#5
@rodlogic thanks that's perfect - I'll try and give it proper attention over the next day or so and hopefully we can get it merged over the weekend.
Now that we have some sort of a starting point, I was pondering a bit about the direction we are taking here and wondering if the GenServer design could be even simpler by leveraging more of Haskell's strengths.
Barring some of the limitations of the current design, which we will fix over many iterations, can we simplify even more what someone building a server has to implement? Can we make this 'design surface area' even smaller? I think it should be dead simple to create a server and the type system should keep me honest even if that means a bit more complexity under the covers.
What is a server, really? A server is nothing more than a Process with an id and one or more channels of interaction (in a more abstract sense). These channels are either a call (request/response) or a cast (one-way) and a client may choose to use a call channel synchronously or asynchronously, and a server may choose to 'reply later', delegate the reply to another server/process, or just reply right there. I am ignoring info messages here.
So what could be better in the "Counter" example from the point of view of who is designing and implementing it? Having to implement CounterRequest and CounterResponse smells like cruft to me and somewhat error prone. It would be nice if I could design Counter with mostly functions and have the rest inferred automatically somehow. For instance, the count service in the Counter server could be just:
count :: () -> Int
This is already specifying that there is a Request () and a Response Int. Could this Request and Response message be automatically generated for us? Isn't this what Closure in CloudHaskell is proposing/implementing?
In addition, it would be great if the following could also be automatically derived from the above definition (and implementation):
countAsync :: () -> Future Int
And something like the following for the handler type:
countHandler :: () -> Promise Int -> Maybe Int
@rodlogic I like these ideas in general, though obviously we'll need to understand what the plumbing looks like before we can generate any of it. :)
As the input to any GenServer callback function must be an instance of Typeable, we can presumably use {g}cast to differentiate between the messages passed at runtime and possibly, using funResultTy, match the possibilities against the supplied callbacks, leaving the implementor to simply list the functions they want to expose as callbacks. With a bit of magic (a la Template Haskell or some such) we can probably just grab everything the callback module is exporting.
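As a small illustration of the cast half of that idea (funResultTy and Template Haskell are left out), the sketch below tries an incoming value against each input type the server knows about. Increment, Add and applyTo are hypothetical names; only Data.Typeable's cast is real API here.

```haskell
import Data.Typeable (Typeable, cast)

-- Hypothetical message types a counter server might expose callbacks for.
data Increment = Increment
newtype Add = Add Int

-- Try the incoming value against each known input type in turn.
-- 'cast' succeeds only when the runtime type matches exactly.
applyTo :: Typeable m => m -> Int -> Maybe Int
applyTo msg n =
  case (cast msg :: Maybe Increment, cast msg :: Maybe Add) of
    (Just Increment, _) -> Just (n + 1)
    (_, Just (Add k))   -> Just (n + k)
    _                   -> Nothing   -- no callback accepts this type
```

The caller never names the message type; the price is that an unsupported message is only discovered at run time, as the Nothing case.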
And yes, the return type should tell us whether we're dealing with a future/promise or an immediate reply. Of course there are servers that never reply, dealing only with casts and casts in general would need to be handled a little differently, but some indication that we're not replying shouldn't be too hard to dream up.
Of course we always need to deal with timeouts as well, so even if we do avoid making the implementation return a data type indicating whether they're replying or not, we will need to somehow deal with timeouts, hibernation and so on. Ideally we should be able to do this with a type, though I'm struggling to see how we'd wrap the return type (whether it is Int, Future Int or Promise Int) without forcing the author to do something in their code.
I suppose, and I shudder to suggest this, that we can have timeout/hibernate throw an exception containing the required span, but that feels really clunky and I'm sure there's a better way.
One thing though:
What is a server, really? A server is nothing more than a Process with an id and one or more channels of interaction (in a more abstract sense). These channels are either a call (request/response) or a cast (one-way) and a client may choose to use a call channel synchronously or asynchronously, and a server may choose to 'reply later', delegate the reply to another server/process, or just reply right there. I am ignoring info messages here.
Yes, and I am holding to my point about info messages, which is that we probably need a non-channel based option for people who do care about info messages coming from monitors.
Guys, I'm getting a bit stuck with this. I do not want to specify the exact input type for my gen server, as this means that it can only deal with one kind of input message. Honestly, what use is that for writing a supervisor, that needs to handle the following instructions:
- Monitor notifications from the children
- add_child
- delete_child
- restart_child
- stop_children
So how can we write a generic process that accepts all these different types of input messages and handles them uniformly by evaluating a callback function? The callback can be written in terms of generic inputs, for example
handleCall :: (Serializable m) => m -> ProcessId -> Process ()
We need to handle these 5 different input types now, so how does a record with 1 input type help us at all? As I've said in distributed-process issue 71 this seems to completely defeat the purpose of having expect and receiveWait defined in such a way that we can receive anything because we can't use the messages without fully specifying the types we're dealing with.
Now I completely understand why this is the case - how on earth is the type system supposed to guess what we mean if we don't specify the types? I don't know if opening up AbstractMessage as I've suggested in that (previously mentioned) issue is the right thing to do or not. It feels to me like we're dealing with chalk and cheese here - the idea of gen server which deals with a totally open domain of input types just doesn't fit Haskell's type system, which is exactly what I felt to begin with.
Having a gen server that accepts just one type of input is fine, if all you want to do is centralize the error handling, timeouts and so on. If that's what we want to do, then the gen server infrastructure might be overkill.
How I arrived at this....
What I tried (over the weekend) to get this working was several things. First of all, I tried to stop limiting the input domain of the handleX functions to a specific type. As long as we don't mind not being able to use record accessors we can match out the handleX functions we need...
-- we can *try* using existential types for this....
data Server s = forall m. (Serializable m) => Server {
    init       :: .....
  , handleCall :: m -> s -> ProcessAction s
  , state      :: s
  }
handleRequest :: (Serializable m) => Server s -> m -> ProcessAction s
handleRequest Server{ handleCall = hc } = hc
But is Serializable specific enough to use in receiveWait and can it be applied to data taken out of expect using an m <- expect expression? I tried numerous approaches here, including attempting to encode the input messages in various forms of other types.
So @edsko my question is, taking the following, which compiles (therefore it must work!) ;) are we on the right track and is there some type trickery I'm missing to use the record instead of the type class, because that completely bombed out for me, and what's the neat way of dealing with the Maybe s that receiveTimeout returns in our handleRequest implementation?
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
module Control.Distributed.Platform.GenProcess where

import Prelude hiding (catch, init)
import Control.Distributed.Process
import Control.Distributed.Process.Serializable
import Control.Monad (forever)
import Control.Concurrent
import Data.Typeable (Typeable)
import Data.Binary
import Data.DeriveTH

type Name = String

data TimeUnit = Hours | Minutes | Seconds | Millis
data Timeout = Timeout TimeUnit Int | Infinity

data ProcessAction =
    ProcessContinue
  | ProcessTimeout Timeout
  | ProcessStop String

data GenMessage m where
  Message :: (Typeable m) => GenMessage (m, ReplyTo)
  deriving (Typeable)

data ReplyTo = ReplyTo ProcessId | None
  deriving (Typeable, Show)

data Gen = Call | Cast
  deriving (Typeable, Show)

$(derive makeBinary ''ReplyTo)
$(derive makeBinary ''Gen)

class (Typeable s) => GenProcess s where
  init       :: Process () -> a -> s
  state      :: Process (s) -> s
  handleCall :: (Serializable m) => s -> m -> ReplyTo -> (ProcessAction, s)

serverContinue :: (GenProcess s) => s -> Timeout -> Process (s)
serverContinue s t = do
    case t of
      (Infinity) -> receiveWait handlers
      -- (Timeout u v) -> receiveTimeout (timeToMs u v) handlers
  where handlers = [ (match (\(Call, m, r) -> handleRequest s m r)) ]

handleRequest :: (GenProcess s, Serializable m) =>
                 s -> m -> ReplyTo -> Process (s)
handleRequest s m r = do
  let (action, s2) = handleCall s m r
  case action of
    ProcessContinue    -> serverContinue s2 Infinity
    (ProcessTimeout t) -> serverContinue s2 t

timeToMs :: TimeUnit -> Int -> Int
timeToMs Millis  ms  = ms
timeToMs Seconds sec = sec * 1000
timeToMs Minutes min = (min * 60) * 1000

reply :: (Serializable m) => ReplyTo -> m -> Process ()
reply (ReplyTo pid) m = send pid m
reply _ _ = return ()

replyVia :: (Serializable m) => SendPort m -> m -> Process ()
replyVia p m = sendChan p m
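One small gap in the listing above: timeToMs has no clause for Hours, so calling it with that constructor would fail at run time with a pattern-match error. A total version might look like the following sketch. It is also worth checking which unit receiveTimeout actually expects before passing the result along.

```haskell
data TimeUnit = Hours | Minutes | Seconds | Millis
  deriving (Eq, Show)

-- Convert a span to milliseconds, covering every constructor of TimeUnit.
-- Larger units are expressed in terms of the next smaller one.
timeToMs :: TimeUnit -> Int -> Int
timeToMs Millis  ms   = ms
timeToMs Seconds sec  = sec * 1000
timeToMs Minutes mins = timeToMs Seconds (mins * 60)
timeToMs Hours   hrs  = timeToMs Minutes (hrs * 60)
```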
So @edsko my question is, taking the following, which compiles (therefore it must work!)
Urgh - I take that back, I just hadn't reconfigured recently enough.
src/Control/Distributed/Platform/GenProcess.hs:53:47:
    Ambiguous type variable `m0' in the constraints:
      (Typeable m0)
        arising from a use of `handleRequest'
        at src/Control/Distributed/Platform/GenProcess.hs:53:47-59
      (Binary m0)
        arising from a use of `handleRequest'
        at src/Control/Distributed/Platform/GenProcess.hs:53:47-59
    Probable fix: add a type signature that fixes these type variable(s)
    In the expression: handleRequest s m r
    In the first argument of `match', namely
      `(\ (Call, m, r) -> handleRequest s m r)'
    In the expression: (match (\ (Call, m, r) -> handleRequest s m r))
So how do I handle messages in the general case? I can be more specific by using an existential type, but it's still too ambiguous:
type GenMessage = forall m. (Typeable m, Serializable m) => m
-- snip
serverContinue :: (GenProcess s) => s -> Timeout -> Process (s)
serverContinue s t = do
    case t of
      (Infinity) -> receiveWait handlers
      -- (Timeout u v) -> receiveTimeout (timeToMs u v) handlers
  where handlers = [ (match (\(Call, m, r) -> handleRequest s m r)) ]

handleRequest :: (GenProcess s) =>
                 s -> GenMessage -> ReplyTo -> Process (s)
handleRequest s m r = do
  let (action, s2) = handleCall s m r
  case action of
    ProcessContinue    -> serverContinue s2 Infinity
    (ProcessTimeout t) -> serverContinue s2 t
will still yield the same error. So do we actually need an API on AbstractMessage in order to do this then? Or is there some Typeable magic I can do here?
Interestingly, I wonder if instead of trying to define handleCall just once for the gen server, we should take the gen server state's type and a list of possible handlers, each being a pure function mapping from some domain of inputs to a ProcessAction state instead. That would alleviate the problem of knowing a single handleCall input type and we should be able, in theory, to pass these to receiveWait verbatim and get back the ProcessAction s for further inspection.
@hyperthunk I have been hitting similar issues trying to improve GenServer in different ways:
- Generic dispatching of messages
- N message types per server
- Non-blocking client API (Future/etc).
So how can we write a generic process that accepts all these different types of input messages and handles them uniformly by evaluating a callback function?
The simplest answer is to force servers to use a sum type, e.g.:
data SupervisionRequest
  = AddChild
  | DeleteChild
  | RestartChild
  | StopChildren
However, this will get cumbersome very quickly and is not really a solution considering the different types of messages a single server may need to handle. Besides, this approach would mean that there would be no way to reuse protocols across servers without writing wrapper types. I think we are on the same page here: GenServer has to support dispatching any number of different message types.
Now I completely understand why this is the case - how on earth is the type system supposed to guess what we mean if we don't specify the types? I don't know if opening up AbstractMessage as I've suggested in that (previously mentioned) issue is the right thing to do or not. It feels to me like we're dealing with chalk and cheese here - the idea of gen server which deals with a totally open domain of input types just doesn't fit Haskell's type system, which is exactly what I felt to begin with.
I am also struggling with getting the dynamic dispatching right. However, I don't think this has anything to do with Haskell's type system, but with our knowledge (or lack) of it. I can vaguely conceive a solution for this but I just can't get it to work properly. We need to construct a list of handlers that encapsulate the message type using an existential and then have a function associated with each handler that returns a Match type, which is what we need for receiveWait/receiveTimeout (note that the match primitive is also using an existential and hiding the original type from receiveWait/receiveTimeout). The key thing is that it is each handler in the list that needs to return this Match type, since only the individual handler knows its type hidden by the existential. That is the gist of what I am pursuing right now.
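A sketch of that handler-list idea, modelled with base only. Dynamic stands in for a raw mailbox message and the Match synonym stands in for distributed-process's Match type, so this illustrates the shape of the solution rather than the real API. Handler, toMatch, dispatch and the counter messages are names chosen for the example.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Maybe (listToMaybe, mapMaybe)
import Data.Typeable (Typeable)

-- What a handler asks the server loop to do next (names follow the thread).
data ProcessAction = ProcessContinue | ProcessStop String
  deriving (Eq, Show)

-- A handler hides its own message type 'm' behind an existential.
data Handler s = forall m. Typeable m => Handler (s -> m -> (ProcessAction, s))

-- Stand-in for Match: test a raw message and, if it fits, run the handler.
type Match s = Dynamic -> Maybe (ProcessAction, s)

-- Only here, with 'm' in scope from the pattern match, can the handler be
-- turned into a Match. The caller never learns what 'm' was.
toMatch :: s -> Handler s -> Match s
toMatch s (Handler f) raw = f s <$> fromDynamic raw

-- Stand-in for receiveWait: the first handler whose type matches wins.
dispatch :: [Handler s] -> s -> Dynamic -> Maybe (ProcessAction, s)
dispatch hs s raw = listToMaybe (mapMaybe (\h -> toMatch s h raw) hs)

-- Two unrelated message types handled by one server with an Int state.
data Increment = Increment
data Stop = Stop

counterHandlers :: [Handler Int]
counterHandlers =
  [ Handler (\n Increment -> (ProcessContinue, n + 1))
  , Handler (\n Stop -> (ProcessStop "requested", n))
  ]
```

The server loop only ever sees ProcessAction and the new state, so the looping and timeout logic can be written once, outside the handlers.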
But is Serializable specific enough to use in receiveWait and can it be applied to data taken out of expect using an m <- expect expression? I tried numerous approaches here, including attempting to encode the input messages in various forms of other types.
Serializable is also Binary and Typeable. Afaik, CloudHaskell's marshalling generates a fingerprint of the message type using Typeable, and this fingerprint is then compared when unmarshalling the message on the other end. So, as long as CloudHaskell can match the fingerprint of the receiving type against the fingerprint that was unmarshalled, the receive/expect should work fine.
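The fingerprint check described above can be modelled in a few lines of base. This is a model of the behaviour as described in this thread, not CloudHaskell's actual wire format: show and readMaybe stand in for Binary, and Wire, encode and decode are names made up for the sketch.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Data.Proxy (Proxy (..))
import Data.Typeable (Typeable, typeOf, typeRep, typeRepFingerprint)
import GHC.Fingerprint (Fingerprint)
import Text.Read (readMaybe)

-- A wire message: the sender's type fingerprint plus an encoded payload.
data Wire = Wire Fingerprint String

-- Tag the payload with the fingerprint of the value's type.
encode :: (Typeable a, Show a) => a -> Wire
encode x = Wire (typeRepFingerprint (typeOf x)) (show x)

-- Decoding succeeds only if the receiver asks for the type that was sent,
-- even when the payload would otherwise parse at the requested type.
decode :: forall a. (Typeable a, Read a) => Wire -> Maybe a
decode (Wire fp payload)
  | fp == typeRepFingerprint (typeRep (Proxy :: Proxy a)) = readMaybe payload
  | otherwise = Nothing
```

Sending an Int and asking for an Integer is rejected by the fingerprint comparison alone, which is why the receiver has to commit to a concrete type.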
I am close to a stable set of changes after tinkering with this for a few days. I will share what I have as soon as I clean it up a bit.
Well....
@hyperthunk I have been hitting similar issues trying to improve GenServer in different ways:
Maybe I'm wrong but I suspect this will require either some clever redesign on our part or opening up of AbstractMessage in CH.
Non-blocking client API (Future/etc).
That doesn't seem so hard if we can solve the need for handling types we know nothing about.
The simplest answer is to force servers to use a sum type, e.g.:
In fact, I don't see how that solves the problem at all. You still have to write the code to either expect that specific type (which means it's not a general purpose server) or pass in the input type(s) to the record or type class so that the code which uses them can be type checked properly.
I am also struggling with getting the dynamic dispatching right. However, I don't think this has anything to do with Haskell's type system, but with our knowledge (or lack) of it. I can vaguely conceive a solution for this but I just can't get it to work properly. We need to construct a list of handlers that encapsulate the message type using an existential and then have a function associated with each handler that returns a Match type, which is what we need for receiveWait/receiveTimeout (note that the match primitive is also using an existential and hiding the original type from receiveWait/receiveTimeout). The key thing is that it is each handler in the list that needs to return this Match type, since only the individual handler knows its type hidden by the existential. That is the gist of what I am pursuing right now.
Oh no, there's nothing wrong with Haskell at all. I'm very familiar with the OCaml type system, though less so with Haskell's, but this constraint seems perfectly sensible to me. We just need to figure out what type to use to open up the API we want. The list of matchers is close to what I had in mind too, but you still can't write a function that operates on the existential. What this means is that each handler has to return the ProcessAction (state) and we write the handleRequest function (which is called after we've matched something) to decide on the looping and timeout.
... is also Binary and Typeable. Afaik, CloudHaskell's marshalling generates a fingerprint of the message type using Typeable, and this fingerprint is then compared when unmarshalling the message on the other end. So, as long as CloudHaskell can match the fingerprint of the receiving type against the fingerprint that was unmarshalled, the receive/expect should work fine.
Great, but remember that the decision about being able to use a type in expect/receiveX is based on the clauses the handlers provide. We shouldn't need to care about that.
I am close to a stable set of changes after tinkering with this for a few days. I will share what I have as soon as I clean it up a bit.
Great. I'm going to experiment with the same idea (a list of handlers that produces [Match (ProcessAction)]) in a separate module so as to minimise merge conflicts.
@hyperthunk Just as a quick side note re: "..., but with our knowledge (or lack) of it". Change that 'our' to a 'my' as I don't have a lot of practical experience with Haskell (nor ML) and was speaking with myself in mind.
2nd GenServer iteration
I have committed another iteration of the GenServer based on the experiments from the past few days. It is at least back in a stable state after much tinkering, so I can take a bit more space to consider the code you sent above.
This commit (the past 2 actually) reverts to using process messages as opposed to channel messages. I am assuming the implementation could be changed to support typed-channels by merging the channels, but considering that it may not be possible to receive process and channel messages at the same time I am keeping it simple for right now.
It also abstracts the two basic protocols of gen_server, i.e. calls and casts, and assumes that specific variations will be based on either one of those (e.g. AbstractMessage support would be implemented using the cast protocol, in other words just simple, one-way messaging; the same goes for info messages, i.e. they come in and conform to the cast protocol).
See their types:
type CallHandler a b = a -> Process (CallResult b)
type CastHandler a = a -> Process CastResult
data CallResult a
  = CallOk a
  | CallForward ServerId
  | CallStop a String
  deriving (Show, Typeable)
data CastResult
  = CastOk
  | CastForward ServerId
  | CastStop String
I think this is in line with your code snippets above. I.e. by separating the cast and call result types we can enforce certain invariants: there is no way to send a reply in a CastHandler (I am assuming we can wrap these handlers in a monad to better control what can go on inside them); it can only process the message, stop the server or forward it. Or, in the case of the CallHandler, you are forced to either generate a CallOk with the reply, or forward the request to another server with a CallForward, or stop the server with a CallStop.
handleReset ResetCount = do
  return $ CastForward mySlaveServerId
The code is working for the simple Counter example and shows how the GenServer can be used with a synchronous CallHandler, an async CastHandler, and two separate data types: CounterRequest and ResetCount. For example:
-- Handler for the CounterRequest call
handleCounter IncrementCounter = return $ CallOk (CounterIncremented)
handleCounter GetCount = return $ CallOk (Count 0)
-- Handler for the ResetCount cast
handleReset ResetCount = return $ CastOk
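To make the invariant concrete, here is a minimal, self-contained sketch of how a server loop might interpret the two result types. It is pure Haskell with no distributed-process dependency; ServerId is modelled as a plain String, and Outcome, interpretCall and interpretCast are hypothetical names, not part of the GenServer module.

```haskell
-- Sketch only: ServerId is a placeholder String; Outcome, interpretCall and
-- interpretCast are hypothetical names, not taken from GenServer.hs.
type ServerId = String

data CallResult a
  = CallOk a
  | CallForward ServerId
  | CallStop a String
  deriving (Show, Eq)

data CastResult
  = CastOk
  | CastForward ServerId
  | CastStop String
  deriving (Show, Eq)

-- | What the server loop should do once a handler has returned.
data Outcome r
  = Continue (Maybe r)     -- keep looping, replying if there is a reply
  | ForwardTo ServerId     -- hand the message to another server
  | Stop (Maybe r) String  -- reply if there is a reply, then terminate
  deriving (Show, Eq)

-- | A call always yields a reply unless it is forwarded.
interpretCall :: CallResult r -> Outcome r
interpretCall (CallOk r)        = Continue (Just r)
interpretCall (CallForward sid) = ForwardTo sid
interpretCall (CallStop r why)  = Stop (Just r) why

-- | A cast can never produce a reply, whatever the reply type r is.
interpretCast :: CastResult -> Outcome r
interpretCast CastOk            = Continue Nothing
interpretCast (CastForward sid) = ForwardTo sid
interpretCast (CastStop why)    = Stop Nothing why
```

Because CastResult carries no payload, interpretCast has nothing to put in the reply slot, which is the "no reply from a cast" guarantee expressed in types.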
Support for AbstractMessage?
I also added an experimental handleAny to deal with untyped messages based on CloudHaskell's AbstractMessage. I am not sure it works yet, but it shows another point of flexibility in the design (or so it seems).
Threading state through the handlers
There are more refinements needed, but the next todo seems to be figuring out how to thread the server state through the handlers/callbacks. The approach I am pursuing now is to define a Server monad that is really a StateT monad wrapping the Process monad. This is immediately useful for managing the server state, but could also possibly be used to create additional DSLs on top of it (not sure).
type Server s = StateT s Process
type InitHandler s = Server s InitResult
type TerminateHandler s = TerminateReason -> Server s ()
type CallHandler s a b = a -> Server s (CallResult b)
type CastHandler s a = a -> Server s CastResult
And a sample CastHandler:
handleReset ResetCount = do
state <- get
-- do something with the state
put state
return $ CastOk
I am stuck now trying to figure out where to store this state s so that the MessageDispatcher can access it. The first iteration of GenServer was using a closure to do that, but now I have N MessageDispatchers that share the same state and no closure around them.
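One closure-free option is for the receive loop itself to own the state and hand it to whichever dispatcher matches. The following is only a sketch under stated assumptions: IO stands in for Process, a list of Data.Dynamic values stands in for the mailbox, and Dispatcher, dispatch, serverLoop and sampleMailbox are hypothetical definitions rather than the ones in the GenServer module.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

import Control.Monad (foldM)
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Typeable (Typeable)

-- | A dispatcher hides its message type but exposes the shared state type s.
data Dispatcher s = forall a. Typeable a => Dispatcher (a -> s -> IO s)

-- | Try each dispatcher in turn; the first whose message type matches runs.
dispatch :: [Dispatcher s] -> Dynamic -> s -> IO s
dispatch [] _ s = return s -- unmatched messages are dropped in this sketch
dispatch (Dispatcher h : ds) msg s =
  case fromDynamic msg of
    Just x  -> h x s
    Nothing -> dispatch ds msg s

-- | The loop owns the state and threads it through every handled message.
serverLoop :: [Dispatcher s] -> s -> [Dynamic] -> IO s
serverLoop ds = foldM (\s msg -> dispatch ds msg s)

-- Two unrelated message types, as in the Counter example.
data IncrementCounter = IncrementCounter
data ResetCount = ResetCount

-- | Both dispatchers see the same Int state without sharing a closure.
counterDispatchers :: [Dispatcher Int]
counterDispatchers =
  [ Dispatcher (\IncrementCounter n -> return (n + 1))
  , Dispatcher (\ResetCount _ -> return 0)
  ]

-- | A sample mailbox; the String has no dispatcher and is skipped.
sampleMailbox :: [Dynamic]
sampleMailbox =
  [ toDyn IncrementCounter, toDyn IncrementCounter
  , toDyn ResetCount, toDyn IncrementCounter, toDyn "noise" ]
```

Running serverLoop counterDispatchers 0 sampleMailbox threads the counter through all five messages. Swapping the explicit s -> IO s for StateT s Process would give the Server monad shape described above.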
@hyperthunk Just as a quick side note re: "..., but with our knowledge (or lack) of it". Change that 'our' to a 'my' as I don't have a lot of practical experience with Haskell (nor ML) and was speaking with myself in mind.
I didn't mean to be touchy though - I've been struggling with some of the finer details here too. :)
So..... looking at your code, I think you're generally going in the right direction. Feel free to send a pull request when you're ready - I'm doing some experiments in parallel in a different module, so we shouldn't have any clashes.
@hyperthunk Ok, the pull request is in. I will not have time for this until the weekend.
@hyperthunk Ok, the pull request is in. I will not have time for this until the weekend.
That's absolutely fine, I'm doing this in my spare time too and your input is most welcome and appreciated! :)
Sorry guys, the Parallel Haskell project has come to an end and so I will only be able to look at this in my spare time, of which I have very little. You already seem to have made progress with this, but let me just explain something, perhaps it will help, because the 'expect' matching (or 'receiveTimeout' and co) is indeed a little confusing. You should think of
expect :: Serializable a => Process a
which is really
expect :: (Typeable a, Binary a) => Process a
as having an (implicit) argument
expect :: Binary a => Fingerprint -> Process a
That Fingerprint comes from the Typeable class and it is what expect uses to find the right message in the process mailbox. Without that Fingerprint it will not be able to locate the message, and this materializes as a Haskell type error. So the GenServer that suffered from the ambiguous type error needed to decide on a type at which to instantiate handleCall, and to find the appropriate Typeable instance to go with it, so that it could extract the right message from the mailbox.
Introducing an existential type along the lines of
data Foo :: * where
  MkFoo :: forall e. Serializable e => ... -> Foo
and then matching on MkFoo introduces a type variable, but more importantly, it brings a Typeable instance into scope and hence a type Fingerprint. That MkFoo is really:
data Foo :: * where
  MkFoo :: forall e. Serializable e => `Fingerprint` -> ... -> Foo
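The fingerprint-driven lookup can be imitated with nothing but base, which may help to see why the type has to be known at the call site. This is an analogy, not Cloud Haskell's implementation: a list of Dynamic values plays the mailbox, and fingerprintOf, findByFingerprint, expectFrom and sampleMailbox are hypothetical names.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}

import Data.Dynamic (Dynamic, dynTypeRep, fromDynamic, toDyn)
import Data.Proxy (Proxy (..))
import Data.Typeable (Typeable, typeRep, typeRepFingerprint)
import GHC.Fingerprint (Fingerprint)

-- | The "implicit argument": a fingerprint derived from the Typeable instance.
fingerprintOf :: Typeable a => Proxy a -> Fingerprint
fingerprintOf = typeRepFingerprint . typeRep

-- | Remove the first message carrying the wanted fingerprint, if any.
findByFingerprint :: Fingerprint -> [Dynamic] -> Maybe (Dynamic, [Dynamic])
findByFingerprint fp box =
  case break ((== fp) . typeRepFingerprint . dynTypeRep) box of
    (before, m : after) -> Just (m, before ++ after)
    (_, [])             -> Nothing

-- | An expect-like lookup: the result type alone selects the message.
expectFrom :: forall a. Typeable a => [Dynamic] -> Maybe (a, [Dynamic])
expectFrom box = do
  (m, rest) <- findByFingerprint (fingerprintOf (Proxy :: Proxy a)) box
  x <- fromDynamic m
  return (x, rest)

-- | A mailbox holding a String, an Int and a Bool, in that order.
sampleMailbox :: [Dynamic]
sampleMailbox = [toDyn "hello", toDyn (42 :: Int), toDyn True]
```

Asking for expectFrom sampleMailbox :: Maybe (Int, [Dynamic]) skips the String and picks out the Int. Leaving the annotation off makes the type variable ambiguous, which is the same kind of error the GenServer ran into.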
So Cloud Haskell's AbstractMessage type is a layer of abstraction around an internal Message type which carries that Fingerprint with it. At the moment the AbstractMessage interface is incredibly minimal (all you can do is forward them) but we could add support for
wrap :: Serializable a => a -> AbstractMessage
matchAgainst :: [Match a] -> AbstractMessage -> Process (Maybe a)
and moreover make AbstractMessage itself an instance of Serializable (this is described in https://github.com/haskell-distributed/distributed-process/issues/30). I don't have time for this at the moment but pull requests are welcome :)
Note by the way that making AbstractMessage an instance of Serializable is a bit confusing: sending an AbstractMessage would require the remote end to expect an AbstractMessage, but forwarding an AbstractMessage which is really an Int, say, would require the remote process to expect an Int.
@edsko totally understood - we're all doing this on the side. I'll liaise with @rodlogic and we might potentially contribute some stuff to distributed-process a bit later on, though for the time being I think we'll try to build up some of the basic infrastructure with what's available now.
There are more refinements needed, but the next todo seems to be figuring out how to thread the server state through the handlers/callbacks. The approach I am pursuing now is to define a Server monad that is really a StateT monad wrapping the Process monad. This is immediately useful for managing the server state, but could also possibly be used to create additional DSLs on top of it (not sure).
I had a play with this too and found it pretty awkward. The dispatchers need to take StateT ... {args...} but the state constructor needs a parameter (say s for example) and that either complicates the existential or introduces a parameter to Dispatcher which makes all the consuming code rather more complicated.
I am stuck now trying to figure out where to store this state s so that the MessageDispatcher can access it. The first iteration of GenServer was using a closure to do that, but now I have N MessageDispatchers that share the same state and no closure around them.
Yes exactly. One approach I tried was along the lines of
data Dispatcher s =
forall a . (Serializable a) =>
Dispatch { handler :: s -> Message a -> Process () }
| forall a . (Serializable a) =>
DispatchIf { handler :: s -> Message a -> Process (),
predicate :: s -> Message a -> Bool }
| DispatchAny { abstractMessagehandler :: s -> AbstractMessage -> Process () }
data GenProcess s = GenProcess {
procInit :: InitHandler s, -- ^ initialization handler
procDispatchers :: [Dispatcher s], -- ^ request dispatchers
procTerminate :: TerminateHandler -- ^ termination handler
}
But as soon as you want to initialize the state your InitHandler s constructors come into play. So you want to go through a chain somewhat like GenProcess s -> Process (InitResult s) -> ProcessState s or some such, but that hardly feels very clean. Perhaps your idea of letting the handlers (init, call/cast, terminate) deal with the state themselves is better, but from previous excursions using the state monad I had expected to deal with this in the outer (control) functions and pass it into the handlers so they look more like Dispatch { handler :: StateT s -> Message a -> ... } and so on.
@edsko thanks for the additional info and details.
@hyperthunk there is another pull request with another iterative improvement to GenServer: now threading server state through the handlers (init, handle, terminate). The code could probably be much better, but at least it compiles and runs and it keeps us moving forward.
@edsko @hyperthunk Committed a few additional changes to support the handling of CallStop and CastStop (this is when the server can instruct a termination after handling the call/cast).
I am a bit unsure about the following:
- Call timeouts: how should the client API handle them? At this point, the GenServer call API is throwing an error, which is obviously wrong:
callServer :: (Serializable rq, Serializable rs) => ServerId -> Timeout -> rq -> Process rs
One option would be to change the result type to Process (Maybe rs) and another one would be to throw an exception instead. How is this usually handled in Erlang?
- Server termination: the server can terminate for a variety of reasons (CallStop, CastStop, termination by supervisor, etc.), but how should the client be notified? In the current Counter.hs example, the increment counter handler stops the server when the counter is greater than 10, but after that the client hangs, waiting for a response to the request it sent. One simple answer would be to use a timeout when waiting for a response, but this doesn't seem like an acceptable answer. If the server is dead or doesn't exist, we should fail fast and throw an exception, at least. How should we handle this case? Should CloudHaskell throw an exception when sending messages to a terminated or non-existent ProcessId?
- Async client API: blocking while waiting for a call response may work for simple client/server interactions, but not for more complex protocols where a server may also act as a client and possibly have multiple pending calls. Ideally, call responses should come back as part of the server receive loop. Here is a straw man client API:
-- | Sync call (in terms of callAsync + waitReply)
call :: ServerId -> a -> Timeout -> Process (Either SomeException b)
-- | Asynchronous call to server
callAsync :: ServerId -> a -> Process (Future b)
-- | Wait for a reply blocking if necessary
waitReply :: Future a -> Timeout -> Process (Either SomeException a)
-- | Poll a future to see if there is a reply without blocking
pollReply :: Future a -> Process (Maybe (Either SomeException a))
-- | Cancel a future
cancelReply :: Future a -> Process ()
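To get a feel for the straw man, here is a rough base-only model of it. Everything in it is an assumption made for illustration: IO stands in for Process, an ordinary function a -> IO b stands in for the remote server, the timeout is a number of microseconds, and a timed-out wait is reported as Nothing (one of the two options raised above) rather than as an exception. cancelReply is left out.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar, tryReadMVar)
import Control.Exception (SomeException, try)
import System.Timeout (timeout)

-- | A reply that may not have arrived yet.
newtype Future a = Future (MVar (Either SomeException a))

-- | Asynchronous call: run the request in another thread and return at once.
-- 'server' is a local stand-in for sending to a ServerId.
callAsync :: (a -> IO b) -> a -> IO (Future b)
callAsync server req = do
  box <- newEmptyMVar
  _ <- forkIO (try (server req) >>= putMVar box)
  return (Future box)

-- | Wait for a reply, blocking for at most the given number of microseconds.
-- Nothing means the wait timed out; Left means the server threw.
waitReply :: Future a -> Int -> IO (Maybe (Either SomeException a))
waitReply (Future box) micros = timeout micros (readMVar box)

-- | Poll a future to see if there is a reply, without blocking.
pollReply :: Future a -> IO (Maybe (Either SomeException a))
pollReply (Future box) = tryReadMVar box

-- | Sync call, written in terms of callAsync + waitReply.
call :: (a -> IO b) -> a -> Int -> IO (Maybe (Either SomeException b))
call server req micros = callAsync server req >>= \f -> waitReply f micros
```

Since readMVar leaves the value in place, a future can be waited on or polled more than once. In the real thing the reply would presumably have to be matched in the receive loop rather than in a separate thread.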
@hyperthunk
I had a play with this too and found it pretty awkward. The dispatchers need to take StateT ... {args...} but the state constructor needs a parameter (say s for example) and that either complicates the existential or introduces a parameter to Dispatcher which makes all the consuming code rather more complicated.
data Dispatcher s =
forall a . (Serializable a) =>
Dispatch { handler :: s -> Message a -> Process () }
| forall a . (Serializable a) =>
DispatchIf { handler :: s -> Message a -> Process (),
predicate :: s -> Message a -> Bool }
| DispatchAny { abstractMessagehandler :: s -> AbstractMessage -> Process () }
data GenProcess s = GenProcess {
procInit :: InitHandler s, -- ^ initialization handler
procDispatchers :: [Dispatcher s], -- ^ request dispatchers
procTerminate :: TerminateHandler -- ^ termination handler
}
Yes, threading the state explicitly adds noise to the dispatcher/handler functions, which is another reason to go for a StateT monad, imo, to thread that state in and out of the handlers.
But as soon as you want to initialize the state your InitHandler s constructors come into play. So you want to go through a chain somewhat like GenProcess s -> Process (InitResult s) -> ProcessState s or some such, but that hardly feels very clean. Perhaps your idea of letting the handlers (init, call/cast, terminate) deal with the state themselves is better, but from previous excursions using the state monad I had expected to deal with this in the outer (control) functions and pass it into the handlers so they look more like Dispatch { handler :: StateT s -> Message a -> ... } and so on.
What do you think of the following as the contract between GenServer/Process and the user-defined handlers?
type InitHandler s = Server s InitResult
type TerminateHandler s = TerminateReason -> Server s ()
type CallHandler s a b = a -> Server s (CallResult b)
type CastHandler s a = a -> Server s CastResult
The Server monad is just a StateT type alias that wraps the Process monad. The only part that deserves a bit more attention is where we have to call receiveWait/Timeout, since that is in the Process monad and not in the Server monad. Apart from that it is quite clean and the best I can come up with so far.
For example, implementing the handler that returns the current count in the counter server would look like:
handleCounter GetCount = do
count <- getState
return $ CallOk (Count count)
Then the startCounter API would hook the handlers up with the following:
startCounter :: Int -> Process ServerId
startCounter count = startServer count defaultServer {
msgHandlers = [
handleCall handleCounter,
handleCast handleReset
]}
And the handleCall and handleCast would wrap the handler in a Dispatcher data type and set up the whole server process. There may be opportunities to simplify this a bit further with some helper functions or smart constructors, but that would be something minor at this point.
I also added an additional example very similar to the Counter server just as a 2nd exercise.
Next steps?
Now, where should we go from here? Unless you have very different ideas on what the GenProcess/Server should look like, it seems that we should give a 1st shot at the Supervisor module using this GenServer and fill any gaps we may find along the way. Any thoughts?
@rodlogic - first of all, thank you so much for your ongoing contributions; you've really pushed this forward and I think you've pretty much cracked the gen server API for the most part!
For my own part, I've been trying to reduce the amount of boilerplate required by the implementation code, and the reason I've not committed any of those changes is so as to avoid merge conflicts with your work (which is generally more complete and compiling properly than mine!) :)
What do you think of the following as the contract between GenServer/Process and the user-defined handlers?
I think the API is just fine. One of the things I'm attempting to do is to reduce the amount of indirection between the server process and the handlers API, by returning an 'Action' (which is basically your {Call,Cast}Result type) directly from the handlers and dealing with the instruction set in processReceive rather than in the handle{Call,Cast} wrappers. This yields type signatures more like these:
init :: Behaviour s -> Server s InitResult
loop :: Behaviour s -> Timeout -> Server s TerminateReason
processReceive :: [Dispatcher s] -> Timeout -> Server s (Either ProcessAction TerminateReason)
I'm also interested in whether the call/cast handlers can be made into pure functions and wrapped (by the API entry points) in the state monad, so that we get something more like
data CastResult s =
CastOk s
| CastForward s ServerId
| CastStop s String
type CastHandler s a = a -> CastResult s
Keeping the Server monad hidden from the implementor feels simpler to me, though it makes our code more complicated (and I've been having fun trying to implement that).
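A rough sketch of what that lifting could look like follows. Note one assumption that goes beyond the types above: the pure handler here also receives the current state as an argument, since otherwise it could not compute the next one. IO stands in for Process, the stateful handler is written as an explicit s -> IO (Next, s) (the shape StateT s IO has once unwrapped), and PureCastHandler, Next and liftCast are hypothetical names.

```haskell
-- Sketch only: ServerId is a placeholder String; PureCastHandler, Next and
-- liftCast are hypothetical and not taken from either branch.
type ServerId = String

data CastResult s
  = CastOk s
  | CastForward s ServerId
  | CastStop s String

-- | A pure handler: message and current state in, result and new state out.
type PureCastHandler s a = a -> s -> CastResult s

-- | What the loop does next, with the state stripped out.
data Next = Continue | Forward ServerId | Halt String
  deriving (Show, Eq)

-- | The stateful shape the server loop works with internally.
type CastHandler s a = a -> s -> IO (Next, s)

-- | Done once by the API entry point, so implementors never see the monad.
liftCast :: PureCastHandler s a -> CastHandler s a
liftCast h msg s = return $ case h msg s of
  CastOk s'          -> (Continue, s')
  CastForward s' sid -> (Forward sid, s')
  CastStop s' why    -> (Halt why, s')

data ResetCount = ResetCount

-- | The Counter reset handler as a pure function.
handleReset :: PureCastHandler Int ResetCount
handleReset ResetCount _ = CastOk 0
```

A pure handler like handleReset can be tested without spawning anything, which is one argument for hiding the Server monad in this way.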
Anyway, these are minor (and mainly cosmetic, from the API perspective) things and in general I think we will move forward with the gen server implementation you've come up with and if I manage to do these simplifications - and if they do turn out to be simpler, rather than more complicated - then we'll do some refactoring then.
Now, where should we go from here? Unless you have very different ideas on what the GenProcess/Server should look like, it seems that we should give a 1st shot at the Supervisor module using this GenServer and fill any gaps we may find along the way. Any thoughts?
Yes, I agree we should move on to looking at supervisor now. As I said above, I'll bring those simplifications to gen server in on a branch and discuss them with you there before merging anything.
I think, however, that before we can actually implement supervisor properly we need to solve distributed-process issue 69, as without a way to kill processes we're going to struggle to implement supervision properly. I noticed that you've created a Management message to handle requesting termination, but I really don't think that we can sensibly force people to implement everything as a gen server just so that it can be supervised.
So what I propose is that we take a look at the Cloud Haskell issue and see if we can solve it and contribute a pull request. This seems to involve having the node controller listen for a new kind of message (Kill ProcessId DiedReason) and throwing an exception (using throwTo afaict) to the local process. I haven't started to dig into this yet, but apparently the Cloud Haskell paper describes this so it seems pretty fundamental.
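For reference, the throwTo part can be tried out with plain GHC threads. This is only an analogy for what the node controller might do, not a design for the Cloud Haskell change: the Kill exception below carries a String instead of a DiedReason, and spawnWorker and kill are hypothetical helpers.

```haskell
import Control.Concurrent (ThreadId, forkIO, threadDelay, throwTo)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (Exception, catch)
import Control.Monad (forever)

-- | Hypothetical kill signal; a String stands in for DiedReason.
newtype Kill = Kill String
  deriving Show

instance Exception Kill

-- | Spawn a worker that loops until it is killed, then reports why it died.
-- The 'ready' handshake ensures the handler is installed before we return,
-- so a kill sent straight away cannot slip past it.
spawnWorker :: MVar String -> IO ThreadId
spawnWorker done = do
  ready <- newEmptyMVar
  tid <- forkIO $
    (putMVar ready () >> forever (threadDelay 1000))
      `catch` \(Kill why) -> putMVar done why
  takeMVar ready
  return tid

-- | Deliver the kill signal as an asynchronous exception.
kill :: ThreadId -> String -> IO ()
kill tid why = throwTo tid (Kill why)
```

The worker needs no cooperation beyond an optional handler: without the catch it would simply die, which is the property that lets a supervisor stop processes that are not gen servers.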
@hyperthunk CH issue #69 is behind us and a good base for us to continue. It was a great move, btw, to raise this as a CH issue.
Regarding the API, I am running out of ideas now on how to improve it and imo we are close enough. I also think that the implementation needs a few iterations to remove more cruft and simplify, but maybe not a good idea to get stuck there now. So giving the Supervisor a first shot seems a great next step.
I have also incorporated some additional folders/files/content to deal with tests, code coverage and benchmarking. There is nothing there at this point, but a small step forward. Please take a look when you have a chance and I can send a pull request if it makes sense.
@rodlogic yes I think it was the right place to deal with that issue.
So regarding the API, I've just pushed a new branch which is worth a look. What I've done there is to leave your gen server alone, but add a corollary implementation called GenProcess.
This (gen process) is conceptually very similar to gen server, although it's somewhat less finished. The general idea behind it is that we keep the process loop, error handling (where we decide to catch whatever) and reply functionality in that API, which can be used to implement other kinds of generic process (like gen-FSM) as well as providing hooks that non-gen_server processes can call to enter a gen server loop and become managed. My other motivation here is that I'd like to provide means for server authors to write pure functions and have them automatically lifted into handlers/callbacks using template haskell or something of that ilk.
On this branch I was planning to change the GenServer module to add a bit of boilerplate that provides the Call/Cast abstraction (and the call/cast APIs for clients to use) over the top of GenProcess.
So giving the Supervisor a first shot seems a great next step.
Yes definitely. The catch/exit-handling needs to be put into the gen server first. I'm quite happy if you want to do that against master for now, as the changes in the gen-process branch aren't complete and I don't want to hold us up.
I have also incorporated some additional folders/files/content to deal with tests, code coverage and benchmarking. There is nothing there at this point, but a small step forward. Please take a look when you have a chance and I can send a pull request if it makes sense.
That sounds very sensible. Please go ahead and send a pull request against master.