morgoth
How to detect anomalies in the rate of Kafka messages being processed
Hi Nathaniel,
I need to detect anomalies in the Kafka messages produced to our kafka-topic-* measurements using morgoth.
Here is the TICK script.
Can you verify whether it should give correct anomaly data?
Also, I am not able to save the topic name (which is the name of the measurement) into InfluxDB (kafka-morgoth-alert).
And can you explain what the values for minSupport and errorTolerance should be?
var groups = 'host'
var field = 'produced'
var scoreField = 'anomalyScore'
var minSupport = 0.05
var errorTolerance = 0.01
var consensus = 0.05
var sigmas = 3.5
var last_day_mean = batch
    |query('SELECT * FROM "sensu"."default"./kafka-topic-lst_plugin.*/')
        .groupBy(groups)
        .period(1d)
        .every(10m)
        .align()
    @morgoth()
        .field(field)
        .scoreField(scoreField)
        .minSupport(minSupport)
        .errorTolerance(errorTolerance)
        .consensus(consensus)
        .sigma(sigmas)
    |alert()
        .details('Kafka Message Produced Is Anomalous')
        .crit(lambda: "anomalyScore" > 0.98)
        .log('/tmp/kafka-morgoth-alert.log')
    |influxDBOut()
        .database('sensu')
        .retentionPolicy('default')
        .measurement('kafka-morgoth-alert')
Can you please help?
@sukantasaha Looks like you are headed in the right direction. Here are a few pointers:
- From the sample data you provided, the `produced` field looks like it is a counter, meaning it is an always-increasing value. If that is the case, computing the derivative of `produced` so that you have a rate of produced values would be better. This way morgoth can compute the anomalies in the rate of production, which is probably what you want.
- To understand the values of `minSupport` and `errorTolerance`, read through this doc: http://docs.morgoth.io/docs/detection_framework/
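The linked doc describes `minSupport` and `errorTolerance` in terms of counting how often each window "fingerprint" has been seen: a fingerprint matched by at least a `minSupport` fraction of windows counts as normal, and `errorTolerance` bounds the error of the (memory-bounded) counting. A rough Python sketch of that lossy-counting idea, for intuition only — this is a simplification, not morgoth's actual implementation, and the fingerprinting step is elided:

```python
# Rough sketch of the lossy-counting idea behind minSupport / errorTolerance.
# NOT morgoth's actual code: real morgoth reduces each window of points to a
# fingerprint first; here fingerprints are just opaque string keys.

class LossyCounter:
    def __init__(self, min_support, error_tolerance):
        self.min_support = min_support
        # errorTolerance bounds the counting error; smaller values mean
        # wider buckets, so fewer fingerprints are forgotten.
        self.bucket_width = int(1 / error_tolerance)
        self.counts = {}  # fingerprint -> [count, max_undercount]
        self.n = 0        # total windows observed

    def observe(self, fingerprint):
        self.n += 1
        bucket = (self.n - 1) // self.bucket_width + 1
        if fingerprint in self.counts:
            self.counts[fingerprint][0] += 1
        else:
            # A new entry may have been pruned earlier; record the possible
            # undercount so genuinely frequent items are never dropped.
            self.counts[fingerprint] = [1, bucket - 1]
        if self.n % self.bucket_width == 0:
            # Prune fingerprints whose count cannot reach the bucket id:
            # this is what makes the counter "lossy" but memory-bounded.
            self.counts = {f: cd for f, cd in self.counts.items()
                           if cd[0] + cd[1] > bucket}

    def is_anomalous(self, fingerprint):
        # Anomalous if the fingerprint's support is below minSupport,
        # i.e. it matches too small a fraction of all windows seen.
        count = self.counts.get(fingerprint, [0, 0])[0]
        return count < self.min_support * self.n


counter = LossyCounter(min_support=0.05, error_tolerance=0.01)
for _ in range(99):
    counter.observe("steady-rate")   # the usual traffic pattern
counter.observe("sudden-spike")      # seen only once

print(counter.is_anomalous("steady-rate"))   # frequent pattern: normal
print(counter.is_anomalous("sudden-spike"))  # rare pattern: anomalous
```

In this picture, raising `minSupport` makes the detector stricter (more windows flagged), while `errorTolerance` trades memory for counting accuracy.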
Can you check this? I have modified the script to compute the derivative:

var groups = 'host'
var field = 'derivative'
var scoreField = 'anomalyScore'
var minSupport = 0.05
var errorTolerance = 0.01
var consensus = 0.5
var sigmas = 4.0

var last_day_mean = batch
    |query('SELECT derivative(produced,6m) FROM "sensu"."default"./kafka-topic.*/')
        .groupBy(groups)
        .offset(1d)
        .period(18m)
        .every(6m)
        .align()
    @morgoth()
        .field(field)
        .scoreField(scoreField)
        .minSupport(minSupport)
        .errorTolerance(errorTolerance)
        .consensus(consensus)
        .sigma(sigmas)
    |alert()
        .crit(lambda: "anomalyScore" > 0.98)
        .log('/tmp/kafka-anomaly-with-morgoth-derivative.log')
    |influxDBOut()
        .database('sensu')
        .retentionPolicy('default')
        .measurement('kafka-anomaly-with-morgoth-derivative')
@sukantasaha It looks good. The only part you might want to double-check is `.offset(1d)`, which means you are always querying data from the previous day. If you want to query data from the current time, just remove that line.
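For reference, the rate can also be computed inside Kapacitor with the `|derivative` node instead of `derivative()` in the InfluxQL query. This is only a sketch of that alternative, not a drop-in replacement; the `.nonNegative()` call is an assumption, added because counters drop to zero when a broker restarts:

```
// Sketch: query the raw counter and let Kapacitor compute the rate.
var data = batch
    |query('SELECT produced FROM "sensu"."default"./kafka-topic.*/')
        .groupBy('host')
        .period(18m)
        .every(6m)
        .align()

data
    |derivative('produced')
        .unit(1m)          // rate of messages per minute
        .nonNegative()     // assumption: skip negative jumps from counter resets
        .as('rate')
    @morgoth()
        .field('rate')
        .scoreField('anomalyScore')
        .minSupport(0.05)
        .errorTolerance(0.01)
        .consensus(0.5)
        .sigma(4.0)
    |alert()
        .crit(lambda: "anomalyScore" > 0.98)
        .log('/tmp/kafka-anomaly-with-morgoth-derivative.log')
```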
OK, I removed the offset and enabled the task. Now I see alerts generated in the logs:
[kafka-derivative:alert3] 2016/12/07 11:48:00 D! CRITICAL alert triggered id:kafka-topic-social_feedback_event_queue:host=Kafka-Staging8 msg:kafka-topic-social_feedback_event_queue:host=Kafka-Staging8 is CRITICAL data:&{kafka-topic-social_feedback_event_queue map[host:Kafka-Staging8] [time anomalyScore derivative] [[2016-12-07 11:42:08.421 +0000 UTC 0.9803921568627451 0]]
The question is: if the derivative value is 0, why is it being detected as an anomaly?