proximo
proximo copied to clipboard
Consider a callback-based server/client implementation
Callbacks are strictly more powerful and don't require unnecessary goroutines. They also make it easier to write correct code.
Server implementation (untested):
// ServerV2 is the callback-based server. Its Consume and Publish methods
// hand stream traffic to the Consumer and Producer backends held here.
type ServerV2 struct {
// Producer is the backend that Publish passes incoming requests to.
Producer Producer
// Consumer is the backend that Consume passes incoming requests to.
Consumer Consumer
}
// MessageSender is the callback a Consumer backend invokes to deliver a
// Message. It is handed to the backend through Consumer.Init.
type MessageSender func(*Message) error
// Consumer is the backend side of a consume stream.
type Consumer interface {
// Init is called when a start request arrives, with the topic and consumer
// name taken from that request. sendMessage forwards a Message to the
// client stream.
Init(topic, consumer string, sendMessage MessageSender) error
// ReceiveConfirmation is called for each confirmation the client sends
// after the stream has been started.
ReceiveConfirmation(*Confirmation) error
// Close is presumably meant to release backend resources; nothing in the
// server code shown here calls it yet — TODO confirm intended caller.
Close() error
}
// ConfirmationSender is the callback a Producer backend invokes to send a
// Confirmation. It is handed to the backend through Producer.Init.
type ConfirmationSender func(*Confirmation) error
// Producer is the backend side of a publish stream.
type Producer interface {
// Init is called when a start request arrives, with the topic taken from
// that request. sendConfirmation forwards a Confirmation to the client
// stream.
Init(topic string, publisher string, sendConfirmation ConfirmationSender) error
// ReceiveMessage is called for each message the client sends after the
// stream has been started.
ReceiveMessage(*Message) error
// Close is presumably meant to release backend resources; nothing in the
// server code shown here calls it yet — TODO confirm intended caller.
Close() error
}
// Consume serves one consume stream. It reads requests from the stream until
// the client closes it (io.EOF) or the stream context is canceled, both of
// which end the call with a nil error; any other receive error is returned.
// Each request is dispatched to the Consumer backend on its own goroutine.
func (s *ServerV2) Consume(stream MessageSource_ConsumeServer) error {
	// The derived context is not used yet; only its cancel func is kept so
	// the context is released when the handler returns.
	_, cancel := context.WithCancel(stream.Context())
	defer cancel()

	// started must be allocated here: handleRequest dereferences it, and a
	// nil *atomicBool would panic on the very first request.
	c := &consumer{started: &atomicBool{}, Consumer: s.Consumer}
	for {
		msg, err := stream.Recv()
		if err != nil {
			if err == io.EOF || strings.HasSuffix(err.Error(), "context canceled") {
				return nil
			}
			return err
		}
		//todo handle returned errors
		go c.handleRequest(msg, stream)
	}
}
// consumer holds the per-stream state used while serving a consume stream.
type consumer struct {
// started records whether a start request has been handled. It is a
// pointer, so it must be allocated before handleRequest is called.
started *atomicBool
// Consumer is the backend that requests are forwarded to.
Consumer Consumer
}
// handleRequest processes a single request received on a consume stream.
//
// A start request initialises the backend with a callback that writes
// messages to stream; a second start request yields ErrStartedTwice. A
// confirmation is forwarded to the backend, or rejected with
// ErrInvalidConfirm if the stream has not been started. Any other request
// yields ErrInvalidRequest.
func (c *consumer) handleRequest(msg *ConsumerRequest, stream MessageSource_ConsumeServer) error {
	switch {
	case msg.GetStartRequest() != nil:
		if c.started.Get() {
			return ErrStartedTwice
		}
		sr := msg.GetStartRequest()
		sendMessage := func(m *Message) error {
			return stream.Send(m)
		}
		if err := c.Consumer.Init(sr.GetTopic(), sr.GetConsumer(), sendMessage); err != nil {
			return err
		}
		c.started.Set(true)
		// A successful start must return here; previously control left the
		// switch and reached an unconditional panic.
		return nil
	case msg.GetConfirmation() != nil:
		if !c.started.Get() {
			return ErrInvalidConfirm
		}
		return c.Consumer.ReceiveConfirmation(msg.GetConfirmation())
	default:
		return ErrInvalidRequest
	}
}
// publisher holds the per-stream state used while serving a publish stream.
type publisher struct {
// started records whether a start request has been handled. It is a
// pointer, so it must be allocated before handleRequest is called.
started *atomicBool
// Producer is the backend that requests are forwarded to.
Producer Producer
}
// handleRequest processes a single request received on a publish stream.
//
// A start request initialises the backend with a callback that writes
// confirmations to stream; a second start request yields ErrStartedTwice. A
// message is forwarded to the backend, or rejected with ErrNotConnected if
// the stream has not been started. A request that is neither is ignored and
// nil is returned.
func (p *publisher) handleRequest(msg *PublisherRequest, stream MessageSink_PublishServer) error {
	switch {
	case msg.GetStartRequest() != nil:
		if p.started.Get() {
			return ErrStartedTwice
		}
		sr := msg.GetStartRequest()
		sendConfirmation := func(c *Confirmation) error {
			return stream.Send(c)
		}
		// Only mark the stream as started once the backend initialised
		// successfully; otherwise later messages would reach a backend
		// that never connected.
		if err := p.Producer.Init(sr.Topic, "", sendConfirmation); err != nil {
			return err
		}
		p.started.Set(true)
	case msg.GetMsg() != nil:
		if !p.started.Get() {
			return ErrNotConnected
		}
		return p.Producer.ReceiveMessage(msg.GetMsg())
	}
	return nil
}
// Publish serves one publish stream. It reads requests from the stream until
// the client closes it (io.EOF) or the stream context is canceled, both of
// which end the call with a nil error, matching Consume; any other receive
// error is returned. Each request is dispatched to the Producer backend on
// its own goroutine.
func (s *ServerV2) Publish(stream MessageSink_PublishServer) error {
	// The derived context is not used yet; only its cancel func is kept so
	// the context is released when the handler returns.
	_, cancel := context.WithCancel(stream.Context())
	defer cancel()

	// started must be allocated here: handleRequest dereferences it, and a
	// nil *atomicBool would panic on the very first request.
	p := &publisher{started: &atomicBool{}, Producer: s.Producer}
	for {
		msg, err := stream.Recv()
		if err != nil {
			// A clean client shutdown is not a failure of the RPC.
			if err == io.EOF || strings.HasSuffix(err.Error(), "context canceled") {
				return nil
			}
			return err
		}
		//todo handle returned errors
		go p.handleRequest(msg, stream)
	}
}
// atomicBool is a boolean flag whose reads and writes go through sync/atomic.
// The zero value reads as false; any non-zero flag reads as true.
type atomicBool struct{ flag int32 }

