
How to detect anomaly for Kafka rate of messages being processed

Open sukantasaha opened this issue 8 years ago • 5 comments

Hi Nathaniel,

I need to detect anomalies in the messages produced to our kafka-topic-* measurements using morgoth. Here is the TICKscript.
Can you check whether it should give correct anomaly data?

Also, I am not able to save the topic name, which is the name of the measurement, into InfluxDB (kafka-morgoth-alert).

Also, can you explain what the values for minSupport and errorTolerance should be?

var groups = 'host'
var field = 'produced'

var scoreField = 'anomalyScore'
var minSupport = 0.05
var errorTolerance = 0.01
var consensus = 0.05
var sigmas = 3.5

var last_day_mean = batch
    |query('SELECT * FROM "sensu"."default"./kafka-topic-lst_plugin.*/')
        .groupBy(groups)
        .period(1d)
        .every(10m)
        .align()
    @morgoth()
        .field(field)
        .scoreField(scoreField)
        .minSupport(minSupport)
        .errorTolerance(errorTolerance)
        .consensus(consensus)
        .sigma(sigmas)
    |alert()
        .details('Kafka Message Produced Is Anomalous')
        .crit(lambda: "anomalyScore" > 0.98)
        .log('/tmp/kafka-morgoth-alert.log')
    |influxDBOut()
        .database('sensu')
        .retentionPolicy('default')
        .measurement('kafka-morgoth-alert')

sukantasaha Nov 27 '16 15:11

Can you please help?

sukantasaha Nov 28 '16 11:11

@sukantasaha Looks like you are headed in the right direction. Here are a few pointers:

  1. From the sample data you provided, the produced field looks like a counter, meaning its value always increases. If that is the case, it would be better to compute the derivative of produced so that you have a rate of produced values. That way morgoth can detect anomalies in the rate of production, which is probably what you want.
  2. To understand the values of minSupport and errorTolerance, read through this doc: http://docs.morgoth.io/docs/detection_framework/
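
To illustrate point 1 outside of Kapacitor, here is a minimal Python sketch of turning counter samples into a rate. The function name `counter_to_rate`, the sample values, and the one-minute spacing are all hypothetical and not taken from the issue. The idea is only that a raw counter keeps growing even when production is steady, while its derivative stays flat and drops to zero when production stalls.

```python
from datetime import datetime, timedelta


def counter_to_rate(points, unit_seconds=60.0):
    """Convert (timestamp, counter_value) samples into per-unit rates.

    Each rate is (v1 - v0) / (t1 - t0), scaled to `unit_seconds`,
    which is the usual definition of a derivative over time.
    """
    rates = []
    for (t0, v0), (t1, v1) in zip(points, points[1:]):
        elapsed = (t1 - t0).total_seconds()
        if elapsed <= 0:
            continue  # skip duplicate or out-of-order samples
        rates.append((t1, (v1 - v0) / elapsed * unit_seconds))
    return rates


start = datetime(2016, 11, 27, 15, 0)
# Hypothetical counter readings, one per minute. The counter only grows,
# but production stalls between the fourth and fifth reading.
samples = [
    (start + timedelta(minutes=i), value)
    for i, value in enumerate([1000, 1600, 2200, 2800, 2800, 3400])
]

rates = counter_to_rate(samples)
rate_values = [rate for _, rate in rates]
print(rate_values)
```

In this made-up series the counter values themselves never repeat a pattern, whereas the rates are a steady 600 per minute with a single 0, which is the kind of signal an anomaly detector can work with.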

nathanielc Nov 28 '16 16:11

Can you check this? I have modified the script to compute the derivative.

var groups = 'host'
var field = 'derivative'

var scoreField = 'anomalyScore'
var minSupport = 0.05
var errorTolerance = 0.01
var consensus = 0.5
var sigmas = 4.0

var last_day_mean = batch
    |query('SELECT derivative(produced,6m) FROM "sensu"."default"./kafka-topic.*/')
        .groupBy(groups)
        .offset(1d)
        .period(18m)
        .every(6m)
        .align()
    @morgoth()
        .field(field)
        .scoreField(scoreField)
        .minSupport(minSupport)
        .errorTolerance(errorTolerance)
        .consensus(consensus)
        .sigma(sigmas)
    |alert()
        .crit(lambda: "anomalyScore" > 0.98)
        .log('/tmp/kafka-anomaly-with-morgoth-derivative.log')
    |influxDBOut()
        .database('sensu')
        .retentionPolicy('default')
        .measurement('kafka-anomaly-with-morgoth-derivative')

sukantasaha Dec 02 '16 10:12

@sukantasaha It looks good. The only part you might want to double-check is .offset(1d), which means that you are always querying data from the previous day. If you want to query data from the current time, just remove that line.
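
As a rough illustration of what the offset does, the Python sketch below computes the time range a batch query would cover, assuming the window is `[now - offset - period, now - offset]`. The helper `query_window` and the example timestamp are hypothetical, and the sketch ignores the effect of `.align()`.

```python
from datetime import datetime, timedelta


def query_window(now, period, offset=timedelta(0)):
    """Return the (start, stop) range covered by a query run at `now`.

    The whole window is shifted back in time by `offset`.
    """
    stop = now - offset
    return stop - period, stop


now = datetime(2016, 12, 2, 16, 0)  # hypothetical run time

# With .offset(1d) and .period(18m): yesterday's data.
with_offset = query_window(now, timedelta(minutes=18), timedelta(days=1))
# Without the offset: the most recent 18 minutes.
without_offset = query_window(now, timedelta(minutes=18))

print(with_offset)
print(without_offset)
```

Under that assumption, the script with the offset scores a window ending on Dec 1 at 16:00, while the script without it scores the window ending at the current time.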

nathanielc Dec 02 '16 16:12

OK, I removed the offset and enabled the tasks. Now I see alerts generated in the logs:

[kafka-derivative:alert3] 2016/12/07 11:48:00 D! CRITICAL alert triggered id:kafka-topic-social_feedback_event_queue:host=Kafka-Staging8 msg:kafka-topic-social_feedback_event_queue:host=Kafka-Staging8 is CRITICAL data:&{kafka-topic-social_feedback_event_queue map[host:Kafka-Staging8] [time anomalyScore derivative] [[2016-12-07 11:42:08.421 +0000 UTC 0.9803921568627451 0]] }

The question is: if the derivative value is 0, why is it being detected as an anomaly?

sukantasaha Dec 07 '16 11:12