
Silent drop of messages on OneShotChannelAdaptorMK2

Open rainer010 opened this issue 4 years ago • 4 comments

OneShotChannelAdaptorMK2 uses a ThreadPoolExecutor with a SynchronousQueue and a bounded maximum number of threads. With this configuration, once the thread limit is reached, every subsequent message is rejected immediately and effectively dropped (see the Queuing section of the ThreadPoolExecutor documentation). Some possible solutions:

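  • Allow the queue type to be configured

        String queueType = cfg.get("queue-type", "synchronous");
        // ThreadPoolExecutor requires a BlockingQueue<Runnable>
        BlockingQueue<Runnable> queue;
        switch (queueType) {
            case "linked-blocking":
                // unbounded: the pool never grows past its core size
                queue = new LinkedBlockingDeque<>();
                break;
            case "array-blocking":
                // size: queue capacity, assumed to come from the configuration
                queue = new ArrayBlockingQueue<>(size);
                break;
            case "synchronous":
                queue = new SynchronousQueue<>();
                break;
            default:
                throw new ConfigurationException("Unknown queue-type: " + queueType);
        }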
  • Do not define a maximum thread limit
        // either this way
        threadPool = Executors.newCachedThreadPool();

        // or this way
        threadPool = new ThreadPoolExecutor(1,
                Integer.MAX_VALUE,
                10,
                TimeUnit.SECONDS,
                new SynchronousQueue<>());
  • Change how dropped messages are reported. Use a separate logger to report discarded messages (the channel logger is usually disabled in production). Alternatively, notify the QueryHost that the message was discarded.
        // Option 1: report the drop through a dedicated logger
        try {
            threadPool.execute(new Worker(m, i));
        } catch (Exception e) {
            getDiscardedLog().error(getName() + "Discard", e.getMessage());
        }
        // Option 2: send an error response back so the caller does not wait for its timeout
        try {
            threadPool.execute(new Worker(m, i));
        } catch (Exception e) {
            sp.out(out, buildResponseWithError(m));
        }
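
A related way to hook into the rejection, instead of wrapping every execute() call in try/catch, is to give the ThreadPoolExecutor a RejectedExecutionHandler. The sketch below is only an illustration and is not code from the adaptor: `Job` is a hypothetical stand-in for the adaptor's Worker, and the handler just records the ids of refused jobs. In the adaptor, that callback would be the place to write to a dedicated logger or to put an error response in the Space.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionReportDemo {
    /** Hypothetical stand-in for the adaptor's Worker: carries an id and blocks until released. */
    static final class Job implements Runnable {
        final int id;
        final CountDownLatch release;
        Job(int id, CountDownLatch release) { this.id = id; this.release = release; }
        @Override public void run() {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Submits {@code jobs} blocking jobs and returns the ids the saturated pool refused. */
    static List<Integer> rejectedIds(int maxThreads, int jobs) {
        List<Integer> rejected = new CopyOnWriteArrayList<>();
        // Runs on the submitting thread whenever the pool cannot accept a task.
        RejectedExecutionHandler report = (r, executor) -> rejected.add(((Job) r).id);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, maxThreads, 10, TimeUnit.SECONDS, new SynchronousQueue<>(), report);
        CountDownLatch release = new CountDownLatch(1);
        for (int id = 0; id < jobs; id++) {
            pool.execute(new Job(id, release)); // no exception: the handler is called instead
        }
        release.countDown(); // let the accepted jobs finish
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(rejectedIds(2, 5)); // prints [2, 3, 4]
    }
}
```

With two threads allowed, jobs 0 and 1 each get a thread and jobs 2 to 4 are handed to the handler, so nothing is dropped without a trace.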

rainer010 avatar Jun 02 '21 20:06 rainer010

Are you using the channel adapter directly, without going through a MUX, which in turn uses the Space?

ar avatar Jun 02 '21 23:06 ar

We are using a QMUX and sending messages with the QueryHost participant.

rainer010 avatar Jun 03 '21 13:06 rainer010

Let me add an example to explain this better.

With this example setup:

<channel-adaptor name="jpts-channel" class="org.jpos.iso.OneShotChannelAdaptorMK2"
                 logger="Q2">
    ....
    <max-connections>128</max-connections>
    <max-connect-attempts>20</max-connect-attempts>
</channel-adaptor>

If there are already 128 active connections (threads) waiting for a response and another message is pending in its queue, OneShotChannelAdaptorMK2 still tries to create a new worker. That attempt throws an exception immediately, because the pool uses a SynchronousQueue and no thread is available.

...
    public void run(){
        while (running()){
            try{
                Object o = sp.in(in, delay);
                if (o instanceof ISOMsg){
                    ...
                    threadPool.execute(new Worker(m, i));
                }
            }catch (Exception e){
                getLog().warn(getName(), e.getMessage());
            }
    ...

The channel log is generally disabled in production, so we never see this situation. The QueryHost is not notified either, and it fails only when its timeout expires.
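
The behavior can be reproduced outside jPOS with a small, self-contained sketch. It assumes a pool built the same way as described above (core size 1, a fixed maximum, a SynchronousQueue); the class and method names are made up for the illustration, and the tasks simply block to simulate workers waiting for a response.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaturationDemo {
    /** Submits {@code tasks} blocking tasks to a pool capped at {@code maxThreads}; returns how many were rejected. */
    static int countRejected(int maxThreads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, maxThreads, 10, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                // Each task blocks, like a worker waiting for a remote response.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                // All threads are busy and a SynchronousQueue holds nothing: the task is refused.
                rejected++;
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejected(2, 5)); // prints 3
    }
}
```

With max-connections set to 128, the same thing happens to the 129th concurrent message: execute() throws, and the only trace is the warning written to the channel log.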

rainer010 avatar Jun 03 '21 14:06 rainer010

This problem doesn't happen in the OneShotChannelAdaptor. I suggest you use that one instead of the contributed MK2 variant.

ar avatar Jun 04 '21 00:06 ar