From 17fcf10b74fbda384e26ecb480b77c46a1a3dbc5 Mon Sep 17 00:00:00 2001 From: Jan-Philipp Litza Date: Mon, 10 Feb 2014 21:45:32 +0100 Subject: [PATCH] Offload rrdtool calls and model to separate module --- RRD.py | 343 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ rrd.py | 75 +++++-------- 2 files changed, 372 insertions(+), 46 deletions(-) create mode 100644 RRD.py diff --git a/RRD.py b/RRD.py new file mode 100644 index 0000000..d1ae870 --- /dev/null +++ b/RRD.py @@ -0,0 +1,343 @@ +import subprocess +import re +import io +import os +from tempfile import TemporaryFile +from operator import xor, eq +from functools import reduce +from itertools import starmap +import math + +class RRDIncompatibleException(Exception): + """ + Is raised when an RRD doesn't have the desired definition and cannot be + upgraded to it. + """ + pass +class RRDOutdatedException(Exception): + """ + Is raised when an RRD doesn't have the desired definition, but can be + upgraded to it. + """ + pass + +if not hasattr(__builtins__, "FileNotFoundError"): + class FileNotFoundError(Exception): + pass + +class RRD: + """ + An RRD is a Round Robin Database, a database which forgets old data and + aggregates multiple records into new ones. + + It contains multiple Data Sources (DS) which can be thought of as columns, + and Round Robin Archives (RRA) which can be thought of as tables with the + DS as columns and time-dependant rows. + """ + + # rra[2].cdp_prep[0].value = 1,8583033333e+03 + _info_regex = re.compile(""" + (?P
[a-z_]+) + \[ (?P[a-zA-Z0-9_]+) \] + \. + | + (?P[a-z_]+) + \s*=\s* + "? (?P.*?) "? + $""", re.X) + _cached_info = None + + def _exec_rrdtool(self, cmd, *args, **kwargs): + pargs = ["rrdtool", cmd, self.filename] + for k,v in kwargs.items(): + pargs.extend(["--" + k, str(v)]) + pargs.extend(args) + subprocess.check_output(pargs) + + def __init__(self, filename): + self.filename = filename + + def ensureSanity(self, ds_list, rra_list, **kwargs): + """ + Create or upgrade the RRD file if necessary to contain all DS in + ds_list. If it needs to be created, the RRAs in rra_list and any kwargs + will be used for creation. Note that RRAs and options of an existing + database are NOT modified! + """ + try: + self.checkSanity(ds_list) + except FileNotFoundError: + self.create(ds_list, rra_list, **kwargs) + except RRDOutdatedException: + self.upgrade(ds_list) + + def checkSanity(self, ds_list=()): + """ + Check if the RRD file exists and contains (at least) the DS listed in + ds_list. + """ + if not os.path.exists(self.filename): + raise FileNotFoundError(self.filename) + info = self.info() + if set(ds_list) - set(info['ds'].values()) != set(): + if set((ds.name, ds.type) for ds in ds_list) \ + - set((ds.name, ds.type) for ds in info['ds'].values()) != set(): + raise RRDIncompatibleException() + else: + raise RRDOutdatedException() + + def upgrade(self, dss): + """ + Upgrade the DS definitions (!) of this RRD. + (To update its values, use update()) + + The list dss contains DSS objects to be updated or added. The + parameters of a DS can be changed, but not its type. New DS are always + added at the end in the order of their appearance in the list. + + This is done internally via an rrdtool dump -> rrdtool restore and + modifying the dump on the fly. + """ + info = self.info() + new_ds = list(info['ds'].values()) + new_ds.sort(key=lambda ds: ds.index) + for ds in dss: + if ds.name in info['ds']: + old_ds = info['ds'][ds.name] + if info['ds'][ds.name].type != ds.type: + raise RuntimeError('Cannot convert existing DS "%s" from type "%s" to "%s"' % + (ds.name, old_ds.type, ds.type)) + ds.index = old_ds.index + new_ds[ds.index] = ds + else: + ds.index = len(new_ds) + new_ds.append(ds) + added_ds_num = len(new_ds) - len(info['ds']) + + dump = subprocess.Popen( + ["rrdtool", "dump", self.filename], + stdout=subprocess.PIPE + ) + restore = subprocess.Popen( + ["rrdtool", "restore", "-", self.filename + ".new"], + stdin=subprocess.PIPE + ) + echo = True + ds_definitions = True + for line in dump.stdout: + if ds_definitions and b'' in line: + echo = False + if b'' in line: + ds_definitions = False + for ds in new_ds: + restore.stdin.write(bytes(""" + + %s + %s + %i + %s + %s + + + %s + %s + %i + + """ % ( + ds.name, + ds.type, + ds.args[0], + ds.args[1], + ds.args[2], + ds.last_ds, + ds.value, + ds.unknown_sec) + , "utf-8")) + + if b'' in line: + restore.stdin.write(added_ds_num*b""" + + NaN + NaN + NaN + 0 + + """) + + # echoing of input line + if echo: + restore.stdin.write( + line.replace( + b'', + (added_ds_num*b'NaN')+b'' + ) + ) + + if ds_definitions and b'' in line: + echo = True + dump.stdout.close() + restore.stdin.close() + try: + dump.wait(1) + except subprocess.TimeoutExpired: + dump.kill() + try: + restore.wait(2) + except subprocess.TimeoutExpired: + dump.kill() + raise RuntimeError("rrdtool restore process killed") + + os.rename(self.filename + ".new", self.filename) + self._cached_info = None + + def create(self, ds_list, rra_list, **kwargs): + """ + Create a new RRD file with the specified list of RRAs and DSs. + + Any kwargs are passed as --key=value to rrdtool create. + """ + self._exec_rrdtool( + "create", + *map(str, rra_list + ds_list), + **kwargs + ) + self._cached_info = None + + def update(self, V): + """ + Update the RRD with new values V. + + V can be either list or dict: + * If it is a dict, its keys must be DS names in the RRD and it is + ensured that the correct DS are updated with the correct values, by + passing a "template" to rrdtool update (see man rrdupdate). + * If it is a list, no template is generated and the order of the + values in V must be the same as that of the DS in the RRD. + """ + try: + args = ['N:' + ':'.join(map(str, V.values()))] + kwargs = {'template': ':'.join(V.keys())} + except AttributeError: + args = ['N:' + ':'.join(map(str, V))] + kwargs = {} + self._exec_rrdtool("update", *args, **kwargs) + self._cached_info = None + + def info(self): + """ + Return a dictionary with information about the RRD. + + See `man rrdinfo` for more details. + """ + if self._cached_info: + return self._cached_info + env = os.environ.copy() + env["LC_ALL"] = "C" + proc = subprocess.Popen( + ["rrdtool", "info", self.filename], + stdout=subprocess.PIPE, + env=env + ) + out, err = proc.communicate() + out = out.decode() + info = {} + for line in out.splitlines(): + base = info + for match in self._info_regex.finditer(line): + section, key, name, value = match.group("section", "key", "name", "value") + if section and key: + try: + key = int(key) + except ValueError: + pass + if section not in base: + base[section] = {} + if key not in base[section]: + base[section][key] = {} + base = base[section][key] + if name and value: + try: + base[name] = int(value) + except ValueError: + try: + base[name] = float(value) + except: + base[name] = value + dss = {} + for name, ds in info['ds'].items(): + ds_obj = DS(name, ds['type'], ds['minimal_heartbeat'], ds['min'], ds['max']) + ds_obj.index = ds['index'] + ds_obj.last_ds = ds['last_ds'] + ds_obj.value = ds['value'] + ds_obj.unknown_sec = ds['unknown_sec'] + dss[name] = ds_obj + info['ds'] = dss + rras = [] + for rra in info['rra'].values(): + rras.append(RRA(rra['cf'], rra['xff'], rra['pdp_per_row'], rra['rows'])) + info['rra'] = rras + self._cached_info = info + return info + +class DS: + """ + DS stands for Data Source and represents one line of data points in a Round + Robin Database (RRD). + """ + name = None + type = None + args = [] + index = -1 + last_ds = 'U' + value = 0 + unknown_sec = 0 + def __init__(self, name, dst, *args): + self.name = name + self.type = dst + self.args = args + + def __str__(self): + return "DS:%s:%s:%s" % ( + self.name, + self.type, + ":".join(map(str, self._nan_to_U_args())) + ) + + def __repr__(self): + return "%s(%r, %r, %s)" % ( + self.__class__.__name__, + self.name, + self.type, + ", ".join(map(repr, self.args)) + ) + + def __eq__(self, other): + return all(starmap(eq, zip(self._compare_keys(), other._compare_keys()))) + + def __hash__(self): + return reduce(xor, map(hash, self._compare_keys())) + + def _nan_to_U_args(self): + return tuple( + 'U' if type(arg) is float and math.isnan(arg) + else arg + for arg in self.args + ) + + def _compare_keys(self): + return (self.name, self.type, self._nan_to_U_args()) + +class RRA: + def __init__(self, cf, *args): + self.cf = cf + self.args = args + + def __str__(self): + return "RRA:%s:%s" % (self.cf, ":".join(map(str, self.args))) + + def __repr__(self): + return "%s(%r, %s)" % ( + self.__class__.__name__, + self.cf, + ", ".join(map(repr, self.args)) + ) diff --git a/rrd.py b/rrd.py index 22aa82c..b28b2ea 100755 --- a/rrd.py +++ b/rrd.py @@ -2,6 +2,7 @@ import subprocess import time import os +from RRD import RRD, DS, RRA class rrd: def __init__( self @@ -11,7 +12,7 @@ class rrd: , displayTimeNode = "1d" ): self.dbPath = databaseDirectory - self.globalDbFile = databaseDirectory + "/nodes.rrd" + self.globalDb = RRD(databaseDirectory + "/nodes.rrd") self.imagePath = imagePath self.displayTimeGlobal = displayTimeGlobal self.displayTimeNode = displayTimeNode @@ -24,39 +25,29 @@ class rrd: except: os.mkdir(self.imagePath) - def checkAndCreateIfNeededGlobalDatabase(self): - """ Creates the global database file iff it did not exist. - """ - if not os.path.exists(self.globalDbFile): - # Create Database with rrdtool - args = ["rrdtool",'create', self.globalDbFile - ,'--start', str(round(self.currentTimeInt - 60)) - ,'--step' , '60' - # Number of nodes available - ,'DS:nodes:GAUGE:120:0:U' - # Number of client available - ,'DS:clients:GAUGE:120:0:U' - ,'RRA:AVERAGE:0.5:1:120' - ,'RRA:AVERAGE:0.5:60:744' - ,'RRA:AVERAGE:0.5:1440:1780' - ] - subprocess.call(args) - def updateGlobalDatabase(self,nodeCount,clientCount): """ Adds a new (#Nodes,#Clients) entry to the global database. """ - # Update Global RRDatabase - args = ["rrdtool",'updatev', self.globalDbFile - # #Nodes #Clients - , self.currentTime + ":"+str(nodeCount)+":"+str(clientCount) - ] - subprocess.check_output(args) + self.globalDb.ensureSanity( + [ + # Number of nodes available + DS('nodes', 'GAUGE', 120, 0, float('NaN')), + # Number of client available + DS('clients', 'GAUGE', 120, 0, float('NaN')), + ], [ + RRA('LAST', 0, 1, 44640), + RRA('LAST', 0, 60, 744), + RRA('LAST', 0, 1440, 1780), + ], + step=60, + ) + self.globalDb.update({'nodes': nodeCount, 'clients': clientCount}) def createGlobalGraph(self): nodeGraph = self.imagePath + "/" + "globalGraph.png" args = ["rrdtool", 'graph', nodeGraph, '-s', '-' + self.displayTimeGlobal, '-w', '800', '-h' '400' - ,'DEF:nodes=' + self.globalDbFile + ':nodes:AVERAGE', 'LINE1:nodes#F00:nodes\\l' - ,'DEF:clients=' + self.globalDbFile + ':clients:AVERAGE','LINE2:clients#00F:clients' + ,'DEF:nodes=' + self.globalDb.filename + ':nodes:AVERAGE', 'LINE1:nodes#F00:nodes\\l' + ,'DEF:clients=' + self.globalDb.filename + ':clients:AVERAGE','LINE2:clients#00F:clients' ] subprocess.check_output(args) @@ -67,26 +58,20 @@ class rrd: def nodeMACToPNGFile(self,nodeMAC): return self.imagePath + "/" + str(nodeMAC).replace(":","") + ".png" - def checkAndCreateIfNeededNodeDatabase(self,nodePrimaryMAC): - # TODO check for bad nodeNames - nodeFile = self.nodeMACToRRDFile(nodePrimaryMAC); - if not os.path.exists(nodeFile): - # TODO Skalen anpassen - args = ["rrdtool",'create',nodeFile - ,'--start',str(round(self.currentTimeInt - 60)) - ,'--step' , '60' - ,'DS:upstate:GAUGE:120:0:1' - ,'DS:clients:GAUGE:120:0:200' - ,'RRA:AVERAGE:0.5:1:120' - ,'RRA:AVERAGE:0.5:5:1440' - ,'RRA:AVERAGE:0.5:60:720' - ,'RRA:AVERAGE:0.5:720:730' - ] - subprocess.check_output(args) - # Call only if node is up def updateNodeDatabase(self,nodePrimaryMAC,clientCount): + # TODO check for bad nodeNames nodeFile = self.nodeMACToRRDFile(nodePrimaryMAC) + nodeRRD = RRD(nodeFile) + nodeRRD.ensureSanity( + [ + DS('upstate', 'GAUGE', 120, 0, 1), + DS('clients', 'GAUGE', 120, 0, float('NaN')), + ], [ + RRA('LAST', 0, 1, 44640), + ], + step=60, + ) # Update Global RRDatabase args = ["rrdtool",'updatev', nodeFile # #Upstate #Clients @@ -128,10 +113,8 @@ class rrd: elif target in nodes and not source in nodes: nodes[target].clients += 1 - self.checkAndCreateIfNeededGlobalDatabase() self.updateGlobalDatabase(len(nodes),clientCount) for mac in nodes: - self.checkAndCreateIfNeededNodeDatabase(mac) self.updateNodeDatabase(mac,nodes[mac].clients) def update_images(self):