Offload rrdtool calls and model to separate module

This commit is contained in:
Jan-Philipp Litza 2014-02-10 21:45:32 +01:00
parent 791947234c
commit 17fcf10b74
2 changed files with 372 additions and 46 deletions

343
RRD.py Normal file
View file

@ -0,0 +1,343 @@
import subprocess
import re
import io
import os
from tempfile import TemporaryFile
from operator import xor, eq
from functools import reduce
from itertools import starmap
import math
class RRDIncompatibleException(Exception):
    """
    Raised when an RRD file's definition differs from the desired one in a
    way that cannot be fixed by an automatic upgrade.
    """
class RRDOutdatedException(Exception):
    """
    Raised when an RRD file's definition differs from the desired one, but
    an automatic upgrade can bring it up to date.
    """
# Python 2 has no built-in FileNotFoundError, so provide a stand-in that the
# rest of this module can raise and catch uniformly.
#
# BUG FIX: the original test was `hasattr(__builtins__, "FileNotFoundError")`,
# but __builtins__ is only a module in the __main__ script -- in every
# *imported* module it is a plain dict, so hasattr() always returned False
# and the builtin got shadowed even on Python 3.  Probing the name directly
# is reliable in both cases.
try:
    FileNotFoundError
except NameError:
    class FileNotFoundError(Exception):
        pass
