bumblebee-status/modules/contrib/network_traffic.py

112 lines
2.9 KiB
Python
Raw Normal View History

2020-04-29 14:20:20 +00:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Displays network traffic
* No extra configuration needed
"""
import psutil
import netifaces
import core.module
import core.widget
import util.format
WIDGET_NAME = "network_traffic"
2020-04-29 14:20:20 +00:00
class Module(core.module.Module):
def __init__(self, config, theme):
widgets = [
core.widget.Widget(
module=self,
name="{0}.rx".format(WIDGET_NAME),
full_text=self.download_rate,
),
core.widget.Widget(
module=self,
name="{0}.tx".format(WIDGET_NAME),
full_text=self.upload_rate,
),
2020-04-29 14:20:20 +00:00
]
super().__init__(config, theme, widgets)
self.widgets()[0].set("theme.minwidth", "0000000KiB/s")
self.widgets()[1].set("theme.minwidth", "0000000KiB/s")
2020-04-29 14:20:20 +00:00
try:
self._bandwidth = BandwidthInfo()
self._rate_recv = "?"
self._rate_sent = "?"
2020-04-29 14:20:20 +00:00
self._bytes_recv = self._bandwidth.bytes_recv()
self._bytes_sent = self._bandwidth.bytes_sent()
except Exception:
""" We do not want do explode anything """
pass
def state(self, widget):
"""Return the widget state"""
if widget.name == "{}.rx".format(WIDGET_NAME):
return "rx"
elif widget.name == "{}.tx".format(WIDGET_NAME):
return "tx"
2020-04-29 14:20:20 +00:00
return None
def update(self):
try:
bytes_recv = self._bandwidth.bytes_recv()
bytes_sent = self._bandwidth.bytes_sent()
self._rate_recv = bytes_recv - self._bytes_recv
self._rate_sent = bytes_sent - self._bytes_sent
2020-04-29 14:20:20 +00:00
self._bytes_recv, self._bytes_sent = bytes_recv, bytes_sent
except Exception:
""" We do not want do explode anything """
pass
def download_rate(self, _):
return "{}/s".format(util.format.byte(self._rate_recv))
2020-04-29 14:20:20 +00:00
def upload_rate(self, _):
return "{}/s".format(util.format.byte(self._rate_sent))
2020-04-29 14:20:20 +00:00
class BandwidthInfo(object):
"""Get received/sent bytes from network adapter"""
def bytes_recv(self):
"""Return received bytes"""
return self.bandwidth().bytes_recv
def bytes_sent(self):
"""Return sent bytes"""
return self.bandwidth().bytes_sent
def bandwidth(self):
"""Return bandwidth information"""
io_counters = self.io_counters()
return io_counters[self.default_network_adapter()]
@classmethod
def default_network_adapter(cls):
"""Return default active network adapter"""
gateway = netifaces.gateways()["default"]
2020-04-29 14:20:20 +00:00
if not gateway:
raise "No default gateway found"
2020-04-29 14:20:20 +00:00
return gateway[netifaces.AF_INET][1]
@classmethod
def io_counters(cls):
"""Return IO counters"""
return psutil.net_io_counters(pernic=True)
2020-04-29 14:20:20 +00:00
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4