influxdb-java
BatchProcessor exception handler is not triggering
Expected behaviour:
When registering an exceptionHandler in BatchOptions, it should be invoked when data cannot be written.
Actual behaviour: The handler isn't invoked, not even when attempting to write to a database that cannot even be connected to (because it doesn't exist). Log calls invoked by it are not showing up in the log, and debugging breakpoints set in the handler are never triggered.
This is how I register the handler (Kotlin syntax, but you should easily get the idea):
InfluxDBFactory.connect(props.uri, props.user, props.password).apply {
    setDatabase(props.name)
    enableBatch(BatchOptions.DEFAULTS.exceptionHandler { point, e ->
        log.error("Error during write", e)
    })
}
I can run this code against a bogus uri, and the handler doesn't trigger. Am I doing something completely wrong here?
Which versions of Java, InfluxDB, and influxdb-java are involved?
Java 8, influxdb-java 2.17, influxdb 1.7.10
This scenario is covered by a test. I can only imagine this is somehow related to Kotlin, but I must admit that I don't know anything about Kotlin.
Seems unlikely. I never had any issues with Java interop so far, and when looking at the Debugger, the exceptionHandler in the BatchProcessor seems to be correctly instantiated. Just to make extra sure, I changed my code to implement the interface anonymously instead of using the Lambda syntax:
BatchOptions.DEFAULTS.exceptionHandler(object : BiConsumer<Iterable<Point>, Throwable> {
    override fun accept(t: Iterable<Point>, u: Throwable) {
        throw RuntimeException("I am Error")
    }
})
The only difference is that the Kotlin compiler suggests that I should use lambda syntax instead; the problem is still the same. That makes it rather unlikely that this is an interop issue. Can you point me to that test? I'd like to take a look at it, maybe there's something I'm missing.
See: https://github.com/influxdata/influxdb-java/blob/master/src/test/java/org/influxdb/impl/BatchProcessorTest.java
Hmmm... I did a minimal sample in a Java project and a Kotlin project, and it appears that the handler is not invoked until disableBatch() is called. Is that expected behaviour? That would basically mean that I have to call enableBatch before every write, and call disableBatch afterwards.
Is that how it is intended to be used? Somehow I was under the impression that I would just have to call enableBatch once, and then disableBatch() when the application closes. If this is indeed the intended way, maybe clarify the documentation a bit.
I guess you hit a case where no actual write to InfluxDB happened, so nothing triggered the callback. Calling disableBatch writes all in-flight points to the database, and that write then fails. If not enough points have accumulated or not enough time has passed, this does not happen.
Ah. So when doing asynchronous writes, what's the best way of ensuring that the DB is responding? I'm thinking of making a watchdog thread that sends regular pings, do you have any better suggestion?
By the way, on the matter of InfluxDB.ping(), there's the slight oddity that if something else is listening on that port, it doesn't throw an exception. You get a valid Pong with version "unknown". It might be better to always throw if there isn't a valid response.
This is the problem with async: you might not know. It is required for the sake of performance, though. Your handler notifies you if a batch write did not succeed, so if you go for a short batch period with a smaller number of points, you can probably write the lost points in a second attempt when the handler fires.
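Re-writing lost points from the handler, as suggested above, could be sketched roughly like this. It is a minimal, library-free sketch: the handler has the same BiConsumer<Iterable<Point>, Throwable> shape used earlier in this thread, but Point is replaced by a generic T so the snippet compiles on its own. FailedBatchCollector and drain() are hypothetical names, not part of influxdb-java, and the assumption that the handler may run on a background thread is mine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;

// Hypothetical helper: remembers the points of failed batches so they can be written again later.
final class FailedBatchCollector<T> implements BiConsumer<Iterable<T>, Throwable> {
    private final ConcurrentLinkedQueue<T> failed = new ConcurrentLinkedQueue<>();

    @Override
    public void accept(Iterable<T> points, Throwable error) {
        // Assumed to be called from a background thread, so only thread-safe, cheap work here.
        for (T p : points) {
            failed.add(p);
        }
    }

    // Removes and returns everything collected so far, in arrival order, for a retry attempt.
    List<T> drain() {
        List<T> out = new ArrayList<>();
        T p;
        while ((p = failed.poll()) != null) {
            out.add(p);
        }
        return out;
    }
}
```

With the real library, an instance with T = Point could presumably be passed to BatchOptions.DEFAULTS.exceptionHandler(...), and whatever drain() returns could be written again once the database is reachable.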
The problem with ping is detecting an invalid response. I'm glad to take improvements here.
> Ah. So when doing asynchronous writes, what's the best way of ensuring that the DB is responding? I'm thinking of making a watchdog thread that sends regular pings, do you have any better suggestion?
@avisecag could you please provide a full example (test case?) where your code is not working as expected?
> By the way, on the matter of InfluxDB.ping(), there's the slight oddity that if something else is listening on that port, it doesn't throw an exception. You get a valid Pong with version "unknown". It might be better to always throw if there isn't a valid response.
Any service listening on the specified port that answers with 204 and an empty body will be considered as a healthy InfluxDB instance. This is defined by https://docs.influxdata.com/influxdb/v1.7/tools/api/#ping-http-endpoint
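If you want something stricter than the 204 rule described above, one option is to additionally require the X-Influxdb-Version response header. This is only a sketch using plain HttpURLConnection rather than influxdb-java. StrictPing and its methods are hypothetical names, and it assumes the server sets X-Influxdb-Version on /ping, as InfluxDB 1.x does.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical helper, not part of influxdb-java.
final class StrictPing {
    // Pure check: status must be 204 and the version header must be present and non-empty.
    static boolean looksLikeInfluxDb(int status, String versionHeader) {
        return status == 204 && versionHeader != null && !versionHeader.trim().isEmpty();
    }

    // Sends GET <baseUrl>/ping and returns false on any I/O problem or unexpected response.
    static boolean ping(String baseUrl, int timeoutMillis) {
        try {
            HttpURLConnection conn =
                    (HttpURLConnection) new URL(baseUrl + "/ping").openConnection();
            conn.setConnectTimeout(timeoutMillis);
            conn.setReadTimeout(timeoutMillis);
            conn.setRequestMethod("GET");
            try {
                return looksLikeInfluxDb(
                        conn.getResponseCode(),
                        conn.getHeaderField("X-Influxdb-Version"));
            } finally {
                conn.disconnect();
            }
        } catch (IOException e) {
            // Connection refused, timeout, malformed URL, and so on.
            return false;
        }
    }
}
```

For example, StrictPing.ping("http://127.0.0.1:8086", 1000) would return false both when nothing is listening and when some other service answers 204 without the version header.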
If you want to make sure you can read from your InfluxDB (i.e. you have an up and running server), you could execute a query like "SHOW DATABASES" and expect at least one valid entry.
https://docs.influxdata.com/influxdb/v1.7/query_language/schema_exploration#show-databases
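The watchdog idea floated earlier in the thread could wrap whichever probe you pick (a ping, or a SHOW DATABASES query). The following is a rough sketch with the probe left abstract as a BooleanSupplier. DbWatchdog is a hypothetical name, nothing here comes from influxdb-java, and the period is whatever suits your application.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical helper: runs a health probe periodically and remembers the last result.
final class DbWatchdog implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "db-watchdog");
                t.setDaemon(true); // do not keep the JVM alive just for the watchdog
                return t;
            });
    // Starts as "not healthy" until the first probe has succeeded.
    private final AtomicBoolean healthy = new AtomicBoolean(false);

    DbWatchdog(BooleanSupplier probe, long periodMillis) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                healthy.set(probe.getAsBoolean());
            } catch (RuntimeException e) {
                // A throwing probe counts as unhealthy and must not cancel the schedule.
                healthy.set(false);
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    boolean isHealthy() {
        return healthy.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The application can then consult isHealthy() before or after asynchronous writes, and call close() on shutdown.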
> could you please provide a full example (test case?) where your code is not working as expected?
Well, it's not so much about what I expect as what I should expect, since apparently the described behaviour is normal. If you want code to reproduce just that, sure thing (precondition: don't have an actual InfluxDB running on localhost:8086):
import java.util.Map;
import java.util.TreeMap;

import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;

public class Main {
    public static void main(String[] args) {
        final InfluxDB influxDB = InfluxDBFactory.connect("http://127.0.0.1:8086", "user", "password");
        influxDB.setDatabase("nobase");
        // Register the handler that should fire when a batch write fails.
        influxDB.enableBatch(BatchOptions.DEFAULTS.exceptionHandler((points, e) -> {
            System.out.println("exception handler invoked");
        }));
        final Map<String, Object> fields = new TreeMap<String, Object>();
        fields.put("foo", "bar");
        // Queue a single point; nothing is listening on the target port.
        influxDB.write(Point.measurement("foo").fields(fields).build());
        System.out.println("waiting");
        try {
            Thread.sleep(2000);
        } catch (Exception e) {
            System.out.println(e.toString());
        }
        System.out.println("done waiting");
        influxDB.disableBatch();
    }
}
No matter how long you make the sleep, the handler is not invoked until disableBatch is called:
waiting
done waiting
exception handler invoked
> Any service listening on the specified port that answers with 204 and an empty body will be considered as a healthy InfluxDB instance.
Ok, I wasn't aware of that. I thought if you already have a version in the body you might as well use it to validate the response, but the spec is the spec.