class RRD:
    """
    An RRD is a Round Robin Database, a database which forgets old data and
    aggregates multiple records into new ones.

    It contains multiple Data Sources (DS) which can be thought of as columns,
    and Round Robin Archives (RRA) which can be thought of as tables with the
    DS as columns and time-dependent rows.

    All operations shell out to the `rrdtool` command line utility.
    """

    # Parses one line of `rrdtool info` output, e.g.
    #   rra[2].cdp_prep[0].value = 1.8583033333e+03
    # The first alternative consumes "section[key]." prefixes, the second the
    # trailing "name = value" assignment.  Raw string: the pattern contains
    # backslash escapes (\[, \s, ...) that are invalid escapes in a plain
    # string literal on modern Python.
    _info_regex = re.compile(r"""
        (?P<section>[a-z_]+)
        \[ (?P<key>[a-zA-Z0-9_]+) \]
        \.
        |
        (?P<name>[a-z_]+)
        \s*=\s*
        "? (?P<value>.*?) "?
        $""", re.X)

    def __init__(self, filename):
        """filename -- path of the .rrd file this object wraps."""
        self.filename = filename
        # Memoized result of info(); reset to None by every operation that
        # modifies the file.  Kept per instance (a class-level attribute, as
        # before, would risk leaking cached data between RRD objects).
        self._cached_info = None

    def _exec_rrdtool(self, cmd, *args, **kwargs):
        """
        Run `rrdtool <cmd> <self.filename>`, turning each kwarg into a
        `--key value` option and appending the positional args verbatim.
        Raises subprocess.CalledProcessError on a non-zero exit status.
        """
        pargs = ["rrdtool", cmd, self.filename]
        for k, v in kwargs.items():
            pargs.extend(["--" + k, str(v)])
        pargs.extend(args)
        subprocess.check_output(pargs)

    def ensureSanity(self, ds_list, rra_list, **kwargs):
        """
        Create or upgrade the RRD file if necessary to contain all DS in
        ds_list. If it needs to be created, the RRAs in rra_list and any kwargs
        will be used for creation. Note that RRAs and options of an existing
        database are NOT modified!
        """
        try:
            self.checkSanity(ds_list)
        except FileNotFoundError:
            self.create(ds_list, rra_list, **kwargs)
        except RRDOutdatedException:
            self.upgrade(ds_list)

    def checkSanity(self, ds_list=()):
        """
        Check if the RRD file exists and contains (at least) the DS listed in
        ds_list.

        Raises FileNotFoundError if the file is missing,
        RRDOutdatedException if the file can be upgraded to match ds_list,
        RRDIncompatibleException if it cannot (a DS is missing or changed type).
        """
        if not os.path.exists(self.filename):
            raise FileNotFoundError(self.filename)
        info = self.info()
        if set(ds_list) - set(info['ds'].values()) != set():
            # Same (name, type) pairs but differing parameters -> upgradable;
            # a missing DS or a changed type -> incompatible.
            if set((ds.name, ds.type) for ds in ds_list) \
                    - set((ds.name, ds.type) for ds in info['ds'].values()) != set():
                raise RRDIncompatibleException()
            else:
                raise RRDOutdatedException()

    def upgrade(self, dss):
        """
        Upgrade the DS definitions (!) of this RRD.
        (To update its values, use update())

        The list dss contains DS objects to be updated or added. The
        parameters of a DS can be changed, but not its type. New DS are always
        added at the end in the order of their appearance in the list.

        This is done internally via rrdtool dump -> rrdtool restore, with the
        dump being patched on the fly.
        """
        info = self.info()
        new_ds = list(info['ds'].values())
        new_ds.sort(key=lambda ds: ds.index)
        for ds in dss:
            if ds.name in info['ds']:
                old_ds = info['ds'][ds.name]
                if old_ds.type != ds.type:
                    raise RuntimeError(
                        'Cannot convert existing DS "%s" from type "%s" to "%s"' %
                        (ds.name, old_ds.type, ds.type))
                ds.index = old_ds.index
                new_ds[ds.index] = ds
            else:
                ds.index = len(new_ds)
                new_ds.append(ds)
        added_ds_num = len(new_ds) - len(info['ds'])

        dump = subprocess.Popen(
            ["rrdtool", "dump", self.filename],
            stdout=subprocess.PIPE
        )
        restore = subprocess.Popen(
            ["rrdtool", "restore", "-", self.filename + ".new"],
            stdin=subprocess.PIPE
        )
        echo = True            # whether the current dump line is copied through
        ds_definitions = True  # still inside the <ds> definition part of the dump
        for line in dump.stdout:
            if ds_definitions and b'<ds>' in line:
                # Suppress the original DS definitions; the patched set is
                # re-emitted below when the RRA section starts.
                echo = False
            if b'<!-- Round Robin Archives -->' in line:
                ds_definitions = False
                for ds in new_ds:
                    restore.stdin.write(bytes("""
    <ds>
        <name> %s </name>
        <type> %s </type>
        <minimal_heartbeat>%i</minimal_heartbeat>
        <min>%s</min>
        <max>%s</max>

        <!-- PDP Status -->
        <last_ds>%s</last_ds>
        <value>%s</value>
        <unknown_sec> %i </unknown_sec>
    </ds>
""" % (
                        ds.name,
                        ds.type,
                        ds.args[0],
                        ds.args[1],
                        ds.args[2],
                        ds.last_ds,
                        ds.value,
                        ds.unknown_sec,
                    ), "utf-8"))

            if b'</cdp_prep>' in line:
                # Every RRA's cdp_prep needs one placeholder entry per DS that
                # was added, inserted before the closing tag.
                restore.stdin.write(added_ds_num * b"""
        <ds>
        <primary_value> NaN </primary_value>
        <secondary_value> NaN </secondary_value>
        <value> NaN </value>
        <unknown_datapoints> 0 </unknown_datapoints>
        </ds>
""")

            # Echo the input line, padding each data row with one unknown
            # value per added DS.
            if echo:
                restore.stdin.write(
                    line.replace(
                        b'</row>',
                        (added_ds_num * b'<v>NaN</v>') + b'</row>'
                    )
                )

            if ds_definitions and b'</ds>' in line:
                echo = True

        dump.stdout.close()
        restore.stdin.close()
        try:
            dump.wait(1)
        except subprocess.TimeoutExpired:
            dump.kill()
        try:
            restore.wait(2)
        except subprocess.TimeoutExpired:
            # BUG FIX: the original killed `dump` here (whose wait/kill already
            # happened above); the hung process to kill is `restore`.
            restore.kill()
            raise RuntimeError("rrdtool restore process killed")

        os.rename(self.filename + ".new", self.filename)
        self._cached_info = None

    def create(self, ds_list, rra_list, **kwargs):
        """
        Create a new RRD file with the specified list of RRAs and DSs.

        Any kwargs are passed as --key value to rrdtool create.
        """
        self._exec_rrdtool(
            "create",
            *map(str, rra_list + ds_list),
            **kwargs
        )
        self._cached_info = None

    def update(self, V):
        """
        Update the RRD with new values V.

        V can be either list or dict:
        * If it is a dict, its keys must be DS names in the RRD and it is
          ensured that the correct DS are updated with the correct values, by
          passing a "template" to rrdtool update (see man rrdupdate).
        * If it is a list, no template is generated and the order of the
          values in V must be the same as that of the DS in the RRD.
        """
        try:
            args = ['N:' + ':'.join(map(str, V.values()))]
            kwargs = {'template': ':'.join(V.keys())}
        except AttributeError:
            # No .values()/.keys() -> V is a plain sequence, not a mapping.
            args = ['N:' + ':'.join(map(str, V))]
            kwargs = {}
        self._exec_rrdtool("update", *args, **kwargs)
        self._cached_info = None

    def info(self):
        """
        Return a dictionary with information about the RRD.

        The 'ds' entry maps DS names to DS objects, the 'rra' entry is a list
        of RRA objects; everything else is the parsed `rrdtool info` output.
        See `man rrdinfo` for more details.
        """
        if self._cached_info:
            return self._cached_info
        env = os.environ.copy()
        env["LC_ALL"] = "C"  # guarantee untranslated, point-decimal output
        proc = subprocess.Popen(
            ["rrdtool", "info", self.filename],
            stdout=subprocess.PIPE,
            env=env
        )
        out, err = proc.communicate()
        out = out.decode()
        info = {}
        for line in out.splitlines():
            base = info
            for match in self._info_regex.finditer(line):
                section, key, name, value = match.group("section", "key", "name", "value")
                if section and key:
                    try:
                        key = int(key)
                    except ValueError:
                        pass
                    if section not in base:
                        base[section] = {}
                    if key not in base[section]:
                        base[section][key] = {}
                    # Descend into the nested section for the rest of the line.
                    base = base[section][key]
                if name and value:
                    # Coerce to int, then float, else keep the raw string.
                    # BUG FIX: the innermost handler was a bare `except:`,
                    # which would also swallow KeyboardInterrupt/SystemExit;
                    # float() on a str can only raise ValueError.
                    try:
                        base[name] = int(value)
                    except ValueError:
                        try:
                            base[name] = float(value)
                        except ValueError:
                            base[name] = value
        dss = {}
        for name, ds in info['ds'].items():
            ds_obj = DS(name, ds['type'], ds['minimal_heartbeat'], ds['min'], ds['max'])
            ds_obj.index = ds['index']
            ds_obj.last_ds = ds['last_ds']
            ds_obj.value = ds['value']
            ds_obj.unknown_sec = ds['unknown_sec']
            dss[name] = ds_obj
        info['ds'] = dss
        rras = []
        for rra in info['rra'].values():
            rras.append(RRA(rra['cf'], rra['xff'], rra['pdp_per_row'], rra['rows']))
        info['rra'] = rras
        self._cached_info = info
        return info
