proximo icon indicating copy to clipboard operation
proximo copied to clipboard

Consider callback based server/client implementation

Open thesyncim opened this issue 7 years ago • 2 comments

Callbacks are strictly more powerful and don’t require unnecessary goroutines. Also is easy to write correct code

Server implementation(untested)


type ServerV2 struct {
	Producer Producer

	Consumer Consumer
}
type MessageSender func(*Message) error

type Consumer interface {
	Init(topic, consumer string, sendMessage MessageSender) error
	ReceiveConfirmation(*Confirmation) error
	Close() error
}

type ConfirmationSender func(*Confirmation) error

type Producer interface {
	Init(topic string, publisher string, sendConfirmation ConfirmationSender) error
	ReceiveMessage(*Message) error
	Close() error
}

func (s *ServerV2) Consume(stream MessageSource_ConsumeServer) error {

	_, cancel := context.WithCancel(stream.Context())
	defer cancel()

	c := &consumer{Consumer: s.Consumer}
	for {
		msg, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				return nil
			}
			if strings.HasSuffix(err.Error(), "context canceled") {
				return nil
			}

			return err
		}
		//todo handle returned errors
		go c.handleRequest(msg, stream)
	}
}

type consumer struct {
	started  *atomicBool
	Consumer Consumer
}

func (c *consumer) handleRequest(msg *ConsumerRequest, stream MessageSource_ConsumeServer) error {
	switch {
	case msg.GetStartRequest() != nil:

		if c.started.Get() {
			return ErrStartedTwice
		}
		sr := msg.GetStartRequest()

		sendMessage:= func(m *Message) error {
			err := stream.Send(m)
			if err != nil {
				return err
			}
			return nil
		}
		if err := c.Consumer.Init(sr.GetTopic(), sr.GetConsumer(), sendMessage); err != nil {
			return err
		}
		c.started.Set(true)

	case msg.GetConfirmation() != nil:

		if !c.started.Get() {
			return ErrInvalidConfirm
		}

		return c.Consumer.ReceiveConfirmation(msg.GetConfirmation())
	default:
		return ErrInvalidRequest
	}
	panic("impossible")
}

type publisher struct {
	started  *atomicBool
	Producer Producer
}

func (p *publisher) handleRequest(msg *PublisherRequest, stream MessageSink_PublishServer) error {
	switch {
	case msg.GetStartRequest() != nil:
		if p.started.Get() {
			return ErrStartedTwice
		}

		sr := msg.GetStartRequest()
		sendConfirmation := func(c *Confirmation) error {
			return stream.Send(c)
		}
		p.Producer.Init(sr.Topic, "", sendConfirmation)
		p.started.Set(true)
	case msg.GetMsg() != nil:
		if !p.started.Get() {
			return ErrNotConnected
		}
		return p.Producer.ReceiveMessage(msg.GetMsg())

	}
	return nil
}

func (s *ServerV2) Publish(stream MessageSink_PublishServer) error {

	_, cancel := context.WithCancel(stream.Context())
	defer cancel()
	p := &publisher{Producer: s.Producer}
	for {
		msg, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				return err
			}
			if strings.HasSuffix(err.Error(), "context canceled") {
				return err
			}
			return err
		}
		//todo handle returned errors
		go p.handleRequest(msg, stream)
	}
}


type atomicBool struct{ flag int32 }

func (b *atomicBool) Set(value bool) {
	var i int32 = 0
	if value {
		i = 1
	}
	atomic.StoreInt32(&(b.flag), int32(i))
}

func (b *atomicBool) Get() bool {
	if atomic.LoadInt32(&(b.flag)) != 0 {
		return true
	}
	return false
}


and the backend(untested):


var _ = proximo.Consumer(&NatsHandlerConsumer{})
var _ = proximo.Producer(&NatsHandlerProducer{})

type NatsHandlerConsumer struct {
	Url string
	sub *nats.Subscription
}

func (n *NatsHandlerConsumer) Init(topic, consumer string, sendMessage proximo.MessageSender) error {
	conn, err := nats.Connect(n.Url)
	if err != nil {
		return err
	}
	//defer conn.Close()

	ch := make(chan *nats.Msg, 64) //TODO: make 64 configurable at startup time
	sub, err := conn.ChanSubscribe(topic, ch)
	if err != nil {
		return err
	}
	n.sub = sub

	for {
		select { // drop
		case m := <-ch:
			sendMessage(&proximo.Message{
				Data: m.Data,
				Id:   proximo.GenerateID(),
			})
		}
	}

	return nil
}

func (n *NatsHandlerConsumer) ReceiveConfirmation(c *proximo.Confirmation) error { return nil }
func (n *NatsHandlerConsumer) Close() error                                      { return n.sub.Unsubscribe() }

type NatsHandlerProducer struct {
	Url              string
	topic            string
	sendConfirmation proximo.ConfirmationSender
	conn             *nats.Conn
}

func (n *NatsHandlerProducer) Init(topic string, publisher string, sendConfirmation proximo.ConfirmationSender) error {
	var err error
	n.topic = topic
	n.conn, err = nats.Connect(n.Url)
	if err != nil {
		return err
	}

	return err
}

func (n *NatsHandlerProducer) ReceiveMessage(msg *proximo.Message) error {
	err := n.conn.Publish(n.topic, msg.GetData())
	n.sendConfirmation(&proximo.Confirmation{msg.GetId()})
	return err
}

func (n *NatsHandlerProducer) Close() error {
	n.conn.Close()
	return nil
}

thesyncim avatar Mar 20 '18 12:03 thesyncim

Can you describe the problem that you are trying to solve please?

Or perhaps you have measured something as being slow due to excessive goroutines, in which case do you have benchmarks?

mjgarton avatar Mar 20 '18 17:03 mjgarton

Sry if i was not clear. This is just a alternative way to solve the same problem. My main argument here (which is arguable) is an easy way to implement new backends as we can see in the example backend. Performance is not my motivation here. Just a collateral effect.

thesyncim avatar Mar 20 '18 18:03 thesyncim