Realtime Streaming Data Integration

Open cjkini opened this issue 4 years ago • 6 comments

Hi guys. Thanks for this rrcf library; it is so useful. I have a question.

How do we integrate this with real-time streaming data, such as from an MQTT server (e.g. an MQTT server that sends data every second)?

In the rrcf streaming example, the sine wave data has to be generated completely first, and only then is it fed into rrcf for anomaly detection.

My question is: how do we implement this if the data is generated continuously?

I am thinking of using a queue or buffer that collects the data points before they are submitted to rrcf for anomaly scoring.
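
To make the queue idea concrete, here is a minimal sketch using only the standard library. It assumes a producer (standing in for something like an MQTT message callback) that pushes points onto a thread-safe queue, and a consumer that scores them one at a time as they arrive. The names `producer`, `consumer`, `run_stream` and `score_point` are hypothetical; `score_point` is a placeholder for whatever rrcf insert-and-score step is used.

```python
import queue
import threading

SENTINEL = None  # hypothetical end-of-stream marker


def producer(buffer, points):
    """Stand-in for a message callback (e.g. an MQTT on_message handler)."""
    for index, point in enumerate(points):
        buffer.put((index, point))
    buffer.put(SENTINEL)


def consumer(buffer, score_point, results):
    """Pull points off the buffer one at a time and score each on arrival."""
    while True:
        item = buffer.get()  # blocks until a point is available
        if item is SENTINEL:
            break
        index, point = item
        results[index] = score_point(index, point)


def run_stream(points, score_point):
    """Run producer and consumer together and return {index: score}."""
    buffer = queue.Queue()
    results = {}
    worker = threading.Thread(target=consumer,
                              args=(buffer, score_point, results))
    worker.start()
    producer(buffer, points)
    worker.join()
    return results
```

With this layout the data never has to be generated up front: the consumer simply waits on `buffer.get()` until the next point shows up.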

Regards, cj

cjkini avatar Aug 23 '19 02:08 cjkini

Hi @cjkini

Here's a minimal example of streaming data to a server with rrcf.

Server code:

from collections import deque
import rrcf
from sanic import Sanic, response

# Recent Sanic releases require an application name
app = Sanic("rrcf_server")
NULL = b''

# Set tree parameters
num_trees = 40
tree_size = 256

# Create a forest of empty trees
forest = []
for _ in range(num_trees):
    tree = rrcf.RCTree()
    forest.append(tree)

# Create deque to keep track of indices
indices = deque([], maxlen=tree_size)

@app.route("/", methods=['POST'])
async def feed_rrcf(request):
    # Get request data
    # Get request data (request.json is None for an empty body)
    data = request.json or {}
    index = data.get('index')
    point = data.get('point')
    is_valid = (index is not None) and (point is not None)
    # If point is valid...
    if is_valid:
        # Check if tree size is maxed out
        if len(indices) == tree_size:
            oldest_index = indices.popleft()
        else:
            oldest_index = None
        # Add new index to queue
        indices.append(index)
        # Initialize anomaly score
        avg_codisp = 0
        # For each tree...
        for tree in forest:
            # If tree is above permitted size, drop the oldest point (FIFO)
            if oldest_index is not None:
                tree.forget_point(oldest_index)
            # Insert the new point into the tree
            tree.insert_point(point, index=index)
            # Compute codisp on the new point and take the average among all trees
            avg_codisp += tree.codisp(index) / num_trees
        print('CoDisp for point ({index}) is {avg_codisp}'.format(index=index,
                                                                  avg_codisp=avg_codisp))
    return response.raw(NULL)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

Client code (sends data to the server):

import requests
import time
import numpy as np

# Set parameters
num_points = 200
ndim = 3
endpoint = 'http://localhost:8000/'

# Set random seed
np.random.seed(0)

# Send random data points to server
for index in range(num_points):
    point = np.random.randn(ndim).tolist()
    json = {'index' : index, 'point' : point}
    requests.post(endpoint, json=json)
    time.sleep(0.01)

Output

$ python3 server_code.py

[2019-08-23 00:50:15 -0400] [49148] [INFO] Goin' Fast @ http://0.0.0.0:8000
[2019-08-23 00:50:15 -0400] [49148] [INFO] Starting worker [49148]
CoDisp for point (0) is 0.0
[2019-08-23 00:50:34 -0400] - (sanic.access)[INFO][127.0.0.1:49509]: POST http://localhost:8000/  200 0
CoDisp for point (1) is 1.0000000000000004
[2019-08-23 00:50:34 -0400] - (sanic.access)[INFO][127.0.0.1:49511]: POST http://localhost:8000/  200 0
CoDisp for point (2) is 1.2750000000000001
...
CoDisp for point (198) is 3.739963294254606
[2019-08-23 00:51:32 -0400] - (sanic.access)[INFO][127.0.0.1:50110]: POST http://localhost:8000/  200 0
CoDisp for point (199) is 4.481089816800935

mdbartos avatar Aug 23 '19 04:08 mdbartos

@mdbartos

Your example above is calculating a CoDisp for each point in the stream but it's not determining whether a point is an anomaly.

  1. Would it make sense to create a dictionary that stores the CoDisp for each point seen? This would allow for the calculation of a CoDisp threshold (e.g. median + 3*std). If the CoDisp of the next point is greater than the threshold then it'd be an anomaly. This threshold would be recalculated with each new CoDisp added to the dictionary.
  2. Should the RRCF reach maximum size (i.e. tree_size==256 in your example) prior to calculating the CoDisp of any points? I see that you're calculating a point's CoDisp before you reach the maximum tree size.
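
A rough sketch of the dictionary idea in point 1, using only the standard library. The class name `CoDispThreshold`, the `min_scores` warm-up and the use of the population standard deviation are assumptions made for illustration; none of this is part of rrcf.

```python
import statistics


class CoDispThreshold:
    """Tracks every CoDisp seen and flags scores above median + k * std."""

    def __init__(self, k=3.0, min_scores=2):
        self.k = k
        self.min_scores = min_scores  # assumed warm-up before flagging
        self.scores = {}  # index -> CoDisp

    def threshold(self):
        values = list(self.scores.values())
        if len(values) < self.min_scores:
            return None  # not enough history to form a threshold
        return statistics.median(values) + self.k * statistics.pstdev(values)

    def update(self, index, codisp):
        """Compare against the current threshold, then record the score."""
        limit = self.threshold()
        is_anomaly = limit is not None and codisp > limit
        self.scores[index] = codisp
        return is_anomaly
```

In the server example, `update(index, avg_codisp)` could be called right after the loop over the forest. The threshold is recomputed from scratch on every call, which is fine for a sketch but not tuned for long streams.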

Thanks!

sdlis avatar Aug 26 '19 23:08 sdlis

@sdlis

Thanks for pointing this out.

The code I posted is just intended as a minimal example for showing the mechanics of running rrcf on a server with streaming data. It should definitely be modified to suit the user's needs.

  1. Agreed. Also, computing the average CoDisp over a point's entire lifetime in the tree would be a better anomaly score than just the CoDisp on ingress. There are also probably better sampling strategies than just FIFO sampling (e.g. reservoir sampling), but the correct method to choose would be application-dependent.

  2. I could see a case for doing it either way. If your threshold is set as a percentile of observed CoDisps, then it could still function as a useful anomaly score for n < 256.
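
For the percentile idea in point 2, a minimal sketch might look like the following. It uses a nearest-rank percentile from the standard library only; the function names, the default `q=99.0` and the `min_scores` warm-up are assumptions, not anything prescribed by rrcf.

```python
import math


def percentile(values, q):
    """Nearest-rank percentile of a non-empty sequence, with 0 < q <= 100."""
    ordered = sorted(values)
    rank = max(1, math.ceil(q * len(ordered) / 100))
    return ordered[rank - 1]


def is_anomaly(codisp, observed, q=99.0, min_scores=10):
    """Flag codisp if it exceeds the q-th percentile of earlier scores."""
    if len(observed) < min_scores:
        return False  # too little history; an assumed rule, not from rrcf
    return codisp > percentile(observed, q)
```

Because the threshold is relative to the scores seen so far, it can be applied before the trees reach `tree_size`, as long as some warm-up period is accepted.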

mdbartos avatar Aug 27 '19 03:08 mdbartos

Thanks for the quick reply!

Building on the idea of maintaining a CoDisp dictionary for threshold calculation, should the CoDisps of points from this dictionary be removed when their corresponding points are removed from the tree?

For example, a tree with tree_size==256 is about to receive its 257th point. The first point is removed from the tree using forget_point() and then the 257th point is inserted. Should the first CoDisp be removed from the CoDisp dictionary?

sdlis avatar Aug 27 '19 22:08 sdlis

@sdlis

That sounds reasonable. You can base your threshold on a longer record if needed though.
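
One way to sketch this: keep the scores in a bounded deque whose length is chosen separately from the tree size. The class name `ScoreHistory` and the parameter `history_size` are hypothetical, and the median + k * std rule is just the one suggested earlier in the thread.

```python
from collections import deque
import statistics


class ScoreHistory:
    """Keeps the most recent `history_size` CoDisp values for thresholding."""

    def __init__(self, history_size):
        # A deque with maxlen drops the oldest score automatically once full
        self.scores = deque(maxlen=history_size)

    def add(self, codisp):
        self.scores.append(codisp)

    def threshold(self, k=3.0):
        if len(self.scores) < 2:
            return None  # not enough history yet
        return (statistics.median(self.scores)
                + k * statistics.pstdev(self.scores))
```

Setting `history_size` equal to `tree_size` makes the score record expire in step with FIFO removal from the trees; a larger value bases the threshold on a longer record.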

mdbartos avatar Sep 03 '19 18:09 mdbartos

@mdbartos

I have 2 questions regarding reservoir sampling with RRCF:

  1. Is the codisp (and not avg_codisp) of a point used since there's no guarantee the point will be in multiple trees in the forest let alone at the same index?

  2. My understanding of RRCF is that a point can't be scored without first being inserted into a tree. With reservoir sampling, though, there's a probability of 1 - (tree_size/index) (https://en.wikipedia.org/wiki/Reservoir_sampling) that an incoming point in the stream isn't selected for insertion. For my use case I need to score all incoming points. So when a point isn't selected for insertion but still needs to be scored, is it correct to simply insert the new point, calculate the codisp, and then remove the new point from the tree?
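
To make the proposal in point 2 concrete, here is a sketch of the insert, score, forget sequence together with the reservoir keep/skip decision. It only illustrates the question; whether this is the right way to score unsampled points is not confirmed anywhere in this thread. The function names are hypothetical, and the trees are assumed to expose `insert_point`, `codisp` and `forget_point` as in the server example above.

```python
import random


def score_transiently(forest, point, index):
    """Insert point into every tree, read its CoDisp, then forget it again.

    `forest` is an iterable of tree objects exposing insert_point, codisp
    and forget_point (the methods used in the server example).
    """
    scores = []
    for tree in forest:
        tree.insert_point(point, index=index)
        scores.append(tree.codisp(index))
        tree.forget_point(index)  # leave the tree as it was before scoring
    return sum(scores) / len(scores)


def reservoir_keeps(n_seen, tree_size, rng=random):
    """Reservoir sampling decision for the n_seen-th point (1-based).

    The first tree_size points are always kept; after that a point is kept
    with probability tree_size / n_seen.
    """
    return n_seen <= tree_size or rng.random() < tree_size / n_seen
```

A point for which `reservoir_keeps` returns False could be passed to `score_transiently`, so that it still gets a score without staying in the forest.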

Thanks.

sdlis avatar Sep 27 '19 13:09 sdlis