jesque icon indicating copy to clipboard operation
jesque copied to clipboard

java.lang.ClassCastException: [B cannot be cast to java.lang.Long

Open m8r-ubnc5g opened this issue 9 years ago • 0 comments

@gresrun: Is #90 really fixed? I have seen a similar Exception in my code. I have written a little ugly presentation test to show, that this exception can still be thrown.

I'm using jesque v2.1.1

Some informations about my redis DB:

> 127.0.0.1:6379> INFO
> # Server
> redis_version:2.8.4
> redis_git_sha1:00000000
> redis_git_dirty:0
> redis_build_id:a44a05d76f06a5d9
> redis_mode:standalone
> os:Linux 3.19.0-32-generic x86_64
> arch_bits:64
> multiplexing_api:epoll
> gcc_version:4.8.2
> process_id:2863
> run_id:77826f62f35753f66b72d04f349650fa3c2f3071
> tcp_port:6379
> uptime_in_seconds:90919
> uptime_in_days:1
> hz:10
> lru_clock:694402
> config_file:/etc/redis/redis.conf

And below there is the test class. I know that this code is not very cool, but it shows this exception:

Exception in thread "Worker-2 Jesque-2.1.1: STOPPING" java.lang.ClassCastException: [B cannot be cast to java.lang.Long
    at redis.clients.jedis.Connection.getIntegerReply(Connection.java:222)
    at redis.clients.jedis.Jedis.srem(Jedis.java:1091)
    at net.greghaines.jesque.worker.WorkerImpl.run(WorkerImpl.java:198)
    at java.lang.Thread.run(Thread.java:745)

Please notice the timeout is set to 20. If I increase the timeout to e.g. 5.000 (default), then the ClassCastException is not thrown (or perhaps the test have to run longer than before!).


import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

import net.greghaines.jesque.Config;
import net.greghaines.jesque.ConfigBuilder;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.client.ClientPoolImpl;
import net.greghaines.jesque.worker.JobFactory;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerEvent;
import net.greghaines.jesque.worker.WorkerImplFactory;
import net.greghaines.jesque.worker.WorkerListener;
import net.greghaines.jesque.worker.WorkerPool;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.JedisPool;

public class JesqueWorkerPoolTest{

    private static final Logger LOG = LoggerFactory.getLogger(JesqueWorkerPoolTest.class);

    private static final Random RANDOM = new Random();
    private static final Object[] args = new Object[] {};
    private static final Map<String, ? extends Object> vars = new HashMap<String, Object>();
    private static final String className = JesqueWorkerPoolTest.class.getName();

    private static final String Q_NAME = "JustTESTQ";
    private static final List<String> queues = new ArrayList<String>();
    static {
        queues.add(Q_NAME);
    }

    private static volatile long lastRun = System.currentTimeMillis();
    private static volatile AtomicLong workerStopCounter = new AtomicLong();

    public static void main(
            String[] mainargs) {
        AtomicLong jobCounter = new AtomicLong();

        Config jesqueConfig = getConfig();

        JobFactory jobFactory = new JobFactory() {

            @Override
            public Object materializeJob(
                    Job job) throws Exception {
                return new Runnable() {

                    @Override
                    public void run() {
                        System.out.println("Run JOB:" + jobCounter.getAndIncrement());
                        lastRun = System.currentTimeMillis();
                    }
                };
            }
        };

        ClientPoolImpl jesqueClientPool = new ClientPoolImpl(jesqueConfig, new JedisPool(jesqueConfig.getHost()));

        WorkerImplFactory workerFactory = new WorkerImplFactory(jesqueConfig, queues, jobFactory);
        WorkerPool workerPool = new WorkerPool(workerFactory, 4);

        workerPool.getWorkerEventEmitter().addListener(new WorkerListener() {
            @Override
            public void onEvent(
                    WorkerEvent event,
                    Worker worker,
                    String queue,
                    Job job,
                    Object runner,
                    Object result,
                    Throwable t) {
                ZonedDateTime lastWorkerProblemEventTime = ZonedDateTime.now();
                String lastWorkerProblemMsg = String.format(
                        "Event '%s' @ %s:%s@%s. JobArgs: %s; JobVars:%s. EventTime:%s", event == null ? "<EventIsNull>"
                                : event.toString(), worker == null ? "<WorkerIsNull>" : worker.getName(), queue,
                        runner == null ? "<RunnerIsNull>" : runner.getClass().getSimpleName(),
                        job != null ? job.getArgs() : "null", job != null ? job.getVars() : "null",
                        lastWorkerProblemEventTime.toString());
                if (t == null) {
                    LOG.info(lastWorkerProblemMsg);
                } else {
                    LOG.warn(lastWorkerProblemMsg, t);
                }
                if (event.compareTo(WorkerEvent.WORKER_STOP) == 0) {
                    workerStopCounter.incrementAndGet();
                }
            }
        });

        long startWokerPool = System.currentTimeMillis();
        workerPool.run();
        while (workerStopCounter.get() < 4 && 100000 > (System.currentTimeMillis() - lastRun)) {
            jesqueClientPool.enqueue(Q_NAME, new Job(className, args, vars));
            LOG.info("workerStopCounter:" + workerStopCounter.get());
            LOG.info("lastRun:" + lastRun);
            LOG.info("System.currentTimeMillis() - lastRun:" + (System.currentTimeMillis() - lastRun));
            try {
                Thread.sleep(RANDOM.nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        long endWorkerPool = System.currentTimeMillis();

        LOG.info("jobCounter:" + jobCounter.get());
        LOG.info("workerStopCounter:" + workerStopCounter.get());
        long workerDurationMS = endWorkerPool - startWokerPool;
        LOG.info("WorkerPoolWorkTime:" + new Duration(workerDurationMS));
        LOG.info("workerPool.isShutdown():" + workerPool.isShutdown());

        workerPool.end(true);
    }

    private static Config getConfig() {
        ConfigBuilder configBuilder = new ConfigBuilder();
        configBuilder.withHost("myredishost");
        configBuilder.withTimeout(20);
        configBuilder.withPort(6379);
        return configBuilder.build();
    }

}

Perhaps the problem of Stopping workers due to some known exceptions could be reduced (workaround), if the Pool would spawn new Workers if there is the WorkerEvent.WORKER_STOP event with this type of Exception? Because now I have to monitor the WorkerPool and start own workers or in the end I'm stucked because all workers are down.

`

m8r-ubnc5g avatar Jan 28 '16 11:01 m8r-ubnc5g