[modules] Re-add module nic
This commit is contained in:
parent
7420434358
commit
533b8ca0cc
8 changed files with 134 additions and 8 deletions
|
@ -43,7 +43,7 @@ class Module(core.input.Object):
|
|||
self.update()
|
||||
except Exception as e:
|
||||
module = Error(self._config, 'error', str(e))
|
||||
self._widgets = [module.widgets()[0]]
|
||||
self._widgets = [module.widget()]
|
||||
self.update = module.update
|
||||
|
||||
def name(self):
|
||||
|
@ -55,6 +55,13 @@ class Module(core.input.Object):
|
|||
def widgets(self):
|
||||
return self._widgets
|
||||
|
||||
def widget(self, name=None):
|
||||
if not name: return self.widgets()[0]
|
||||
|
||||
for w in self.widgets():
|
||||
if w.name() == name: return w
|
||||
return None
|
||||
|
||||
def state(self, widget):
|
||||
return []
|
||||
|
||||
|
|
|
@ -3,10 +3,14 @@ import core.decorators
|
|||
import util.store
|
||||
|
||||
class Widget(util.store.Store, core.input.Object):
|
||||
def __init__(self, full_text):
|
||||
def __init__(self, full_text='', name=None, module=None):
|
||||
super(Widget, self).__init__()
|
||||
self._full_text = full_text
|
||||
self._module = None
|
||||
self._module = module
|
||||
self._name = name
|
||||
|
||||
def name(self):
|
||||
return self._name
|
||||
|
||||
def full_text(self, value=None):
|
||||
if value:
|
||||
|
|
114
modules/nic.py
Normal file
114
modules/nic.py
Normal file
|
@ -0,0 +1,114 @@
|
|||
#pylint: disable=C0111,R0903
|
||||
|
||||
'''Displays the name, IP address(es) and status of each available network interface.
|
||||
|
||||
Requires the following python module:
|
||||
* netifaces
|
||||
|
||||
Parameters:
|
||||
* nic.exclude: Comma-separated list of interface prefixes to exclude (defaults to 'lo,virbr,docker,vboxnet,veth,br')
|
||||
* nic.include: Comma-separated list of interfaces to include
|
||||
* nic.states: Comma-separated list of states to show (prefix with '^' to invert - i.e. ^down -> show all devices that are not in state down)
|
||||
* nic.format: Format string (defaults to '{intf} {state} {ip} {ssid}')
|
||||
'''
|
||||
|
||||
import shutil
|
||||
import netifaces
|
||||
import subprocess
|
||||
|
||||
import core.widget
|
||||
import core.module
|
||||
import util.cli
|
||||
import util.format
|
||||
|
||||
class Module(core.module.Module):
|
||||
def __init__(self, config):
|
||||
widgets = []
|
||||
super().__init__(config, widgets)
|
||||
self._exclude = tuple(filter(len, self.parameter('exclude', 'lo,virbr,docker,vboxnet,veth,br').split(',')))
|
||||
self._include = self.parameter('include', '').split(',')
|
||||
|
||||
self._states = { 'include': [], 'exclude': [] }
|
||||
for state in tuple(filter(len, util.format.aslist(self.parameter('states', '')))):
|
||||
if state[0] == '^':
|
||||
self._states['exclude'].append(state[1:])
|
||||
else:
|
||||
self._states['include'].append(state)
|
||||
self._format = self.parameter('format','{intf} {state} {ip} {ssid}');
|
||||
self.iwgetid = shutil.which('iwgetid')
|
||||
self._update_widgets(widgets)
|
||||
|
||||
def update(self):
|
||||
self._update_widgets(self.widgets())
|
||||
|
||||
def state(self, widget):
|
||||
states = []
|
||||
|
||||
if widget.get('state') == 'down':
|
||||
states.append('critical')
|
||||
elif widget.get('state') != 'up':
|
||||
states.append('warning')
|
||||
|
||||
intf = widget.get('intf')
|
||||
iftype = 'wireless' if self._iswlan(intf) else 'wired'
|
||||
iftype = 'tunnel' if self._istunnel(intf) else iftype
|
||||
|
||||
states.append('{}-{}'.format(iftype, widget.get('state')))
|
||||
|
||||
return states
|
||||
|
||||
def _iswlan(self, intf):
|
||||
# wifi, wlan, wlp, seems to work for me
|
||||
if intf.startswith('w'): return True
|
||||
return False
|
||||
|
||||
def _istunnel(self, intf):
|
||||
return intf.startswith('tun') or intf.startswith('wg')
|
||||
|
||||
def get_addresses(self, intf):
|
||||
retval = []
|
||||
try:
|
||||
for ip in netifaces.ifaddresses(intf).get(netifaces.AF_INET, []):
|
||||
if ip.get('addr', '') != '':
|
||||
retval.append(ip.get('addr'))
|
||||
except Exception:
|
||||
return []
|
||||
return retval
|
||||
|
||||
def _update_widgets(self, widgets):
|
||||
interfaces = [i for i in netifaces.interfaces() if not i.startswith(self._exclude)]
|
||||
interfaces.extend([i for i in netifaces.interfaces() if i in self._include])
|
||||
|
||||
for widget in widgets:
|
||||
widget.set('visited', False)
|
||||
|
||||
for intf in interfaces:
|
||||
addr = []
|
||||
state = 'down'
|
||||
for ip in self.get_addresses(intf):
|
||||
addr.append(ip)
|
||||
state = 'up'
|
||||
|
||||
if len(self._states['exclude']) > 0 and state in self._states['exclude']: continue
|
||||
if len(self._states['include']) > 0 and state not in self._states['include']: continue
|
||||
|
||||
widget = self.widget(intf)
|
||||
if not widget:
|
||||
widget = core.widget.Widget(name=intf, module=self)
|
||||
widgets.append(widget)
|
||||
# join/split is used to get rid of multiple whitespaces (in case SSID is not available, for instance
|
||||
widget.full_text(' '.join(self._format.format(ip=', '.join(addr),intf=intf,state=state,ssid=self.get_ssid(intf)).split()))
|
||||
widget.set('intf', intf)
|
||||
widget.set('state', state)
|
||||
widget.set('visited', True)
|
||||
|
||||
for widget in widgets:
|
||||
if widget.get('visited') is False:
|
||||
widgets.remove(widget)
|
||||
|
||||
def get_ssid(self, intf):
|
||||
if self._iswlan(intf) and self.iwgetid:
|
||||
return util.cli.execute('{} -r {}'.format(self.iwgetid, intf), ignore_errors=True)
|
||||
return ''
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -18,7 +18,7 @@ class TestModule(core.module.Module):
|
|||
class config(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.module = TestModule()
|
||||
self.widget = self.module.widgets()[0]
|
||||
self.widget = self.module.widget()
|
||||
self.width = 10
|
||||
self.module.set('width', self.width)
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ class module(unittest.TestCase):
|
|||
cfg = core.config.Config(shlex.split('-p test_module.foo=5'))
|
||||
module = core.module.Error(cfg, 'test-mod', 'xyz')
|
||||
self.assertEqual(['critical'], module.state(None), 'error module must have critical state')
|
||||
full_text = module.full_text(module.widgets()[0])
|
||||
full_text = module.full_text(module.widget())
|
||||
self.assertTrue('test-mod' in full_text)
|
||||
self.assertTrue('xyz' in full_text)
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@ class i3(unittest.TestCase):
|
|||
|
||||
def test_padding(self):
|
||||
self.i3.theme(self.paddedTheme)
|
||||
result = self.i3.__pad(self.someModule, self.someModule.widgets()[0], 'abc')
|
||||
result = self.i3.__pad(self.someModule, self.someModule.widget(), 'abc')
|
||||
self.assertEqual(' abc ', result)
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
||||
|
|
|
@ -11,6 +11,6 @@ class kernel(unittest.TestCase):
|
|||
|
||||
def test_full_text(self):
|
||||
self.assertEqual(1, len(self.module.widgets()))
|
||||
self.assertEqual(self.someKernel, self.module.widgets()[0].full_text())
|
||||
self.assertEqual(self.someKernel, self.module.widget().full_text())
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import os
|
||||
import shlex
|
||||
import subprocess
|
||||
import logging
|
||||
|
@ -15,6 +16,6 @@ def execute(cmd, wait=True, ignore_errors=False):
|
|||
if proc.returncode != 0 and not ignore_errors:
|
||||
raise RuntimeError('{} exited with {}'.format(cmd, proc.returncode))
|
||||
return out.decode('utf-8')
|
||||
return ''
|
||||
return ''
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
||||
|
|
Loading…
Reference in a new issue