// Set atomically stores value into the flag (1 for true, 0 for false).
func (b *atomicBool) Set(value bool) {
	var encoded int32
	if value {
		encoded = 1
	}
	atomic.StoreInt32(&b.flag, encoded)
}

// Get atomically loads the flag and reports whether it is non-zero.
func (b *atomicBool) Get() bool {
	return atomic.LoadInt32(&b.flag) != 0
}
And the backend (untested):
// Compile-time checks that the NATS handlers satisfy the proximo interfaces.
var (
	_ proximo.Consumer = (*NatsHandlerConsumer)(nil)
	_ proximo.Producer = (*NatsHandlerProducer)(nil)
)
// NatsHandlerConsumer is a proximo.Consumer that forwards messages from a
// NATS subject to the client through the callback given to Init.
type NatsHandlerConsumer struct {
	// Url is the NATS server URL passed to nats.Connect.
	Url string

	conn *nats.Conn
	sub  *nats.Subscription
	// done is closed by Close to stop the forwarding goroutine started by
	// Init.
	done chan struct{}
}

// Init connects to NATS, subscribes to topic and starts a goroutine that
// forwards every received message to sendMessage. It returns as soon as the
// subscription is established, so the caller can go on to mark the stream
// as started; the previous version looped forever and never returned.
//
// The forwarding goroutine stops when Close is called or when sendMessage
// fails, in which case the NATS connection is closed because nobody is left
// to receive messages. The consumer argument is not used by this backend.
func (n *NatsHandlerConsumer) Init(topic, consumer string, sendMessage proximo.MessageSender) error {
	conn, err := nats.Connect(n.Url)
	if err != nil {
		return err
	}
	ch := make(chan *nats.Msg, 64) //TODO: make 64 configurable at startup time
	sub, err := conn.ChanSubscribe(topic, ch)
	if err != nil {
		// Do not leak the connection when the subscription fails.
		conn.Close()
		return err
	}
	done := make(chan struct{})
	n.conn, n.sub, n.done = conn, sub, done

	go func() {
		for {
			select {
			case <-done:
				return
			case m := <-ch:
				err := sendMessage(&proximo.Message{
					Data: m.Data,
					Id:   proximo.GenerateID(),
				})
				if err != nil {
					// The receiving side is gone; closing the connection
					// also tears down the subscription.
					conn.Close()
					return
				}
			}
		}
	}()
	return nil
}

