Add realtime inference consumer
Realtime consumers consume the results of model invocations in the realtime analysis pipeline (see model-builder, model-invocation-endpoint and ksqldb). The pipeline produces results as JSON: a KSQL function invokes the model, gets the inference results, and puts them in a Kafka topic. These consumers are meant to consume data from that topic.
Three terms are used:
- Action: Any task to be performed is an action. This can be sending a notification or an email, or just logging something.
- Condition: This is a predicate that will be evaluated on the incoming data. For instance, this can be used to evaluate if the ML model run crossed a threshold of some sort.
- RealtimeInferenceConsumer: This is the base consumer for realtime results topics. Each consumer can be configured with a set of Conditions and Actions. If all the conditions evaluate to true, then all the actions are triggered.
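The "all conditions, then all actions" rule can be sketched as below. This is a minimal illustration, not the code in this PR: `SketchCondition`, `SketchAction` and `SketchConsumer` are hypothetical names, and a record is assumed to be an already-parsed `Map` instead of a Kafka message.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the Condition and Action types in this PR.
interface SketchCondition {
    boolean evaluate(Map<String, Object> record);
}

interface SketchAction {
    void run(Map<String, Object> record);
}

final class SketchConsumer {
    private final List<SketchCondition> conditions;
    private final List<SketchAction> actions;

    SketchConsumer(List<SketchCondition> conditions, List<SketchAction> actions) {
        this.conditions = conditions;
        this.actions = actions;
    }

    /** Runs every action only if every condition holds; returns whether the actions ran. */
    boolean process(Map<String, Object> record) {
        for (SketchCondition condition : conditions) {
            if (!condition.evaluate(record)) {
                return false; // a single failed condition suppresses all actions
            }
        }
        for (SketchAction action : actions) {
            action.run(record);
        }
        return true;
    }
}
```

With one condition checking an `anomaly_detected` flag and two actions, a record with the flag set to `false` triggers nothing, while a record with `true` triggers both actions in order.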
Currently, only one Condition is provided which evaluates a JSONPath expression. This should be sufficient for most simple use cases.
For more complex use cases, create a concrete implementation of the Condition
interface or the ConditionBase abstract class. Currently, the JsonPath expression is read from the
configuration file and hence is static for a particular consumer and topic. Later, we can also
provide it through AppConfig so that it can be dynamic based on userId or projectId.
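As a rough sketch of what a custom condition might look like, the example below checks a numeric score against a threshold. Everything here is an assumption made for illustration: `ExampleConditionBase` is a hypothetical stand-in (the real ConditionBase may have a different shape), and the `score` field is invented; only the `invocation_result` key comes from the config in this PR.

```java
import java.util.Map;

// Hypothetical stand-in for ConditionBase; the real abstract class may differ.
abstract class ExampleConditionBase {
    private final String key; // key in the record that holds the data to evaluate

    ExampleConditionBase(String key) {
        this.key = key;
    }

    /** Looks up the configured key and delegates to the concrete check. */
    final boolean evaluate(Map<String, Object> record) {
        Object value = record.get(key);
        return value != null && evaluateValue(value);
    }

    protected abstract boolean evaluateValue(Object value);
}

// Example concrete condition: true when a numeric field exceeds a fixed threshold.
final class ScoreThresholdCondition extends ExampleConditionBase {
    private final String scoreField; // assumed field name inside the result object
    private final double threshold;

    ScoreThresholdCondition(String key, String scoreField, double threshold) {
        super(key);
        this.scoreField = scoreField;
        this.threshold = threshold;
    }

    @Override
    protected boolean evaluateValue(Object value) {
        if (!(value instanceof Map)) {
            return false; // not a JSON object, so there is nothing to compare
        }
        Object score = ((Map<?, ?>) value).get(scoreField);
        return score instanceof Number && ((Number) score).doubleValue() > threshold;
    }
}
```

The threshold is a constructor argument here, so a later variant could obtain it from AppConfig per project or user without changing the check itself.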
The supported config (for instance, for an intervention using the aRMT app) has the following format:
realtime-consumers:
  - name: 'lstm-lung-study-consumer' # Name of the consumer
    topic: 'lung_study_lstm_ad_inference' # Kafka topic to consume from (this should contain the inference results)
    notify_errors:
      email_addresses:
        - '[email protected]'
    conditions:
      - name: 'LocalJsonPathCondition' # Name of the condition
        projects: [ 'radar-test' ] # Only evaluate for these projects
        subjects: [ 'sub-1', 'sub-2' ] # Only evaluate for these subjects
        properties:
          jsonpath: '$[?(@.invocation_result.anomaly_detected == true)]' # JsonPath expression to evaluate
          key: 'invocation_result' # Key that contains data to evaluate
    actions:
      - name: 'ActiveAppNotificationAction' # Name of the action
        projects: [ 'radar-test' ] # Only execute for these projects
        subjects: [ 'sub-1', 'sub-2' ] # Only execute for these subjects
        properties:
          questionnaire_name: 'ers' # Name of the questionnaire to trigger
          time_of_day: '09:00:00' # Local user time of day to trigger at
          default_timezone: 'Europe/London' # Default timezone to use for the time of day if not found in the appserver
          appserver_base_url: 'http://localhost:8080/' # Base URL of the appserver
          management_portal_token_url: 'http://localhost/managementportal/api/oauth/token' # URL to get the management portal token
          client_id: 'realtime_consumer' # Client ID for the management portal
          client_secret: 'secret' # Client secret for the management portal
          metadata_key: 'invocation_result' # Key that contains the metadata to be forwarded to the aRMT app
      - name: 'EmailUserAction'
        projects: [ 'radar-test' ]
        subjects: [ 'sub-1', 'sub-2' ]
        properties:
          email_addresses: [ '[email protected]' ]
Note: The properties section is specific to each Action and Condition. Please take a look at the
condition and action docs for the keys supported. If the projects or subjects key is not
specified, the action or condition applies to all projects or subjects respectively.
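The scoping rule in the note above could be implemented along the following lines. `ScopeFilter` is a hypothetical helper, not a class from this PR, and treating an empty list the same as a missing key is an assumption of this sketch.

```java
import java.util.List;

// Hypothetical helper: decides whether a condition or action applies to a record.
final class ScopeFilter {
    private final List<String> projects; // null means the key was not specified
    private final List<String> subjects; // null means the key was not specified

    ScopeFilter(List<String> projects, List<String> subjects) {
        this.projects = projects;
        this.subjects = subjects;
    }

    /** True when both the project and the subject fall within the configured scope. */
    boolean appliesTo(String projectId, String subjectId) {
        return matches(projects, projectId) && matches(subjects, subjectId);
    }

    private static boolean matches(List<String> allowed, String id) {
        if (allowed == null || allowed.isEmpty()) {
            return true; // no restriction configured: applies to everything (assumption for empty lists)
        }
        return id != null && allowed.contains(id);
    }
}
```

With `projects: [ 'radar-test' ]` and no `subjects` key, such a filter would accept every subject in `radar-test` and reject records from any other project.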
At some point the threshold may need to come from radar-app-config service instead of from a fixed value.
Yes definitely can create impl to read the config from the app-config.
This may also be a good entry point to enable / disable the condition for a given project (e.g. to only process records for certain projects).
Yes, a good idea, will have a think through how it can be done, since then conditions will need to be accessed based on the project in the record in real-time. ~~I guess a very simple first approach could be just to add a getConditionsFor(String project) and getActionsFor(String project) and possibly also a user-specific variant like getConditionsFor(String project, String user) to the RealtimeConsumerConfig, and use that on each incoming record.~~
EDIT: Actually made projects a part of conditions and actions, so you can specify a projects list to filter:
...
conditions:
  - name: 'LocalJsonPathCondition'
    projects: ['radar-test']
    ...
If no projects are provided, works as default on all projects. But now I am wondering how this will be achieved when we move to app-config - will the conditions have the projects key? Or will it be on the request path like /config/client/realtime-consumers/projects/{project} ?
@mpgxvii, is it possible for you to review this when you have some free bandwidth? I think you plan to use this in the near future, so might be good to review and familiarise, unless you plan to use something else?