# pylint: disable=C0111,R0903 """Periodically checks the RTT of a configurable host using ICMP echos Requires the following executable: * ping Parameters: * ping.address : IP address to check * ping.timeout : Timeout for waiting for a reply (defaults to 5.0) * ping.probes : Number of probes to send (defaults to 5) * ping.warning : Threshold for warning state, in seconds (defaults to 1.0) * ping.critical: Threshold for critical state, in seconds (defaults to 2.0) """ import re import time import threading import core.module import core.widget import core.event import core.decorators import util.cli def get_rtt(module, widget): try: widget.set('rtt-unreachable', False) res = util.cli.execute('ping -n -q -c {} -W {} {}'.format( widget.get('rtt-probes'), widget.get('rtt-timeout'), widget.get('address') )) for line in res.split('\n'): if line.startswith('{} packets transmitted'.format(widget.get('rtt-probes'))): m = re.search(r'(\d+)% packet loss', line) widget.set('packet-loss', m.group(1)) if not line.startswith('rtt'): continue m = re.search(r'([0-9\.]+)/([0-9\.]+)/([0-9\.]+)/([0-9\.]+)\s+(\S+)', line) widget.set('rtt-min', float(m.group(1))) widget.set('rtt-avg', float(m.group(2))) widget.set('rtt-max', float(m.group(3))) widget.set('rtt-unit', m.group(5)) except Exception as e: widget.set('rtt-unreachable', True) core.event.trigger('update', [ module.id ], redraw_only=True) class Module(core.module.Module): @core.decorators.every(seconds=60) def __init__(self, config): widget = core.widget.Widget(self.rtt) super().__init__(config, widget) widget.set('address', self.parameter('address', '8.8.8.8')) widget.set('rtt-probes', self.parameter('probes', 5)) widget.set('rtt-timeout', self.parameter('timeout', 5.0)) widget.set('rtt-avg', 0.0) widget.set('rtt-unit', '') widget.set('packet-loss', 0) def rtt(self, widget): if widget.get('rtt-unreachable'): return '{}: unreachable'.format(widget.get('address')) return '{}: {:.1f}{} ({}%)'.format( widget.get('address'), widget.get('rtt-avg'), widget.get('rtt-unit'), widget.get('packet-loss') ) def state(self, widget): if widget.get('rtt-unreachable'): return ['critical'] return self.threshold_state(widget.get('rtt-avg'), 1000.0, 2000.0) def update(self): thread = threading.Thread(target=get_rtt, args=(self, self.widget(),)) thread.start() # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4