Akumuli
Improve handling of ephemeral time series
This problem can be reproduced with the following script:
from __future__ import print_function
import argparse
import datetime
import random
import sys
import socket


class TCPChan:
    def __init__(self, host, port):
        self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.__sock.connect((host, port))

    def send(self, data):
        self.__sock.send(data)


PORT = 8282
HOST = '127.0.0.1'


def parse_timestamp(ts):
    """Parse ISO formatted timestamp"""
    try:
        return datetime.datetime.strptime(ts.rstrip('0').rstrip('.'), "%Y%m%dT%H%M%S.%f")
    except ValueError:
        return datetime.datetime.strptime(ts, "%Y%m%dT%H%M%S")


def parse_timedelta(delta):
    t = datetime.datetime.strptime(delta, "%H:%M:%S")
    return datetime.timedelta(hours=t.hour, minutes=t.minute, seconds=t.second)


def bulk_msg(ts, measurements, values, **tags):
    ncol = len(measurements)
    metric = "|".join(measurements)
    sname = "+" + metric + ' ' + ' '.join(['{0}={1}'.format(key, val) for key, val in tags.iteritems()])
    timestr = ts.strftime('+%Y%m%dT%H%M%S.%f')
    header = "*{0}".format(ncol)
    lines = [sname, timestr, header]
    for val in values:
        lines.append("+{0}".format(val))
    return '\r\n'.join(lines) + '\r\n'


def generate_rows(ts, delta, measurements, types, **tags):
    row = [10.0] * len(measurements)
    out = list(row)
    while True:
        for i in xrange(0, len(measurements)):
            row[i] += random.gauss(0.0, 0.01)
            out[i] = row[i] if types[i] == 0 else int(row[i])
        msg = bulk_msg(ts, measurements, out, **tags)
        yield ts, msg
        ts += delta


def generate_rr(iters):
    N = len(iters)
    ix = 0
    while True:
        it = iters[ix % N]
        yield it.next()
        ix += 1


def main(idrange, timerange):
    chan = TCPChan(HOST, PORT)
    begin, end, delta = timerange
    idbegin, idend = idrange
    measurements = [
        'cpu.user', 'cpu.sys', 'cpu.real', 'idle', 'mem.commit',
        'mem.virt', 'iops', 'tcp.packets.in', 'tcp.packets.out',
        'tcp.ret',
    ]
    # measurement types, 0 - float, 1 - int
    types = [
        0, 0, 0, 0, 1, 1, 1, 1, 1, 0
    ]
    tag_combinations = {
        'region': ['ap-southeast-1', 'us-east-1', 'eu-central-1'],
        'OS': ['Ubuntu_16.04', 'Ubuntu_14.04'],
        'instance-type': ['m3.large', 'm3.xlarge', 'm3.2xlarge', 'm4.large', 'm4.xlarge', 'm4.2xlarge'],
        'arch': ['x64'],
        'team': ['NY', 'NJ'],
        'rack': range(1, 100),
    }
    list_hosts = ['host_{0}'.format(id) for id in range(idbegin, idend)]
    for host in list_hosts:
        tagline = {'host': host}
        for k, v in tag_combinations.iteritems():
            tagline[k] = random.choice(v)
        lmbd = generate_rows(begin, delta, measurements, types, **tagline)
        for ts, msg in generate_rr([lmbd]):
            if ts > end:
                break
            chan.send(msg)
        del lmbd


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Generate test time-series data.')
    parser.add_argument('--rbegin', required=True, help='beginning of the id range')
    parser.add_argument('--rend', required=True, help='end of the id range')
    parser.add_argument('--tbegin', required=True, help='beginning of the time range')
    parser.add_argument('--tend', required=True, help='end of the time range')
    parser.add_argument('--tdelta', required=True, help='time step')
    args = parser.parse_args()
    main((int(args.rbegin), int(args.rend)),
         (parse_timestamp(args.tbegin), parse_timestamp(args.tend), parse_timedelta(args.tdelta)))
Launch parameters: --tbegin=20170101T000000 --tend=20170101T001000 --tdelta=00:00:10 --rbegin=0 --rend=10000000
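For reference, a single message produced by the script's `bulk_msg` looks like this. The snippet below is a Python 3 re-rendering of the same logic with a fixed tag order for readability (the original iterates a dict):

```python
def bulk_msg(ts_str, measurements, values, tags):
    # Series name line: '+' prefix, '|'-joined metrics, space-separated tags.
    sname = "+" + "|".join(measurements) + " " + \
            " ".join("{0}={1}".format(k, v) for k, v in tags.items())
    # Timestamp line, row header ('*' plus column count), then one value per line.
    lines = [sname, "+" + ts_str, "*{0}".format(len(values))]
    lines += ["+{0}".format(v) for v in values]
    return "\r\n".join(lines) + "\r\n"

msg = bulk_msg("20170101T000000.000000",
               ["cpu.user", "cpu.sys"], [10.0, 9.5], {"host": "host_0"})
print(msg)
# +cpu.user|cpu.sys host=host_0
# +20170101T000000.000000
# *2
# +10.0
# +9.5
```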
With 1.8M series, memory use grows to about 8GB. This happens because each time series needs 4KB to store the first block of its NB+tree instance. That memory is barely used, because the script writes only 60 values into each series. All of this data is kept in memory and never flushed to disk.
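The arithmetic behind the growth is easy to check: one 4KiB first block per series accounts for nearly all of the observed footprint.

```python
# Back-of-the-envelope check: one 4KiB first block allocated per series.
series_count = 1800000       # ~1.8M series created by the script
block_size = 4096            # bytes in the first NB+tree block
total_gib = series_count * block_size / 2.0 ** 30
print(round(total_gib, 2))   # ~6.87 GiB; the remainder of the 8GB is other per-series overhead
```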
Possible countermeasures:
- Grow leaf nodes as they fill up, the same way std::vector grows.
- Divide all series into an active and an inactive set. If a series doesn't get updated for a while, move it to the inactive set and dump it to disk.
- Maybe destructive writes should be used for serialized inactive time series (the last leaf node can be rewritten).
- Maybe Akumuli should use a common mechanism for inactive series and for late writes?
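The active/inactive split proposed above could take roughly the following shape. This is only a sketch with hypothetical names, not the actual Akumuli design: a registry tracks last-write times and a periodic sweep demotes idle series (in the real system, demotion would also flush the series' in-memory leaf node, releasing the 4KiB block it pins).

```python
import time

class SeriesRegistry:
    """Sketch of the proposed active/inactive series split (names hypothetical)."""
    def __init__(self, idle_seconds=300):
        self.idle_seconds = idle_seconds
        self.active = {}        # series id -> last write timestamp
        self.inactive = set()   # series whose state was dumped to disk

    def on_write(self, sid, now=None):
        # Any write promotes the series back into the active set.
        self.inactive.discard(sid)
        self.active[sid] = time.monotonic() if now is None else now

    def sweep(self, now=None):
        """Move series idle longer than the threshold to the inactive set.
        Returns the list of demoted series ids; the caller would
        serialize their leaf nodes to disk at this point."""
        now = time.monotonic() if now is None else now
        stale = [sid for sid, ts in self.active.items()
                 if now - ts > self.idle_seconds]
        for sid in stale:
            del self.active[sid]
            self.inactive.add(sid)
        return stale
```

A sweep like this could run on a timer, so a write-once-per-series workload (like the reproduction script) keeps only a small working set in memory.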
Experiment:
- run the akumulid server
- run the script with parameters:
  --tbegin=20170101T000000 --tend=20170101T001000 --tdelta=00:00:10 --rbegin=0 --rend=100000
  (creates 1M new series)
- restart the akumulid server
- run the script once again with parameters:
  --tbegin=20170101T000000 --tend=20170101T001000 --tdelta=00:00:10 --rbegin=100000 --rend=110000
  (creates 100K new series)
- stop the akumulid server
On the first run akumulid consumed a lot of RAM (6.8GB). The restart forced akumulid to persist all data to disk. On the second run akumulid loaded all the data, but because the tree is immutable it used even more RAM (almost twice as much: one page for the new leaf node and one page for the new superblock). Both runs completed correctly: no crashes, clean exit.
On disk after the first run, the SQLite database was 184MB, with 3.9GB in the volumes. It took about 20s to stop the server the first time (clean exit). Start time was also a bit long, 20-30s. 11.2GB of RAM was used just after start, 11.4GB when it finished. After the second run the SQLite database was about the same size and the volumes were 4.3GB. Start and stop times were about the same.
This test shows that series-name indexing is not the bottleneck. Obviously, this can be solved by lazy column loading.
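Lazy column loading could look something like the sketch below (again, hypothetical names, not Akumuli's actual API): instead of materializing every column's leaf node at startup, the column reads its last leaf back from disk only on the first write or query that touches the series.

```python
class LazyColumn:
    """Sketch of lazy column loading: the on-disk column is not
    materialized at startup; its leaf node is loaded on first touch."""
    def __init__(self, sid, loader):
        self.sid = sid
        self._loader = loader   # callable: sid -> in-memory leaf node
        self._leaf = None       # nothing loaded until first touch

    @property
    def leaf(self):
        if self._leaf is None:
            # First touch: pay the 4KiB (and the disk read) only now.
            self._leaf = self._loader(self.sid)
        return self._leaf

    def append(self, ts, value):
        self.leaf.append((ts, value))
```

With this scheme, restarting the server after the first run would not re-pin a page per series; only the 100K series touched by the second run would be loaded.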