bumblebee-status/modules/core/ping.py
2020-04-12 14:44:44 +02:00

88 lines
2.9 KiB
Python

# pylint: disable=C0111,R0903
"""Periodically checks the RTT of a configurable host using ICMP echos
Requires the following executable:
* ping
Parameters:
* ping.address : IP address to check (defaults to 8.8.8.8)
* ping.timeout : Timeout for waiting for a reply (defaults to 5.0)
* ping.probes : Number of probes to send (defaults to 5)
* ping.warning : Threshold for warning state, in the unit reported by ping, normally milliseconds (defaults to 1000.0)
* ping.critical: Threshold for critical state, in the unit reported by ping, normally milliseconds (defaults to 2000.0)
"""
import re
import time
import threading
import core.module
import core.widget
import core.event
import core.decorators
import util.cli
def get_rtt(module, widget):
    """Run ``ping`` once and store the parsed statistics on *widget*.

    Reads the widget values 'rtt-probes', 'rtt-timeout' and 'address' to
    build the command line, then parses the summary lines of the output.

    Values written to the widget:
      * 'packet-loss'      - percentage string taken from the summary line
      * 'rtt-min/avg/max'  - floats taken from the ``rtt`` line
      * 'rtt-unit'         - unit string that follows the numbers (e.g. 'ms')
      * 'rtt-unreachable'  - True when no RTT statistics could be obtained

    The first time a result is available, 'pending' is cleared and a
    redraw of *module* is requested through ``core.event.trigger``.

    :param module: object whose ``id`` identifies the module to redraw
    :param widget: object offering ``get(key)`` / ``set(key, value)``
    """
    # Accept fractional percentages ("33.3333% packet loss") as well as
    # whole numbers; a bare \d+ would capture only the digits after the dot.
    loss_pattern = re.compile(r'([0-9.]+)% packet loss')
    rtt_pattern = re.compile(
        r'([0-9\.]+)/([0-9\.]+)/([0-9\.]+)/([0-9\.]+)\s+(\S+)'
    )

    # Only report the host as reachable once an RTT line has actually been
    # parsed; until then the previous state stays visible on the widget.
    reachable = False
    try:
        probes = widget.get('rtt-probes')
        res = util.cli.execute('ping -n -q -c {} -W {} {}'.format(
            probes, widget.get('rtt-timeout'), widget.get('address')
        ))
        summary_prefix = '{} packets transmitted'.format(probes)

        for line in res.split('\n'):
            if line.startswith(summary_prefix):
                loss = loss_pattern.search(line)
                if loss:
                    widget.set('packet-loss', loss.group(1))

            if not line.startswith('rtt'):
                continue
            stats = rtt_pattern.search(line)
            if not stats:
                continue

            widget.set('rtt-min', float(stats.group(1)))
            widget.set('rtt-avg', float(stats.group(2)))
            widget.set('rtt-max', float(stats.group(3)))
            widget.set('rtt-unit', stats.group(5))
            reachable = True
    except Exception:
        # Best effort: this runs in a worker thread, so any failure while
        # executing or parsing ping is reported as "unreachable" instead of
        # propagating out of the thread.
        reachable = False

    widget.set('rtt-unreachable', not reachable)

    if widget.get('pending'):
        # First result since start-up: replace the 'pending' placeholder.
        widget.set('pending', False)
        core.event.trigger('update-modules', [module.id], redraw_only=True)
class Module(core.module.Module):
    """Module with a single widget showing average RTT and packet loss
    for one configurable address."""

    # NOTE(review): `every` presumably sets the default update interval to
    # 60 seconds - confirm against core.decorators.
    @core.decorators.every(seconds=60)
    def __init__(self, config):
        """Create the widget and seed it with parameters and placeholders.

        :param config: configuration object forwarded to the base class
        """
        widget = core.widget.Widget(self.rtt)
        super().__init__(config, widget)

        # Settings consumed by get_rtt() when it builds the ping command.
        widget.set('address', self.parameter('address', '8.8.8.8'))
        widget.set('rtt-probes', self.parameter('probes', 5))
        widget.set('rtt-timeout', self.parameter('timeout', 5.0))

        # Placeholder values used until the first ping result arrives.
        widget.set('rtt-avg', 0.0)
        widget.set('rtt-unit', '')
        widget.set('packet-loss', 0)
        widget.set('pending', True)

    def rtt(self, widget):
        """Return the text for *widget*.

        Yields 'pending' before the first result, '<address>: unreachable'
        when the last probe failed, and otherwise
        '<address>: <avg rtt><unit> (<loss>%)'.
        """
        if widget.get('pending'):
            return 'pending'
        if widget.get('rtt-unreachable'):
            return '{}: unreachable'.format(widget.get('address'))
        return '{}: {:.1f}{} ({}%)'.format(
            widget.get('address'),
            widget.get('rtt-avg'),
            widget.get('rtt-unit'),
            widget.get('packet-loss')
        )

    def state(self, widget):
        """Return the state for *widget*.

        ['critical'] when the host is unreachable; otherwise the result of
        ``threshold_state`` for the average RTT with 1000.0 and 2000.0 as
        the warning and critical values passed to it.
        """
        if widget.get('rtt-unreachable'):
            return ['critical']
        return self.threshold_state(widget.get('rtt-avg'), 1000.0, 2000.0)

    def update(self):
        """Start a new thread that runs get_rtt() for this module's widget."""
        thread = threading.Thread(target=get_rtt, args=(self, self.widget()))
        thread.start()
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4