How to use exometer to monitor stock tick stream?
Hello,
I am going to use exometer to monitor stock tick stream, InfluxDB acts as the backend database. And the table schema is designed as below:
time stock_name exchange acc_count
----------------------------------------------
t1 appl x 1029
t2 amzn y 20
t3 appl x 1043
t4 amzn y 80
And I create each metric for every stock
exometer:update_or_create([tick, StockName, Exchange], 1, counter, []).
My question is how can I write exometer_report:subscribe/6 method to send data back to influxdb described above?
The way exometer works, you typically define a reporter which subscribes to metrics and at regular (configurable) intervals pushes data to some data store, log or analytics backend.
What this gives you is a set of periodic snapshots, which is often just what you want.
Is your question how you can ensure that all ticks are reported to the database?
@uwiger, yes. I want to collect all ticks count snapshot and report to database.
Update counter in client [updated]
exometer:update_or_create([tick, StockName, Exchange], 1, counter, []).
I have tried to wrap the snapshot like below (stock_count.erl)
init() ->
ok = exometer:new([tick],
{function, ?MODULE, snapshot_count, [], proplist,
[stock_name, exchange, acc_count]}).
subscribe(Reporter, Interval) ->
ok = exometer_report:subscribe(Reporter,
[tick],
[stock_name, exchange, acc_count],
Interval, [], true).
snapshot_count() ->
Metrics = exometer:find_entries([tick, '_', '_']), % [updated]
lists:foldl(
fun({Metric, _, _}, Acc) ->
[mkt_tick, StockName, Exchange] = Metric,
Count =
case exometer:get_value(Metric, value) of
{ok, V} ->
proplists:get_value(value, V, 0);
{error, not_found} ->
0
end,
Acc ++ [{Metric, [{stock_name, StockName},
{exchange, Exchange}, % [updated]
{acc_count, Count}]}]
end, [], Metrics).
The return value of snapshot_count is array of proplists and it fails to report to backend. Any suggestions?
Ok, just to check: your snapshot_count() function searches for metrics with the pattern [tick, '_', '_', '_'], but you previously said that your updates look like exometer:update_or_create([tick, StockName], 1, counter, []).
Also, your foldl() function doesn't actually produce a proplist of the form [{stock_name, V1},{exchange,V2},{acc_count,V3}]
Was this just a typo? What happens when you manually sample the [tick] metric with exometer:get_value([tick]).
My understanding of what you describe leads me to suggest this:
For demonstration, I create a reporter which simply stores values in an ets table. Relevant callbacks:
exometer_init(_Opts) ->
ets:new(demo_ticks, [ordered_set, named_table, public]),
{ok, []}.
exometer_report(Metric, value, _Extra, Value, St) ->
Time = erlang:monotonic_time(millisecond),
ets:insert(demo_ticks, {{Time,Metric}, Value}),
{ok, St};
exometer_report(_Metric, _DataPoint, _Extra, _Value, St) ->
{ok, St}.
exometer_report_bulk(Found, _Extra, St) ->
Time = erlang:monotonic_time(millisecond),
[ets:insert(demo_ticks, {{Time,Metric}, proplists:get_value(value, DPs)})
|| {Metric, DPs} <- Found],
Then, testing it in the shell:
Eshell V9.1 (abort with ^G)
1> application:ensure_all_started(exometer_core).
...
{ok,[hut,setup,bear,folsom,exometer_core]}
2> [exometer:update_or_create([tick,StockName,Exchange],1,counter,[]) || {StockName,Exchange} <- [{appl,x},{amzn,x},{appl,y},{amzn,y}]].
[ok,ok,ok,ok]
3> exometer_report:add_reporter(tick_reporter, [{module,tick_reporter},{intervals,[{main,5000}]},{report_bulk,true}]).
ok
4> exometer_report:subscribe(tick_reporter,{find,[tick,'_','_']},value,main).
ok
5> ets:tab2list(demo_ticks).
[{{-576460744789,[tick,amzn,x]},1},
{{-576460744789,[tick,amzn,y]},1},
{{-576460744789,[tick,appl,x]},1},
{{-576460744789,[tick,appl,y]},1},
{{-576460744788,[tick,amzn,x]},1},
{{-576460744788,[tick,amzn,y]},1},
{{-576460744788,[tick,appl,x]},1},
{{-576460744788,[tick,appl,y]},1},
{{-576460739787,[tick,amzn,x]},1},
{{-576460739787,[tick,amzn,y]},1},
{{-576460739787,[tick,appl,x]},1},
{{-576460739787,[tick,appl,y]},1},
{{-576460739786,[tick,amzn,x]},1},
{{-576460739786,[tick,amzn,y]},1},
{{-576460739786,[tick,appl,x]},1},
{{-576460739786,[tick,appl,y]},1}]
6> [exometer:update_or_create([tick,StockName,Exchange],1,counter,[]) || {StockName,Exchange} <- [{appl,x},{amzn,x},{appl,y},{amzn,y}]].
[ok,ok,ok,ok]
7> ets:tab2list(demo_ticks).
[...,
{{-576460739786,[tick,appl,y]},1},
{{-576460734785,[tick,amzn,x]},1},
{{-576460734785,[tick,amzn,y]},1},
{{-576460734785,[tick,appl,x]},1},
{{-576460734785,[tick,appl,y]},1},
{{-576460734783,[tick,amzn,x]},1},
{{-576460734783,[tick,amzn,y]},1},
{{-576460734783,[tick,appl,x]},1},
{{-576460734783,[tick,appl|...]},1},
{{-576460729782,[tick|...]},2},
{{-576460729782,[...]},2},
{{-576460729782,...},2},
{{...},...},
{...}|...]
Ok, just to check: your snapshot_count() function searches for metrics with the pattern [tick, '', '', '_'], but you previously said that your updates look like exometer:update_or_create([tick, StockName], 1, counter, []). Also, your foldl() function doesn't actually produce a proplist of the form [{stock_name, V1},{exchange,V2},{acc_count,V3}]
@uwiger, thanks for point out the errors. I've update the code in the original post. Creating new reporter is enlightening for me and I will to try to handle the data points in the reporter.
As mentioned before, I wanted to collect data from all stock metrics and send the data to one table in backend. And in above demonstration, it creates tables like tick_appl_x and tick_amzn_y according to the exometer_report_influxdb. Do you have any suggestion on merging the metrics into one table?
Well, my example simply wrote the metric and values to an ets table, using an increasing timestamp to separate consecutive updates. But the tick_reporter used the same callback (exometer_report_bulk) as the exometer_report_influxdb code you mentioned; the illustration was meant to show how you can set up a subscription on all tick counters and receive them periodically as a list of values in the reporter callback. Thus, you don't need a function metric that does the folding and aggregation.