[modules] Separate modules into core & contrib

Also, improve errors when importing a module fails.

Also, add more tests.
This commit is contained in:
Tobias Witek 2020-03-06 14:14:34 +01:00
parent 47950240d0
commit efc2e4f94e
15 changed files with 38 additions and 10 deletions

0
modules/core/__init__.py Normal file
View file

View file

@ -0,0 +1,175 @@
# pylint: disable=C0111,R0903
'''Displays volume and mute status and controls for PulseAudio devices. Use wheel up and down to change volume, left click mutes, right click opens pavucontrol.
Aliases: pasink (use this to control output instead of input), pasource
Parameters:
* pulseaudio.autostart: If set to 'true' (default is 'false'), automatically starts the pulseaudio daemon if it is not running
* pulseaudio.percent_change: How much to change volume by when scrolling on the module (default is 2%)
* pulseaudio.limit: Upper limit for setting the volume (default is 0%, which means 'no limit')
Note: If the left and right channels have different volumes, the limit might not be reached exactly.
* pulseaudio.showbars: 1 for showing volume bars, requires --markup=pango;
0 for not showing volume bars (default)
Requires the following executable:
* pulseaudio
* pactl
* pavucontrol
'''
import re
import logging
import core.module
import core.widget
import core.input
import util.cli
import util.graph
import util.format
class Module(core.module.Module):
def __init__(self, config, channel):
super().__init__(config, core.widget.Widget(self.volume))
if util.format.asbool(self.parameter('autostart', False)):
util.cli.execute('pulseaudio --start', ignore_errors=True)
self._change = util.format.asint(self.parameter('percent_change', '2%').strip('%'), 0, 100)
self._limit = util.format.asint(self.parameter('limit', '0%').strip('%'), 0)
self._left = 0
self._right = 0
self._mono = 0
self._mute = False
self._failed = False
self._channel = channel
self._showbars = util.format.asbool(self.parameter('showbars', 0))
self._patterns = [
{'expr': 'Name:', 'callback': (lambda line: False)},
{'expr': 'Mute:', 'callback': (lambda line: self.mute(False if ' no' in line.lower() else True))},
{'expr': 'Volume:', 'callback': self.getvolume},
]
core.input.register(self, button=core.input.RIGHT_MOUSE, cmd='pavucontrol')
events = [
{'type': 'mute', 'action': self.toggle, 'button': core.input.LEFT_MOUSE},
{'type': 'volume', 'action': self.increase_volume, 'button': core.input.WHEEL_UP},
{'type': 'volume', 'action': self.decrease_volume, 'button': core.input.WHEEL_DOWN},
]
for event in events:
core.input.register(self, button=event['button'], cmd=event['action'])
def set_volume(self, amount):
util.cli.execute('pactl set-{}-{} @DEFAULT_{}@ {}'.format(
self._channel, 'volume', self._channel.upper(), amount))
def increase_volume(self, event):
if self._limit > 0: # we need to check the limit
left = int(self._left)
right = int(self._right)
if left + self._change >= self._limit or right + self._change >= self._limit:
if left == right:
# easy case, just set to limit
self.set_volume('{}%'.format(self._limit))
return
else:
# don't adjust anymore, since i don't know how to update only one channel
return
self.set_volume('+{}%'.format(self._change))
def decrease_volume(self, event):
self.set_volume('-{}%'.format(self._change))
def toggle(self, event):
util.cli.execute('pactl set-{}-mute @DEFAULT_{}@ toggle'.format(
self._channel, self._channel.upper()))
def mute(self, value):
self._mute = value
def getvolume(self, line):
if 'mono' in line:
m = re.search(r'mono:.*\s*\/\s*(\d+)%', line)
if m:
self._mono = m.group(1)
else:
m = re.search(r'left:.*\s*\/\s*(\d+)%.*right:.*\s*\/\s*(\d+)%', line)
if m:
self._left = m.group(1)
self._right = m.group(2)
def _default_device(self):
output = util.cli.execute('pactl info')
pattern = 'Default {}: '.format('Sink' if self._channel == 'sink' else 'Source')
for line in output.split('\n'):
if line.startswith(pattern):
return line.replace(pattern, '')
logging.error('no pulseaudio device found')
return 'n/a'
def volume(self, widget):
if self._failed == True:
return 'n/a'
if int(self._mono) > 0:
vol = '{}%'.format(self._mono)
if self._showbars:
vol = '{} {}'.format(
vol, util.graph.hbar(float(self._mono)))
return vol
elif self._left == self._right:
vol = '{}%'.format(self._left)
if self._showbars:
vol = '{} {}'.format(
vol, util.graph.hbar(float(self._left)))
return vol
else:
vol = '{}%/{}%'.format(self._left, self._right)
if self._showbars:
vol = '{} {}{}'.format(
vol,
util.graph.hbar(float(self._left)),
util.graph.hbar(float(self._right)))
return vol
def update(self):
try:
self._failed = False
channel = 'sinks' if self._channel == 'sink' else 'sources'
device = self._default_device()
result = util.cli.execute('pactl list {}'.format(channel))
found = False
for line in result.split('\n'):
if 'Name: {}'.format(device) in line:
found = True
continue
if found is False:
continue
for pattern in self._patterns:
if not pattern['expr'] in line:
continue
if pattern['callback'](line) is False and found == True:
return
except Exception as e:
self._failed = True
logging.exception(e)
if util.format.asbool(self.parameter('autostart', False)):
util.cli.execute('pulseaudio --start', ignore_errors=True)
else:
raise e
def state(self, widget):
if self._mute:
return ['warning', 'muted']
if int(self._left) > int(100):
return ['critical', 'unmuted']
return ['unmuted']
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