class DS:
    """
    DS stands for Data Source and represents one line of data points in a Round
    Robin Database (RRD).

    Instances compare equal (and hash equal) on (name, type, args), where
    float NaN arguments are treated as rrdtool's "unknown" placeholder 'U'.
    """

    # Class-level defaults; the per-instance values are set in __init__ or
    # filled in later by RRD.info() / RRD.upgrade().
    # BUG FIX: `args` defaulted to a *mutable* list shared by every instance;
    # an immutable empty tuple removes that hazard (and matches the tuple
    # that *args produces anyway).
    name = None
    type = None
    args = ()
    index = -1
    last_ds = 'U'
    value = 0
    unknown_sec = 0

    def __init__(self, name, dst, *args):
        """
        name -- DS name
        dst  -- DS type string (e.g. GAUGE, COUNTER)
        args -- further DS parameters: heartbeat, min, max, ...
        """
        self.name = name
        self.type = dst
        self.args = args

    def __str__(self):
        # rrdtool create syntax: DS:name:type:arg1:arg2:...
        return "DS:%s:%s:%s" % (
            self.name,
            self.type,
            ":".join(map(str, self._nan_to_U_args()))
        )

    def __repr__(self):
        return "%s(%r, %r, %s)" % (
            self.__class__.__name__,
            self.name,
            self.type,
            ", ".join(map(repr, self.args))
        )

    def __eq__(self, other):
        # BUG FIX: the original unconditionally called other._compare_keys()
        # and crashed with AttributeError when compared against a non-DS;
        # returning NotImplemented lets Python fall back to its default.
        if not isinstance(other, DS):
            return NotImplemented
        return self._compare_keys() == other._compare_keys()

    def __hash__(self):
        # Consistent with __eq__: equal keys -> equal hash.
        return hash(self._compare_keys())

    def _nan_to_U_args(self):
        """Return args with float NaN replaced by rrdtool's 'U' (unknown)."""
        return tuple(
            'U' if type(arg) is float and math.isnan(arg)
            else arg
            for arg in self.args
        )

    def _compare_keys(self):
        """Key tuple used for both equality and hashing."""
        return (self.name, self.type, self._nan_to_U_args())
