majordomo
"Fast requests" accumulate in broker queue and are never dispatched
If clients send requests faster than workers can handle them, some requests are never dispatched to a worker, because the dispatch function (s_service_dispatch() in the code below) is only called from handle_ready() and handle_request(). What appears to be missing is a call to it at the end of handle_worker_final() in mdp_broker.c.
To reproduce, run the attached broker, worker and client. Then create multiple requests (say, 5) with the client before handling any of them in the worker. Only the first one will be processed by the worker. Then create a new request in the client and handle a request in the worker: now request number 2 is handled, and so on. The queue only advances when a new request arrives.
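The stall can be illustrated with a toy model that does not use the real broker at all. This is only a sketch under simplifying assumptions: one service, one worker, counters instead of the broker's lists, and every name in it (toy_service_t, toy_dispatch, toy_run, and so on) is made up for illustration and is not part of mdp_broker.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy model of one service (hypothetical, not the real broker types). */
typedef struct {
    size_t pending;     /* requests queued in the broker */
    size_t idle;        /* workers currently waiting for work */
    size_t dispatched;  /* requests handed to a worker so far */
} toy_service_t;

/* Hand queued requests to idle workers while both are available. */
static void toy_dispatch(toy_service_t *s)
{
    while (s->pending > 0 && s->idle > 0) {
        s->pending--;
        s->idle--;
        s->dispatched++;
    }
}

/* A worker announces itself: it becomes idle, then we dispatch. */
static void toy_on_ready(toy_service_t *s)
{
    s->idle++;
    toy_dispatch(s);
}

/* A client request arrives: it is queued, then we dispatch. */
static void toy_on_request(toy_service_t *s)
{
    s->pending++;
    toy_dispatch(s);
}

/* A worker sends its final reply: it becomes idle again.
   dispatch_on_final selects between the reported and the proposed behavior. */
static void toy_on_final(toy_service_t *s, bool dispatch_on_final)
{
    s->idle++;
    if (dispatch_on_final)
        toy_dispatch(s);
}

/* One worker; all requests are sent up front; the worker then finishes
   every job it is given. Returns how many requests were dispatched. */
static size_t toy_run(size_t requests, bool dispatch_on_final)
{
    toy_service_t s = { 0, 0, 0 };
    toy_on_ready(&s);
    for (size_t i = 0; i < requests; i++)
        toy_on_request(&s);

    /* While the worker is busy, let it finish its current job. */
    size_t finals = 0;
    while (s.idle == 0 && finals < requests) {
        toy_on_final(&s, dispatch_on_final);
        finals++;
    }
    assert(s.dispatched <= requests);
    return s.dispatched;
}
```

In this model, toy_run(5, false) returns 1 (four requests stay queued, as in the report), while toy_run(5, true) returns 5.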
So IMHO, handle_worker_final() should look like this (with a call to s_service_dispatch() added after the worker is re-added as waiting):
static void
handle_worker_final (client_t *self)
{
    mdp_msg_t *msg = self->message;
    mdp_msg_t *client_msg = mdp_msg_new();
    // Set routing id, messageid, service, body
    zframe_t *address = mdp_msg_address(msg);
    mdp_msg_set_routing_id(client_msg, address);
    mdp_msg_set_id(client_msg, MDP_MSG_CLIENT_FINAL);
    const char *service_name = self->service_name;
    mdp_msg_set_service(client_msg, service_name);
    zmsg_t *body = mdp_msg_get_body(msg);
    mdp_msg_set_body(client_msg, &body);
    mdp_msg_send(client_msg, self->server->router);
    // Add the worker back to the list of waiting workers.
    char *identity = zframe_strhex(mdp_msg_routing_id(msg));
    worker_t *worker =
        (worker_t *) zhash_lookup(self->server->workers, identity);
    assert(worker);
    zlist_append(self->server->waiting, worker);
    service_t *service = (service_t *) zhash_lookup(self->server->services,
        worker->service->name);
    assert(service);
    zlist_append(service->waiting, worker);
    zstr_free(&identity);
    mdp_msg_destroy(&client_msg);
    // Added: dispatch any queued requests now that a worker is waiting again.
    s_service_dispatch(service);
}
cheers,
Joerg