19
modules/core/date.py Normal file
View file

@ -0,0 +1,19 @@
# pylint: disable=C0111,R0903
'''Displays the current date and time.
Parameters:
* date.format: strftime()-compatible formatting string
* date.locale: locale to use rather than the system default
'''
from .datetime import Module
class Module(Module):
def __init__(self, config):
super().__init__(config)
def default_format(self):
return '%x'
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

43
modules/core/datetime.py Normal file
View file

@ -0,0 +1,43 @@
# pylint: disable=C0111,R0903
'''Displays the current date and time.
Parameters:
* datetime.format: strftime()-compatible formatting string
* datetime.locale: locale to use rather than the system default
'''
from __future__ import absolute_import
import datetime
import locale
import core.module
import core.widget
import core.input
class Module(core.module.Module):
def __init__(self, config):
super().__init__(config, core.widget.Widget(self.full_text))
core.input.register(self, button=core.input.LEFT_MOUSE, cmd='calendar')
self._fmt = self.parameter('format', self.default_format())
l = locale.getdefaultlocale()
if not l or l == (None, None):
l = ('en_US', 'UTF-8')
lcl = self.parameter('locale', '.'.join(l))
try:
locale.setlocale(locale.LC_TIME, lcl.split('.'))
except Exception as e:
locale.setlocale(locale.LC_TIME, ('en_US', 'UTF-8'))
def default_format(self):
return '%x %X'
def full_text(self, widget):
enc = locale.getpreferredencoding()
retval = datetime.datetime.now().strftime(self._fmt)
if hasattr(retval, 'decode'):
return retval.decode(enc)
return retval
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

14
modules/core/kernel.py Normal file
View file

@ -0,0 +1,14 @@
# pylint: disable=C0111,R0903
"""Shows Linux kernel version information"""
import platform
import core.module
import core.widget
class Module(core.module.Module):
def __init__(self, config=None):
super().__init__(config, core.widget.Widget(platform.release()))
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

38
modules/core/load.py Normal file
View file

@ -0,0 +1,38 @@
# pylint: disable=C0111,R0903
'''Displays system load.
Parameters:
* load.warning : Warning threshold for the one-minute load average (defaults to 70% of the number of CPUs)
* load.critical: Critical threshold for the one-minute load average (defaults to 80% of the number of CPUs)
'''
import os
import multiprocessing
import core.module
import core.input
class Module(core.module.Module):
def __init__(self, config):
super().__init__(config, core.widget.Widget(self.load))
self._load = [0, 0, 0]
try:
self._cpus = multiprocessing.cpu_count()
except NotImplementedError as e:
self._cpus = 1
core.input.register(self, button=core.input.LEFT_MOUSE,
cmd='gnome-system-monitor')
def load(self, widget):
return '{:.02f}/{:.02f}/{:.02f}'.format(
self._load[0], self._load[1], self._load[2]
)
def update(self):
self._load = os.getloadavg()
def state(self, widget):
return self.threshold_state(self._load[0], self._cpus*0.7, self._cpus*0.8)
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

114
modules/core/nic.py Normal file
View file

