bumblebee-status/bumblebee/modules/network_traffic.py

115 lines
3.2 KiB
Python
Raw Normal View History

2019-05-14 01:32:16 +02:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Displays network traffic
No extra configuratio needed.
"""
import psutil
import netifaces
import bumblebee.input
import bumblebee.output
import bumblebee.engine
2019-05-14 01:14:13 +02:00
import bumblebee.util
WIDGET_NAME = 'network_traffic'
class Module(bumblebee.engine.Module):
"""Bumblebee main module """
def __init__(self, engine, config):
2019-05-14 01:14:13 +02:00
super(Module, self).__init__(engine, config)
try:
self._bandwidth = BandwidthInfo()
self._download_tx = self._bandwidth.bytes_recv()
self._upload_tx = self._bandwidth.bytes_sent()
except Exception:
""" We do not want do explode anything """
pass
@classmethod
def state(cls, widget):
"""Return the widget state"""
if widget.name == '{}.rx'.format(WIDGET_NAME):
return 'rx'
elif widget.name == '{}.tx'.format(WIDGET_NAME):
return 'tx'
return None
def update(self, widgets):
try:
download_tx = self._bandwidth.bytes_recv()
upload_tx = self._bandwidth.bytes_sent()
2019-05-14 01:14:13 +02:00
download_rate = (download_tx - self._download_tx)
upload_rate = (upload_tx - self._upload_tx)
2019-05-14 01:14:13 +02:00
self.update_widgets(widgets, download_rate, upload_rate)
self._download_tx, self._upload_tx = download_tx, upload_tx
except Exception:
""" We do not want do explode anything """
pass
@classmethod
def update_widgets(cls, widgets, download_rate, upload_rate):
"""Update tx/rx widgets with new rates"""
2019-05-14 01:14:13 +02:00
del widgets[:]
widgets.extend((
TrafficWidget(text=download_rate, direction='tx'),
TrafficWidget(text=upload_rate, direction='rx')
2019-05-14 01:14:13 +02:00
))
2019-05-14 01:53:31 +02:00
class BandwidthInfo(object):
"""Get received/sent bytes from network adapter"""
2019-05-14 01:14:13 +02:00
def bytes_recv(self):
"""Return received bytes"""
2019-05-14 01:14:13 +02:00
return self.bandwidth().bytes_recv
def bytes_sent(self):
"""Return sent bytes"""
2019-05-14 01:14:13 +02:00
return self.bandwidth().bytes_sent
def bandwidth(self):
"""Return bandwidth information"""
2019-05-14 01:14:13 +02:00
io_counters = self.io_counters()
return io_counters[self.default_network_adapter()]
@classmethod
def default_network_adapter(cls):
"""Return default active network adapter"""
gateway = netifaces.gateways()['default']
if not gateway:
raise 'No default gateway found'
return gateway[netifaces.AF_INET][1]
2019-05-14 01:14:13 +02:00
@classmethod
def io_counters(cls):
"""Return IO counters"""
2019-05-14 01:14:13 +02:00
return psutil.net_io_counters(pernic=True)
2019-05-14 01:53:31 +02:00
class TrafficWidget(object):
"""Create a traffic widget with humanized bytes string with proper icon (up/down)"""
def __new__(cls, text, direction):
widget = bumblebee.output.Widget(name='{0}.{1}'.format(WIDGET_NAME, direction))
widget.set('theme.minwidth', '0000000KiB/s')
widget.full_text(cls.humanize(text))
2019-05-14 01:14:13 +02:00
return widget
2019-05-14 01:14:13 +02:00
@staticmethod
def humanize(text):
"""Return humanized bytes"""
2019-05-14 01:14:13 +02:00
humanized_byte_format = bumblebee.util.bytefmt(text)
return '{0}/s'.format(humanized_byte_format)