class RRA:
    """
    Represents a Round Robin Archive definition: a consolidation function
    (cf) plus its numeric parameters (xff, steps, rows, ...).
    """

    def __init__(self, cf, *args):
        self.cf = cf
        self.args = args

    def __str__(self):
        # rrdtool create syntax: RRA:cf:arg1:arg2:...
        joined = ":".join(str(arg) for arg in self.args)
        return "RRA:%s:%s" % (self.cf, joined)

    def __repr__(self):
        arglist = ", ".join(repr(arg) for arg in self.args)
        return "%s(%r, %s)" % (type(self).__name__, self.cf, arglist)

75
rrd.py
View file

@ -2,6 +2,7 @@
import subprocess import subprocess
import time import time
import os import os
from RRD import RRD, DS, RRA
class rrd: class rrd:
def __init__( self def __init__( self
@ -11,7 +12,7 @@ class rrd:
, displayTimeNode = "1d" , displayTimeNode = "1d"
): ):
self.dbPath = databaseDirectory self.dbPath = databaseDirectory
self.globalDbFile = databaseDirectory + "/nodes.rrd" self.globalDb = RRD(databaseDirectory + "/nodes.rrd")
self.imagePath = imagePath self.imagePath = imagePath
self.displayTimeGlobal = displayTimeGlobal self.displayTimeGlobal = displayTimeGlobal
self.displayTimeNode = displayTimeNode self.displayTimeNode = displayTimeNode
@ -24,39 +25,29 @@ class rrd:
except: except:
os.mkdir(self.imagePath) os.mkdir(self.imagePath)
def checkAndCreateIfNeededGlobalDatabase(self):
""" Creates the global database file iff it did not exist.
"""
if not os.path.exists(self.globalDbFile):
# Create Database with rrdtool
args = ["rrdtool",'create', self.globalDbFile
,'--start', str(round(self.currentTimeInt - 60))
,'--step' , '60'
# Number of nodes available
,'DS:nodes:GAUGE:120:0:U'
# Number of client available
,'DS:clients:GAUGE:120:0:U'
,'RRA:AVERAGE:0.5:1:120'
,'RRA:AVERAGE:0.5:60:744'
,'RRA:AVERAGE:0.5:1440:1780'
]
subprocess.call(args)
def updateGlobalDatabase(self,nodeCount,clientCount): def updateGlobalDatabase(self,nodeCount,clientCount):
""" Adds a new (#Nodes,#Clients) entry to the global database. """ Adds a new (#Nodes,#Clients) entry to the global database.
""" """
# Update Global RRDatabase self.globalDb.ensureSanity(
args = ["rrdtool",'updatev', self.globalDbFile [
# #Nodes #Clients # Number of nodes available
, self.currentTime + ":"+str(nodeCount)+":"+str(clientCount) DS('nodes', 'GAUGE', 120, 0, float('NaN')),
] # Number of client available
subprocess.check_output(args) DS('clients', 'GAUGE', 120, 0, float('NaN')),
], [
RRA('LAST', 0, 1, 44640),
RRA('LAST', 0, 60, 744),
RRA('LAST', 0, 1440, 1780),
],
step=60,
)
self.globalDb.update({'nodes': nodeCount, 'clients': clientCount})
def createGlobalGraph(self): def createGlobalGraph(self):
nodeGraph = self.imagePath + "/" + "globalGraph.png" nodeGraph = self.imagePath + "/" + "globalGraph.png"
args = ["rrdtool", 'graph', nodeGraph, '-s', '-' + self.displayTimeGlobal, '-w', '800', '-h' '400' args = ["rrdtool", 'graph', nodeGraph, '-s', '-' + self.displayTimeGlobal, '-w', '800', '-h' '400'
,'DEF:nodes=' + self.globalDbFile + ':nodes:AVERAGE', 'LINE1:nodes#F00:nodes\\l' ,'DEF:nodes=' + self.globalDb.filename + ':nodes:AVERAGE', 'LINE1:nodes#F00:nodes\\l'
,'DEF:clients=' + self.globalDbFile + ':clients:AVERAGE','LINE2:clients#00F:clients' ,'DEF:clients=' + self.globalDb.filename + ':clients:AVERAGE','LINE2:clients#00F:clients'
] ]
subprocess.check_output(args) subprocess.check_output(args)
@ -67,26 +58,20 @@ class rrd:
def nodeMACToPNGFile(self,nodeMAC): def nodeMACToPNGFile(self,nodeMAC):
return self.imagePath + "/" + str(nodeMAC).replace(":","") + ".png" return self.imagePath + "/" + str(nodeMAC).replace(":","") + ".png"
def checkAndCreateIfNeededNodeDatabase(self,nodePrimaryMAC):
# TODO check for bad nodeNames
nodeFile = self.nodeMACToRRDFile(nodePrimaryMAC);
if not os.path.exists(nodeFile):
# TODO Skalen anpassen
args = ["rrdtool",'create',nodeFile
,'--start',str(round(self.currentTimeInt - 60))
,'--step' , '60'
,'DS:upstate:GAUGE:120:0:1'
,'DS:clients:GAUGE:120:0:200'
,'RRA:AVERAGE:0.5:1:120'
,'RRA:AVERAGE:0.5:5:1440'
,'RRA:AVERAGE:0.5:60:720'
,'RRA:AVERAGE:0.5:720:730'
]
subprocess.check_output(args)
# Call only if node is up # Call only if node is up
def updateNodeDatabase(self,nodePrimaryMAC,clientCount): def updateNodeDatabase(self,nodePrimaryMAC,clientCount):
# TODO check for bad nodeNames
nodeFile = self.nodeMACToRRDFile(nodePrimaryMAC) nodeFile = self.nodeMACToRRDFile(nodePrimaryMAC)
nodeRRD = RRD(nodeFile)
nodeRRD.ensureSanity(
[
DS('upstate', 'GAUGE', 120, 0, 1),
DS('clients', 'GAUGE', 120, 0, float('NaN')),
], [
RRA('LAST', 0, 1, 44640),
],
step=60,
)
# Update Global RRDatabase # Update Global RRDatabase
args = ["rrdtool",'updatev', nodeFile args = ["rrdtool",'updatev', nodeFile
# #Upstate #Clients # #Upstate #Clients
@ -128,10 +113,8 @@ class rrd:
elif target in nodes and not source in nodes: elif target in nodes and not source in nodes:
nodes[target].clients += 1 nodes[target].clients += 1
self.checkAndCreateIfNeededGlobalDatabase()
self.updateGlobalDatabase(len(nodes),clientCount) self.updateGlobalDatabase(len(nodes),clientCount)
for mac in nodes: for mac in nodes:
self.checkAndCreateIfNeededNodeDatabase(mac)
self.updateNodeDatabase(mac,nodes[mac].clients) self.updateNodeDatabase(mac,nodes[mac].clients)
def update_images(self): def update_images(self):