@ -0,0 +1,114 @@
#pylint: disable=C0111,R0903
'''Displays the name, IP address(es) and status of each available network interface.
Requires the following python module:
* netifaces
Parameters:
* nic.exclude: Comma-separated list of interface prefixes to exclude (defaults to 'lo,virbr,docker,vboxnet,veth,br')
* nic.include: Comma-separated list of interfaces to include
* nic.states: Comma-separated list of states to show (prefix with '^' to invert - i.e. ^down -> show all devices that are not in state down)
* nic.format: Format string (defaults to '{intf} {state} {ip} {ssid}')
'''
import shutil
import netifaces
import subprocess
import core.widget
import core.module
import util.cli
import util.format
class Module(core.module.Module):
def __init__(self, config):
widgets = []
super().__init__(config, widgets)
self._exclude = tuple(filter(len, self.parameter('exclude', 'lo,virbr,docker,vboxnet,veth,br').split(',')))
self._include = self.parameter('include', '').split(',')
self._states = { 'include': [], 'exclude': [] }
for state in tuple(filter(len, util.format.aslist(self.parameter('states', '')))):
if state[0] == '^':
self._states['exclude'].append(state[1:])
else:
self._states['include'].append(state)
self._format = self.parameter('format','{intf} {state} {ip} {ssid}');
self.iwgetid = shutil.which('iwgetid')
self._update_widgets(widgets)
def update(self):
self._update_widgets(self.widgets())
def state(self, widget):
states = []
if widget.get('state') == 'down':
states.append('critical')
elif widget.get('state') != 'up':
states.append('warning')
intf = widget.get('intf')
iftype = 'wireless' if self._iswlan(intf) else 'wired'
iftype = 'tunnel' if self._istunnel(intf) else iftype
states.append('{}-{}'.format(iftype, widget.get('state')))
return states
def _iswlan(self, intf):
# wifi, wlan, wlp, seems to work for me
if intf.startswith('w'): return True
return False
def _istunnel(self, intf):
return intf.startswith('tun') or intf.startswith('wg')
def get_addresses(self, intf):
retval = []
try:
for ip in netifaces.ifaddresses(intf).get(netifaces.AF_INET, []):
if ip.get('addr', '') != '':
retval.append(ip.get('addr'))
except Exception:
return []
return retval
def _update_widgets(self, widgets):
interfaces = [i for i in netifaces.interfaces() if not i.startswith(self._exclude)]
interfaces.extend([i for i in netifaces.interfaces() if i in self._include])
for widget in widgets:
widget.set('visited', False)
for intf in interfaces:
addr = []
state = 'down'
for ip in self.get_addresses(intf):
addr.append(ip)
state = 'up'
if len(self._states['exclude']) > 0 and state in self._states['exclude']: continue
if len(self._states['include']) > 0 and state not in self._states['include']: continue
widget = self.widget(intf)
if not widget:
widget = core.widget.Widget(name=intf, module=self)
widgets.append(widget)
# join/split is used to get rid of multiple whitespaces (in case SSID is not available, for instance
widget.full_text(' '.join(self._format.format(ip=', '.join(addr),intf=intf,state=state,ssid=self.get_ssid(intf)).split()))
widget.set('intf', intf)
widget.set('state', state)
widget.set('visited', True)
for widget in widgets:
if widget.get('visited') is False:
widgets.remove(widget)
def get_ssid(self, intf):
if self._iswlan(intf) and self.iwgetid:
return util.cli.execute('{} -r {}'.format(self.iwgetid, intf), ignore_errors=True)
return ''
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

7
modules/core/pasink.py Normal file
View file

@ -0,0 +1,7 @@
from .__pulseaudio import Module
class Module(Module):
def __init__(self, config):
super().__init__(config, 'sink')
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

7
modules/core/pasource.py Normal file
View file

@ -0,0 +1,7 @@
from .__pulseaudio import Module
class Module(Module):
def __init__(self, config):
super().__init__(config, 'source')
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

13
modules/core/test.py Normal file
View file

@ -0,0 +1,13 @@
# pylint: disable=C0111,R0903
"""Test module
"""
import core.widget
import core.module
class Module(core.module.Module):
def __init__(self, config=None):
super().__init__(config, core.widget.Widget('test'))
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

19
modules/core/time.py Normal file
View file

@ -0,0 +1,19 @@
# pylint: disable=C0111,R0903
'''Displays the current date and time.
Parameters:
* time.format: strftime()-compatible formatting string
* time.locale: locale to use rather than the system default
'''
from .datetime import Module
class Module(Module):
def __init__(self, config):
super().__init__(config)
def default_format(self):
return '%X'
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4