// ReceiveConfirmation accepts a confirmation from the client. This backend
// does nothing with it and always returns nil.
func (n *NatsHandlerConsumer) ReceiveConfirmation(c *proximo.Confirmation) error { return nil }

// Close unsubscribes from the topic, stops the forwarding goroutine and
// closes the NATS connection. It returns the error from Unsubscribe, if any.
func (n *NatsHandlerConsumer) Close() error {
	err := n.sub.Unsubscribe()
	if n.done != nil {
		close(n.done)
		// Clear the field so that a repeated Close does not close the
		// channel twice.
		n.done = nil
	}
	if n.conn != nil {
		n.conn.Close()
	}
	return err
}
// NatsHandlerProducer is a proximo.Producer that publishes incoming messages
// to a NATS subject.
type NatsHandlerProducer struct {
// Url is the NATS server URL passed to nats.Connect.
Url string
// topic is the NATS subject messages are published to; set by Init.
topic string
// sendConfirmation is the callback ReceiveMessage invokes to confirm a
// message back to the client.
sendConfirmation proximo.ConfirmationSender
// conn is the NATS connection opened by Init and closed by Close.
conn *nats.Conn
}
// Init records the topic and the confirmation callback, then connects to the
// NATS server at n.Url. It returns the connection error, if any. The
// publisher argument is not used by this backend.
func (n *NatsHandlerProducer) Init(topic string, publisher string, sendConfirmation proximo.ConfirmationSender) error {
	n.topic = topic
	// The callback must be stored: ReceiveMessage invokes it, and calling a
	// nil func would panic.
	n.sendConfirmation = sendConfirmation

	var err error
	n.conn, err = nats.Connect(n.Url)
	return err
}
// ReceiveMessage publishes msg to the configured topic and, if publishing
// succeeded, sends a confirmation carrying the message ID back to the
// client. It returns the publish error, or otherwise the error from sending
// the confirmation.
func (n *NatsHandlerProducer) ReceiveMessage(msg *proximo.Message) error {
	// Never confirm a message that was not handed to NATS.
	if err := n.conn.Publish(n.topic, msg.GetData()); err != nil {
		return err
	}
	return n.sendConfirmation(&proximo.Confirmation{msg.GetId()})
}
// Close closes the producer's NATS connection. It always returns nil.
func (n *NatsHandlerProducer) Close() error {
	conn := n.conn
	conn.Close()
	return nil
}
Can you describe the problem that you are trying to solve please?
Or perhaps you have measured something as being slow due to excessive goroutines, in which case do you have benchmarks?
Sorry if I was not clear. This is just an alternative way to solve the same problem. My main argument here (which is debatable) is that it makes new backends easy to implement, as the example backend shows. Performance is not my motivation here; it is just a side effect.