jesque
jesque copied to clipboard
java.lang.ClassCastException: [B cannot be cast to java.lang.Long
@gresrun: Is #90 really fixed? I have seen a similar exception in my code. I have written a small, admittedly ugly, demonstration test to show that this exception can still be thrown.
I'm using jesque v2.1.1
Some information about my Redis DB:
> 127.0.0.1:6379> INFO
> # Server
> redis_version:2.8.4
> redis_git_sha1:00000000
> redis_git_dirty:0
> redis_build_id:a44a05d76f06a5d9
> redis_mode:standalone
> os:Linux 3.19.0-32-generic x86_64
> arch_bits:64
> multiplexing_api:epoll
> gcc_version:4.8.2
> process_id:2863
> run_id:77826f62f35753f66b72d04f349650fa3c2f3071
> tcp_port:6379
> uptime_in_seconds:90919
> uptime_in_days:1
> hz:10
> lru_clock:694402
> config_file:/etc/redis/redis.conf
Below is the test class. I know the code is not very pretty, but it reproduces this exception:
Exception in thread "Worker-2 Jesque-2.1.1: STOPPING" java.lang.ClassCastException: [B cannot be cast to java.lang.Long
at redis.clients.jedis.Connection.getIntegerReply(Connection.java:222)
at redis.clients.jedis.Jedis.srem(Jedis.java:1091)
at net.greghaines.jesque.worker.WorkerImpl.run(WorkerImpl.java:198)
at java.lang.Thread.run(Thread.java:745)
Please note that the timeout is set to 20. If I increase the timeout to, e.g., 5000 (the default), the ClassCastException is not thrown (or perhaps the test has to run longer than before!).
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.ConfigBuilder;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.client.ClientPoolImpl;
import net.greghaines.jesque.worker.JobFactory;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerEvent;
import net.greghaines.jesque.worker.WorkerImplFactory;
import net.greghaines.jesque.worker.WorkerListener;
import net.greghaines.jesque.worker.WorkerPool;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPool;
/**
 * Stand-alone reproduction harness for the
 * {@code java.lang.ClassCastException: [B cannot be cast to java.lang.Long}
 * seen in Jesque workers.
 *
 * <p>It builds a {@link WorkerPool} of {@link #WORKER_COUNT} workers with a deliberately
 * tiny timeout (see {@link #getConfig()}), then keeps enqueueing no-op jobs until either
 * every worker has reported {@link WorkerEvent#WORKER_STOP} or no job has run for
 * {@link #MAX_IDLE_MS} milliseconds.
 */
public class JesqueWorkerPoolTest {
    private static final Logger LOG = LoggerFactory.getLogger(JesqueWorkerPoolTest.class);
    private static final Random RANDOM = new Random();
    private static final Object[] args = new Object[] {};
    private static final Map<String, ? extends Object> vars = new HashMap<String, Object>();
    private static final String className = JesqueWorkerPoolTest.class.getName();
    private static final String Q_NAME = "JustTESTQ";
    private static final List<String> queues = new ArrayList<String>();
    /** Number of workers in the pool; also the number of WORKER_STOP events that ends the run. */
    private static final int WORKER_COUNT = 4;
    /** The run is abandoned when no job has executed for this many milliseconds. */
    private static final long MAX_IDLE_MS = 100000L;

    static {
        queues.add(Q_NAME);
    }

    /** Wall-clock time of the most recent job execution; written by worker threads. */
    private static volatile long lastRun = System.currentTimeMillis();
    /** Count of WORKER_STOP events observed so far. The reference never changes, hence final. */
    private static final AtomicLong workerStopCounter = new AtomicLong();

    /**
     * Runs the reproduction: starts the worker pool, feeds it jobs, and logs a summary.
     *
     * @param mainargs ignored
     */
    public static void main(
            String[] mainargs) {
        final AtomicLong jobCounter = new AtomicLong();
        Config jesqueConfig = getConfig();
        // Every job materializes to the same trivial Runnable, whatever the Job payload is.
        JobFactory jobFactory = new JobFactory() {
            @Override
            public Object materializeJob(
                    Job job) throws Exception {
                return new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("Run JOB:" + jobCounter.getAndIncrement());
                        lastRun = System.currentTimeMillis();
                    }
                };
            }
        };
        // NOTE(review): this JedisPool is built from the host only, so the client side does
        // not use the port/timeout from jesqueConfig -- confirm that is intended.
        JedisPool jedisPool = new JedisPool(jesqueConfig.getHost());
        ClientPoolImpl jesqueClientPool = new ClientPoolImpl(jesqueConfig, jedisPool);
        WorkerImplFactory workerFactory = new WorkerImplFactory(jesqueConfig, queues, jobFactory);
        WorkerPool workerPool = new WorkerPool(workerFactory, WORKER_COUNT);
        workerPool.getWorkerEventEmitter().addListener(new WorkerListener() {
            @Override
            public void onEvent(
                    WorkerEvent event,
                    Worker worker,
                    String queue,
                    Job job,
                    Object runner,
                    Object result,
                    Throwable t) {
                ZonedDateTime lastWorkerProblemEventTime = ZonedDateTime.now();
                String lastWorkerProblemMsg = String.format(
                        "Event '%s' @ %s:%s@%s. JobArgs: %s; JobVars:%s. EventTime:%s",
                        event == null ? "<EventIsNull>" : event.toString(),
                        worker == null ? "<WorkerIsNull>" : worker.getName(),
                        queue,
                        runner == null ? "<RunnerIsNull>" : runner.getClass().getSimpleName(),
                        job != null ? job.getArgs() : "null",
                        job != null ? job.getVars() : "null",
                        lastWorkerProblemEventTime.toString());
                if (t == null) {
                    LOG.info(lastWorkerProblemMsg);
                } else {
                    LOG.warn(lastWorkerProblemMsg, t);
                }
                // Identity comparison is null-safe; the message above already allows for a
                // null event, so the check here must not dereference it either.
                if (event == WorkerEvent.WORKER_STOP) {
                    workerStopCounter.incrementAndGet();
                }
            }
        });
        long startWokerPool = System.currentTimeMillis();
        try {
            // Presumably starts the worker threads and returns; the enqueue loop below
            // relies on this call not blocking.
            workerPool.run();
            while (workerStopCounter.get() < WORKER_COUNT
                    && MAX_IDLE_MS > (System.currentTimeMillis() - lastRun)) {
                jesqueClientPool.enqueue(Q_NAME, new Job(className, args, vars));
                LOG.info("workerStopCounter:" + workerStopCounter.get());
                LOG.info("lastRun:" + lastRun);
                LOG.info("System.currentTimeMillis() - lastRun:" + (System.currentTimeMillis() - lastRun));
                try {
                    Thread.sleep(RANDOM.nextInt(100));
                } catch (InterruptedException e) {
                    // Restore the flag and stop feeding jobs: an interrupt is a request to
                    // shut down, and continuing would silently discard it.
                    LOG.warn("Interrupted while waiting between enqueues; stopping", e);
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            long endWorkerPool = System.currentTimeMillis();
            LOG.info("jobCounter:" + jobCounter.get());
            LOG.info("workerStopCounter:" + workerStopCounter.get());
            long workerDurationMS = endWorkerPool - startWokerPool;
            LOG.info("WorkerPoolWorkTime:" + new Duration(workerDurationMS));
            LOG.info("workerPool.isShutdown():" + workerPool.isShutdown());
        } finally {
            // Always shut the workers down and release the Redis connections, even when
            // enqueueing failed, so the process does not linger with live resources.
            workerPool.end(true);
            jesqueClientPool.end();
            jedisPool.destroy();
        }
    }

    /**
     * Builds the Jesque configuration used by the workers and the client.
     *
     * <p>The timeout of 20 is intentionally very low: per the issue report, the
     * ClassCastException shows up with this value and not with the default.
     *
     * @return the configuration pointing at {@code myredishost:6379}
     */
    private static Config getConfig() {
        ConfigBuilder configBuilder = new ConfigBuilder();
        configBuilder.withHost("myredishost");
        configBuilder.withTimeout(20);
        configBuilder.withPort(6379);
        return configBuilder.build();
    }
}
Perhaps the problem of workers stopping due to certain known exceptions could be mitigated (as a workaround) if the pool spawned new workers whenever a WorkerEvent.WORKER_STOP event occurs with this type of exception? As it stands, I have to monitor the WorkerPool and start my own workers; otherwise I eventually get stuck because all workers are down.
`