[core] Refactor engine
This is going to be a bit more comprehensive than anticipated. In order to cleanly refactor the core and the engine, basically start from scratch with the implementation. Goals: * Test coverage * Maintain backwards compatibility with module interface as much as possible (but still make modules easier to code) * Simplicity see #23
This commit is contained in:
parent
20858991b9
commit
a8a6c9bba2
72 changed files with 19 additions and 2155 deletions
|
@ -1,52 +0,0 @@
|
|||
import datetime
|
||||
import bumblebee.module
|
||||
import os.path
|
||||
|
||||
def description():
|
||||
return "Displays battery status, percentage and whether it's charging or discharging."
|
||||
|
||||
def parameters():
|
||||
return [ "battery.device: The device to read from (defaults to BAT0)" ]
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
self._battery = config.parameter("device", "BAT0")
|
||||
self._capacity = 100
|
||||
self._status = "Unknown"
|
||||
|
||||
def widgets(self):
|
||||
self._AC = False;
|
||||
self._path = "/sys/class/power_supply/{}".format(self._battery)
|
||||
if not os.path.exists(self._path):
|
||||
self._AC = True;
|
||||
return bumblebee.output.Widget(self,"AC")
|
||||
|
||||
with open(self._path + "/capacity") as f:
|
||||
self._capacity = int(f.read())
|
||||
self._capacity = self._capacity if self._capacity < 100 else 100
|
||||
|
||||
return bumblebee.output.Widget(self,"{:02d}%".format(self._capacity))
|
||||
|
||||
def warning(self, widget):
|
||||
return self._capacity < self._config.parameter("warning", 20)
|
||||
|
||||
def critical(self, widget):
|
||||
return self._capacity < self._config.parameter("critical", 10)
|
||||
|
||||
def state(self, widget):
|
||||
if self._AC:
|
||||
return "AC"
|
||||
|
||||
with open(self._path + "/status") as f:
|
||||
self._status = f.read().strip()
|
||||
|
||||
if self._status == "Discharging":
|
||||
status = "discharging-{}".format(min([ 10, 25, 50, 80, 100] , key=lambda i:abs(i-self._capacity)))
|
||||
return status
|
||||
else:
|
||||
if self._capacity > 95:
|
||||
return "charged"
|
||||
return "charging"
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1,33 +0,0 @@
|
|||
import bumblebee.module
|
||||
|
||||
def description():
|
||||
return "Displays brightness percentage"
|
||||
|
||||
def parameters():
|
||||
return [
|
||||
"brightness.step: Steps (in percent) to increase/decrease brightness on scroll (defaults to 2)",
|
||||
]
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
self._brightness = 0
|
||||
self._max = 0
|
||||
self._percent = 0
|
||||
|
||||
step = self._config.parameter("step", 2)
|
||||
|
||||
output.add_callback(module=self.instance(), button=4, cmd="xbacklight +{}%".format(step))
|
||||
output.add_callback(module=self.instance(), button=5, cmd="xbacklight -{}%".format(step))
|
||||
|
||||
def widgets(self):
|
||||
with open("/sys/class/backlight/intel_backlight/brightness") as f:
|
||||
self._brightness = int(f.read())
|
||||
with open("/sys/class/backlight/intel_backlight/max_brightness") as f:
|
||||
self._max = int(f.read())
|
||||
self._brightness = self._brightness if self._brightness < self._max else self._max
|
||||
self._percent = int(round(self._brightness * 100 / self._max))
|
||||
|
||||
return bumblebee.output.Widget(self, "{:02d}%".format(self._percent))
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1,38 +0,0 @@
|
|||
import subprocess
|
||||
import shlex
|
||||
|
||||
import bumblebee.module
|
||||
|
||||
def description():
|
||||
return "Enable/disable auto screen lock."
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
self._activated = 0
|
||||
output.add_callback(module="caffeine.activate", button=1, cmd=[ 'notify-send "Consuming caffeine"', 'xset s off' ])
|
||||
output.add_callback(module="caffeine.deactivate", button=1, cmd=[ 'notify-send "Out of coffee"', 'xset s default' ])
|
||||
|
||||
def widgets(self):
|
||||
output = subprocess.check_output(shlex.split("xset q"))
|
||||
xset_out = output.decode().split("\n")
|
||||
for line in xset_out:
|
||||
if line.startswith(" timeout"):
|
||||
timeout = int(line.split(" ")[4])
|
||||
if timeout == 0:
|
||||
self._activated = 1;
|
||||
else:
|
||||
self._activated = 0;
|
||||
break
|
||||
|
||||
if self._activated == 0:
|
||||
return bumblebee.output.Widget(self, "", instance="caffeine.activate")
|
||||
elif self._activated == 1:
|
||||
return bumblebee.output.Widget(self, "", instance="caffeine.deactivate")
|
||||
|
||||
def state(self, widget):
|
||||
if self._activated == 1:
|
||||
return "activated"
|
||||
else:
|
||||
return "deactivated"
|
||||
|
|
@ -1,78 +0,0 @@
|
|||
import string
|
||||
import datetime
|
||||
import subprocess
|
||||
from collections import defaultdict
|
||||
|
||||
import bumblebee.util
|
||||
import bumblebee.module
|
||||
|
||||
def description():
|
||||
return "Displays the current song and artist playing in cmus"
|
||||
|
||||
def parameters():
|
||||
return [
|
||||
"cmus.format: Format of the displayed song information, arbitrary tags (as available from cmus-remote -Q) can be used (defaults to {artist} - {title} {position}/{duration})"
|
||||
]
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
self._status = "default"
|
||||
self._fmt = self._config.parameter("format", "{artist} - {title} {position}/{duration}")
|
||||
|
||||
output.add_callback(module="cmus.prev", button=1, cmd="cmus-remote -r")
|
||||
output.add_callback(module="cmus.next", button=1, cmd="cmus-remote -n")
|
||||
output.add_callback(module="cmus.shuffle", button=1, cmd="cmus-remote -S")
|
||||
output.add_callback(module="cmus.repeat", button=1, cmd="cmus-remote -R")
|
||||
output.add_callback(module=self.instance(), button=1, cmd="cmus-remote -u")
|
||||
|
||||
def _loadsong(self):
|
||||
process = subprocess.Popen(["cmus-remote", "-Q"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
self._query, self._error = process.communicate()
|
||||
self._query = self._query.decode("utf-8").split("\n")
|
||||
self._status = "default"
|
||||
|
||||
def _tags(self):
|
||||
tags = defaultdict(lambda: '')
|
||||
self._repeat = False
|
||||
self._shuffle = False
|
||||
for line in self._query:
|
||||
if line.startswith("status"):
|
||||
status = line.split(" ", 2)[1]
|
||||
self._status = status
|
||||
if line.startswith("tag"):
|
||||
key, value = line.split(" ", 2)[1:]
|
||||
tags.update({ key: value })
|
||||
if line.startswith("duration"):
|
||||
sec = line.split(" ")[1]
|
||||
tags.update({ "duration": bumblebee.util.durationfmt(int(sec)) })
|
||||
if line.startswith("position"):
|
||||
sec = line.split(" ")[1]
|
||||
tags.update({ "position": bumblebee.util.durationfmt(int(sec)) })
|
||||
if line.startswith("set repeat "):
|
||||
self._repeat = False if line.split(" ")[2] == "false" else True
|
||||
if line.startswith("set shuffle "):
|
||||
self._shuffle = False if line.split(" ")[2] == "false" else True
|
||||
|
||||
return tags
|
||||
|
||||
def widgets(self):
|
||||
self._loadsong()
|
||||
tags = self._tags()
|
||||
|
||||
return [
|
||||
bumblebee.output.Widget(self, "", instance="cmus.prev"),
|
||||
bumblebee.output.Widget(self, string.Formatter().vformat(self._fmt, (), tags)),
|
||||
bumblebee.output.Widget(self, "", instance="cmus.next"),
|
||||
bumblebee.output.Widget(self, "", instance="cmus.shuffle"),
|
||||
bumblebee.output.Widget(self, "", instance="cmus.repeat"),
|
||||
]
|
||||
|
||||
def state(self, widget):
|
||||
if widget.instance() == "cmus.shuffle":
|
||||
return "on" if self._shuffle else "off"
|
||||
if widget.instance() == "cmus.repeat":
|
||||
return "on" if self._repeat else "off"
|
||||
return self._status
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1,30 +0,0 @@
|
|||
import bumblebee.module
|
||||
import psutil
|
||||
|
||||
def description():
|
||||
return "Displays CPU utilization across all CPUs."
|
||||
|
||||
def parameters():
|
||||
return [
|
||||
"cpu.warning: Warning threshold in % of disk usage (defaults to 70%)",
|
||||
"cpu.critical: Critical threshold in % of disk usage (defaults to 80%)",
|
||||
]
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
self._perc = psutil.cpu_percent(percpu=False)
|
||||
|
||||
output.add_callback(module=self.instance(), button=1, cmd="gnome-system-monitor")
|
||||
|
||||
def widgets(self):
|
||||
self._perc = psutil.cpu_percent(percpu=False)
|
||||
return bumblebee.output.Widget(self, "{:05.02f}%".format(self._perc))
|
||||
|
||||
def warning(self, widget):
|
||||
return self._perc > self._config.parameter("warning", 70)
|
||||
|
||||
def critical(self, widget):
|
||||
return self._perc > self._config.parameter("critical", 80)
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1 +0,0 @@
|
|||
datetime.py
|
|
@ -1 +0,0 @@
|
|||
time.py
|
|
@ -1,40 +0,0 @@
|
|||
import os
|
||||
import bumblebee.util
|
||||
import bumblebee.module
|
||||
|
||||
def description():
|
||||
return "Shows free diskspace, total diskspace and the percentage of free disk space."
|
||||
|
||||
def parameters():
|
||||
return [
|
||||
"disk.warning: Warning threshold in % (defaults to 80%)",
|
||||
"disk.critical: Critical threshold in % (defaults to 90%)"
|
||||
]
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
self._path = self._config.parameter("path", "/")
|
||||
|
||||
output.add_callback(module=self.instance(), button=1, cmd="nautilus {}".format(self._path))
|
||||
|
||||
def widgets(self):
|
||||
st = os.statvfs(self._path)
|
||||
|
||||
self._size = st.f_frsize*st.f_blocks
|
||||
self._used = self._size - st.f_frsize*st.f_bavail
|
||||
self._perc = 100.0*self._used/self._size
|
||||
|
||||
return bumblebee.output.Widget(self,
|
||||
"{} {}/{} ({:05.02f}%)".format(self._path,
|
||||
bumblebee.util.bytefmt(self._used),
|
||||
bumblebee.util.bytefmt(self._size), self._perc)
|
||||
)
|
||||
|
||||
def warning(self, widget):
|
||||
return self._perc > self._config.parameter("warning", 80)
|
||||
|
||||
def critical(self, widget):
|
||||
return self._perc > self._config.parameter("critical", 90)
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1,98 +0,0 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
import time
|
||||
import shlex
|
||||
import threading
|
||||
import subprocess
|
||||
|
||||
import bumblebee.module
|
||||
import bumblebee.util
|
||||
|
||||
def description():
|
||||
return "Checks DNF for updated packages and displays the number of <security>/<bugfixes>/<enhancements>/<other> pending updates."
|
||||
|
||||
def parameters():
|
||||
return [ "dnf.interval: Time in seconds between two checks for updates (defaults to 1800)" ]
|
||||
|
||||
def get_dnf_info(obj):
|
||||
loops = obj.interval()
|
||||
|
||||
for thread in threading.enumerate():
|
||||
if thread.name == "MainThread":
|
||||
main = thread
|
||||
|
||||
while main.is_alive():
|
||||
loops += 1
|
||||
if loops < obj.interval():
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
loops = 0
|
||||
try:
|
||||
res = subprocess.check_output(shlex.split("dnf updateinfo"))
|
||||
except Exception as e:
|
||||
break
|
||||
|
||||
security = 0
|
||||
bugfixes = 0
|
||||
enhancements = 0
|
||||
other = 0
|
||||
for line in res.decode().split("\n"):
|
||||
|
||||
if not line.startswith(" "): continue
|
||||
elif "ecurity" in line:
|
||||
for s in line.split():
|
||||
if s.isdigit(): security += int(s)
|
||||
elif "ugfix" in line:
|
||||
for s in line.split():
|
||||
if s.isdigit(): bugfixes += int(s)
|
||||
elif "hancement" in line:
|
||||
for s in line.split():
|
||||
if s.isdigit(): enhancements += int(s)
|
||||
else:
|
||||
for s in line.split():
|
||||
if s.isdigit(): other += int(s)
|
||||
|
||||
obj.set("security", security)
|
||||
obj.set("bugfixes", bugfixes)
|
||||
obj.set("enhancements", enhancements)
|
||||
obj.set("other", other)
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
|
||||
self._counter = {}
|
||||
self._thread = threading.Thread(target=get_dnf_info, args=(self,))
|
||||
self._thread.start()
|
||||
|
||||
def interval(self):
|
||||
return self._config.parameter("interval", 30*60)
|
||||
|
||||
def set(self, what, value):
|
||||
self._counter[what] = value
|
||||
|
||||
def get(self, what):
|
||||
return self._counter.get(what, 0)
|
||||
|
||||
def widgets(self):
|
||||
result = []
|
||||
for t in [ "security", "bugfixes", "enhancements", "other" ]:
|
||||
result.append(str(self.get(t)))
|
||||
|
||||
return bumblebee.output.Widget(self, "/".join(result))
|
||||
|
||||
def state(self, widget):
|
||||
total = sum(self._counter.values())
|
||||
if total == 0: return "good"
|
||||
return "default"
|
||||
|
||||
def warning(self, widget):
|
||||
total = sum(self._counter.values())
|
||||
return total > 0
|
||||
|
||||
def critical(self, widget):
|
||||
total = sum(self._counter.values())
|
||||
return total > 50 or self._counter.get("security", 0) > 0
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1,67 +0,0 @@
|
|||
import subprocess
|
||||
import shlex
|
||||
import bumblebee.module
|
||||
import bumblebee.util
|
||||
|
||||
def description():
|
||||
return "Showws current keyboard layout and change it on click."
|
||||
|
||||
def parameters():
|
||||
return [
|
||||
"layout.lang: pipe-separated list of languages to cycle through (e.g. us|rs|de). Default: en"
|
||||
]
|
||||
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
|
||||
self._languages = self._config.parameter("lang", "en").split("|")
|
||||
self._idx = 0
|
||||
|
||||
output.add_callback(module=self.instance(), button=1, cmd=self.next_keymap)
|
||||
output.add_callback(module=self.instance(), button=3, cmd=self.prev_keymap)
|
||||
|
||||
def next_keymap(self, event, widget):
|
||||
self._idx = self._idx + 1 if self._idx < len(self._languages) - 1 else 0
|
||||
self.set_keymap()
|
||||
|
||||
def prev_keymap(self, event, widget):
|
||||
self._idx = self._idx - 1 if self._idx > 0 else len(self._languages) - 1
|
||||
self.set_keymap()
|
||||
|
||||
def set_keymap(self):
|
||||
tmp = self._languages[self._idx].split(":")
|
||||
layout = tmp[0]
|
||||
variant = ""
|
||||
if len(tmp) > 1:
|
||||
variant = "-variant {}".format(tmp[1])
|
||||
bumblebee.util.execute("setxkbmap -layout {} {}".format(layout, variant))
|
||||
|
||||
def widgets(self):
|
||||
res = bumblebee.util.execute("setxkbmap -query")
|
||||
layout = None
|
||||
variant = None
|
||||
for line in res.split("\n"):
|
||||
if not line:
|
||||
continue
|
||||
if "layout" in line:
|
||||
layout = line.split(":")[1].strip()
|
||||
if "variant" in line:
|
||||
variant = line.split(":")[1].strip()
|
||||
if variant:
|
||||
layout += ":" + variant
|
||||
|
||||
lang = self._languages[self._idx]
|
||||
|
||||
if lang != layout:
|
||||
if layout in self._languages:
|
||||
self._idx = self._languages.index(layout)
|
||||
else:
|
||||
self._languages.append(layout)
|
||||
self._idx = len(self._languages) - 1
|
||||
lang = layout
|
||||
|
||||
return bumblebee.output.Widget(self, lang)
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1,37 +0,0 @@
|
|||
import bumblebee.module
|
||||
import multiprocessing
|
||||
import os
|
||||
|
||||
def description():
|
||||
return "Displays system load."
|
||||
|
||||
def parameters():
|
||||
return [
|
||||
"load.warning: Warning threshold for the one-minute load average (defaults to 70% of the number of CPUs)",
|
||||
"load.critical: Critical threshold for the one-minute load average (defaults 80% of the number of CPUs)"
|
||||
]
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
self._cpus = 1
|
||||
try:
|
||||
self._cpus = multiprocessing.cpu_count()
|
||||
except multiprocessing.NotImplementedError as e:
|
||||
pass
|
||||
|
||||
output.add_callback(module=self.instance(), button=1, cmd="gnome-system-monitor")
|
||||
|
||||
def widgets(self):
|
||||
self._load = os.getloadavg()
|
||||
|
||||
return bumblebee.output.Widget(self, "{:.02f}/{:.02f}/{:.02f}".format(
|
||||
self._load[0], self._load[1], self._load[2]))
|
||||
|
||||
def warning(self, widget):
|
||||
return self._load[0] > self._config.parameter("warning", self._cpus*0.7)
|
||||
|
||||
def critical(self, widget):
|
||||
return self._load[0] > self._config.parameter("critical", self._cpus*0.8)
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1,38 +0,0 @@
|
|||
import psutil
|
||||
import bumblebee.module
|
||||
import bumblebee.util
|
||||
|
||||
def description():
|
||||
return "Shows available RAM, total amount of RAM and the percentage of available RAM."
|
||||
|
||||
def parameters():
|
||||
return [
|
||||
"memory.warning: Warning threshold in % of memory used (defaults to 80%)",
|
||||
"memory.critical: Critical threshold in % of memory used (defaults to 90%)",
|
||||
]
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
self._mem = psutil.virtual_memory()
|
||||
|
||||
output.add_callback(module=self.instance(), button=1, cmd="gnome-system-monitor")
|
||||
|
||||
def widgets(self):
|
||||
self._mem = psutil.virtual_memory()
|
||||
|
||||
used = self._mem.total - self._mem.available
|
||||
|
||||
return bumblebee.output.Widget(self, "{}/{} ({:05.02f}%)".format(
|
||||
bumblebee.util.bytefmt(used),
|
||||
bumblebee.util.bytefmt(self._mem.total),
|
||||
self._mem.percent)
|
||||
)
|
||||
|
||||
def warning(self, widget):
|
||||
return self._mem.percent > self._config.parameter("warning", 80)
|
||||
|
||||
def critical(self, widget):
|
||||
return self._mem.percent > self._config.parameter("critical", 90)
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1,62 +0,0 @@
|
|||
import netifaces
|
||||
import bumblebee.module
|
||||
|
||||
def description():
|
||||
return "Displays the names, IP addresses and status of each available interface."
|
||||
|
||||
def parameters():
|
||||
return [
|
||||
"nic.exclude: Comma-separated list of interface prefixes to exlude (defaults to: \"lo,virbr,docker,vboxnet,veth\")"
|
||||
]
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
self._exclude = tuple(filter(len, self._config.parameter("exclude", "lo,virbr,docker,vboxnet,veth").split(",")))
|
||||
|
||||
def widgets(self):
|
||||
result = []
|
||||
interfaces = [ i for i in netifaces.interfaces() if not i.startswith(self._exclude) ]
|
||||
for intf in interfaces:
|
||||
addr = []
|
||||
state = "down"
|
||||
try:
|
||||
if netifaces.AF_INET in netifaces.ifaddresses(intf):
|
||||
for ip in netifaces.ifaddresses(intf)[netifaces.AF_INET]:
|
||||
if "addr" in ip and ip["addr"] != "":
|
||||
addr.append(ip["addr"])
|
||||
state = "up"
|
||||
except Exception as e:
|
||||
addr = []
|
||||
widget = bumblebee.output.Widget(self, "{} {} {}".format(
|
||||
intf, state, ", ".join(addr)
|
||||
))
|
||||
widget.set("intf", intf)
|
||||
widget.set("state", state)
|
||||
result.append(widget)
|
||||
|
||||
return result
|
||||
|
||||
def _iswlan(self, intf):
|
||||
# wifi, wlan, wlp, seems to work for me
|
||||
if intf.startswith("w"): return True
|
||||
return False
|
||||
|
||||
def _istunnel(self, intf):
|
||||
return intf.startswith("tun")
|
||||
|
||||
def state(self, widget):
|
||||
intf = widget.get("intf")
|
||||
|
||||
iftype = "wireless" if self._iswlan(intf) else "wired"
|
||||
iftype = "tunnel" if self._istunnel(intf) else iftype
|
||||
|
||||
return "{}-{}".format(iftype, widget.get("state"))
|
||||
|
||||
def warning(self, widget):
|
||||
return widget.get("state") != "up"
|
||||
|
||||
def critical(self, widget):
|
||||
return widget.get("state") == "down"
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1,58 +0,0 @@
|
|||
import bumblebee.module
|
||||
import subprocess
|
||||
import os
|
||||
|
||||
def description():
|
||||
return "Displays available updates per repository for pacman."
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
self._count = 0
|
||||
|
||||
def widgets(self):
|
||||
path = os.path.dirname(os.path.abspath(__file__))
|
||||
if self._count == 0:
|
||||
self._out = "?/?/?/?"
|
||||
process = subprocess.Popen([ "{}/../../bin/customupdates".format(path) ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
|
||||
self._query, self._error = process.communicate()
|
||||
|
||||
if not process.returncode == 0:
|
||||
self._out = "?/?/?/?"
|
||||
else:
|
||||
self._community = 0
|
||||
self._core = 0
|
||||
self._extra = 0
|
||||
self._other = 0
|
||||
|
||||
for line in self._query.splitlines():
|
||||
if line.startswith(b'http'):
|
||||
if b"community" in line:
|
||||
self._community += 1
|
||||
continue
|
||||
if b"core" in line:
|
||||
self._core += 1;
|
||||
continue
|
||||
if b"extra" in line:
|
||||
self._extra += 1
|
||||
continue
|
||||
self._other += 1
|
||||
self._out = str(self._core)+"/"+str(self._extra)+"/"+str(self._community)+"/"+str(self._other)
|
||||
|
||||
self._count += 1
|
||||
self._count = 0 if self._count > 300 else self._count
|
||||
return bumblebee.output.Widget(self, "{}".format(self._out))
|
||||
|
||||
def sumUpdates(self):
|
||||
return self._core + self._community + self._extra + self._other
|
||||
|
||||
def critical(self, widget):
|
||||
#return self._sumUpdates(self)
|
||||
return self.sumUpdates() > 0
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1 +0,0 @@
|
|||
pulseaudio.py
|
|
@ -1 +0,0 @@
|
|||
pulseaudio.py
|
|
@ -1,97 +0,0 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
import re
|
||||
import time
|
||||
import shlex
|
||||
import threading
|
||||
import subprocess
|
||||
|
||||
import bumblebee.module
|
||||
import bumblebee.util
|
||||
|
||||
def description():
|
||||
return "Periodically checks the RTT of a configurable IP"
|
||||
|
||||
def parameters():
|
||||
return [
|
||||
"ping.interval: Time in seconds between two RTT checks (defaults to 60)",
|
||||
"ping.address: IP address to check",
|
||||
"ping.warning: Threshold for warning state, in seconds (defaults to 1.0)",
|
||||
"ping.critical: Threshold for critical state, in seconds (defaults to 2.0)",
|
||||
"ping.timeout: Timeout for waiting for a reply (defaults to 5.0)",
|
||||
"ping.probes: Number of probes to send (defaults to 5)",
|
||||
]
|
||||
|
||||
def get_rtt(obj):
|
||||
loops = obj.get("interval")
|
||||
|
||||
for thread in threading.enumerate():
|
||||
if thread.name == "MainThread":
|
||||
main = thread
|
||||
|
||||
interval = obj.get("interval")
|
||||
while main.is_alive():
|
||||
loops += 1
|
||||
if loops < interval:
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
loops = 0
|
||||
try:
|
||||
res = subprocess.check_output(shlex.split("ping -n -q -c {} -W {} {}".format(
|
||||
obj.get("rtt-probes"), obj.get("rtt-timeout"), obj.get("address")
|
||||
)))
|
||||
obj.set("rtt-unreachable", False)
|
||||
|
||||
for line in res.decode().split("\n"):
|
||||
if not line.startswith("rtt"): continue
|
||||
m = re.search(r'([0-9\.]+)/([0-9\.]+)/([0-9\.]+)/([0-9\.]+)\s+(\S+)', line)
|
||||
|
||||
obj.set("rtt-min", float(m.group(1)))
|
||||
obj.set("rtt-avg", float(m.group(2)))
|
||||
obj.set("rtt-max", float(m.group(3)))
|
||||
obj.set("rtt-unit", m.group(5))
|
||||
except Exception as e:
|
||||
obj.set("rtt-unreachable", True)
|
||||
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
|
||||
self._counter = {}
|
||||
|
||||
self.set("address", self._config.parameter("address", "8.8.8.8"))
|
||||
self.set("interval", self._config.parameter("interval", 60))
|
||||
self.set("rtt-probes", self._config.parameter("probes", 5))
|
||||
self.set("rtt-timeout", self._config.parameter("timeout", 5.0))
|
||||
|
||||
self._thread = threading.Thread(target=get_rtt, args=(self,))
|
||||
self._thread.start()
|
||||
|
||||
def set(self, what, value):
|
||||
self._counter[what] = value
|
||||
|
||||
def get(self, what):
|
||||
return self._counter.get(what, 0)
|
||||
|
||||
def widgets(self):
|
||||
text = "{}: {:.1f}{}".format(
|
||||
self.get("address"),
|
||||
self.get("rtt-avg"),
|
||||
self.get("rtt-unit")
|
||||
)
|
||||
|
||||
if self.get("rtt-unreachable"):
|
||||
text = "{}: unreachable".format(self.get("address"))
|
||||
|
||||
return bumblebee.output.Widget(self, text)
|
||||
|
||||
def warning(self, widget):
|
||||
return self.get("rtt-avg") > float(self._config.parameter("warning", 1.0))*1000.0
|
||||
|
||||
def critical(self, widget):
|
||||
if self.get("rtt-unreachable"): return True
|
||||
return self.get("rtt-avg") > float(self._config.parameter("critical", 2.0))*1000.0
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1,92 +0,0 @@
|
|||
import re
|
||||
import shlex
|
||||
import subprocess
|
||||
|
||||
import bumblebee.module
|
||||
import bumblebee.util
|
||||
|
||||
def description():
|
||||
module = __name__.split(".")[-1]
|
||||
if module == "pasink":
|
||||
return "Shows volume and mute status of the default PulseAudio Sink."
|
||||
if module == "pasource":
|
||||
return "Shows volume and mute status of the default PulseAudio Source."
|
||||
return "See 'pasource'."
|
||||
|
||||
def parameters():
|
||||
return [ "none" ]
|
||||
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
|
||||
self._module = self.__module__.split(".")[-1]
|
||||
self._left = 0
|
||||
self._right = 0
|
||||
self._mono = 0
|
||||
self._mute = False
|
||||
channel = "sink" if self._module == "pasink" else "source"
|
||||
|
||||
output.add_callback(module=self.instance(), button=3,
|
||||
cmd="pavucontrol")
|
||||
output.add_callback(module=self.instance(), button=1,
|
||||
cmd="pactl set-{}-mute @DEFAULT_{}@ toggle".format(channel, channel.upper()))
|
||||
output.add_callback(module=self.instance(), button=4,
|
||||
cmd="pactl set-{}-volume @DEFAULT_{}@ +2%".format(channel, channel.upper()))
|
||||
output.add_callback(module=self.instance(), button=5,
|
||||
cmd="pactl set-{}-volume @DEFAULT_{}@ -2%".format(channel, channel.upper()))
|
||||
|
||||
def widgets(self):
|
||||
res = subprocess.check_output(shlex.split("pactl info"))
|
||||
channel = "sinks" if self._module == "pasink" else "sources"
|
||||
name = None
|
||||
for line in res.decode().split("\n"):
|
||||
if line.startswith("Default Sink: ") and channel == "sinks":
|
||||
name = line[14:]
|
||||
if line.startswith("Default Source: ") and channel == "sources":
|
||||
name = line[16:]
|
||||
|
||||
res = subprocess.check_output(shlex.split("pactl list {}".format(channel)))
|
||||
|
||||
found = False
|
||||
for line in res.decode().split("\n"):
|
||||
if "Name:" in line and found == True:
|
||||
break
|
||||
if name in line:
|
||||
found = True
|
||||
if "Mute:" in line and found == True:
|
||||
self._mute = False if " no" in line.lower() else True
|
||||
|
||||
if "Volume:" in line and found == True:
|
||||
m = None
|
||||
if "mono" in line:
|
||||
m = re.search(r'mono:.*\s*\/\s*(\d+)%', line)
|
||||
else:
|
||||
m = re.search(r'left:.*\s*\/\s*(\d+)%.*right:.*\s*\/\s*(\d+)%', line)
|
||||
if not m: continue
|
||||
|
||||
if "mono" in line:
|
||||
self._mono = m.group(1)
|
||||
else:
|
||||
self._left = m.group(1)
|
||||
self._right = m.group(2)
|
||||
result = ""
|
||||
if int(self._mono) > 0:
|
||||
result = "{}%".format(self._mono)
|
||||
elif self._left == self._right:
|
||||
result = "{}%".format(self._left)
|
||||
else:
|
||||
result="{}%/{}%".format(self._left, self._right)
|
||||
return bumblebee.output.Widget(self, result)
|
||||
|
||||
def state(self, widget):
|
||||
return "muted" if self._mute is True else "unmuted"
|
||||
|
||||
def warning(self, widget):
|
||||
return self._mute
|
||||
|
||||
def critical(self, widget):
|
||||
return False
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1,17 +0,0 @@
|
|||
import bumblebee.module
|
||||
import bumblebee.util
|
||||
|
||||
def description():
|
||||
return "Draws a widget with configurable content."
|
||||
|
||||
def parameters():
|
||||
return [ "spacer.text: Text to draw (defaults to '')" ]
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
|
||||
def widgets(self):
|
||||
return bumblebee.output.Widget(self, self._config.parameter("text", ""))
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1,34 +0,0 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
import datetime
|
||||
import bumblebee.module
|
||||
|
||||
def description():
|
||||
return "Displays the current time, using the optional format string as input for strftime."
|
||||
|
||||
def parameters():
|
||||
module = __name__.split(".")[-1]
|
||||
return [
|
||||
"{}.format: strftime specification (defaults to {})".format(module, default_format(module))
|
||||
]
|
||||
|
||||
def default_format(module):
|
||||
default = "%x %X"
|
||||
if module == "date":
|
||||
default = "%x"
|
||||
if module == "time":
|
||||
default = "%X"
|
||||
return default
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
|
||||
module = self.__module__.split(".")[-1]
|
||||
|
||||
self._fmt = self._config.parameter("format", default_format(module))
|
||||
|
||||
def widgets(self):
|
||||
return bumblebee.output.Widget(self, datetime.datetime.now().strftime(self._fmt))
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -1,89 +0,0 @@
|
|||
import bumblebee.module
|
||||
import bumblebee.util
|
||||
import re
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
|
||||
def description():
|
||||
return "Shows all connected screens"
|
||||
|
||||
def parameters():
|
||||
return [
|
||||
]
|
||||
|
||||
class Module(bumblebee.module.Module):
|
||||
def __init__(self, output, config):
|
||||
super(Module, self).__init__(output, config)
|
||||
|
||||
self._widgets = []
|
||||
|
||||
def toggle(self, event, widget):
|
||||
path = os.path.dirname(os.path.abspath(__file__))
|
||||
toggle_cmd = "{}/../../bin/toggle-display.sh".format(path)
|
||||
|
||||
if widget.get("state") == "on":
|
||||
bumblebee.util.execute("{} --output {} --off".format(toggle_cmd, widget.get("display")))
|
||||
else:
|
||||
neighbor = None
|
||||
for w in self._widgets:
|
||||
if w.get("state") == "on":
|
||||
neighbor = w
|
||||
if event.get("button") == 1:
|
||||
break
|
||||
|
||||
if neighbor == None:
|
||||
bumblebee.util.execute("{} --output {} --auto".format(toggle_cmd,
|
||||
widget.get("display")))
|
||||
else:
|
||||
bumblebee.util.execute("{} --output {} --auto --{}-of {}".format(toggle_cmd,
|
||||
widget.get("display"), "left" if event.get("button") == 1 else "right",
|
||||
neighbor.get("display")))
|
||||
|
||||
def widgets(self):
|
||||
process = subprocess.Popen([ "xrandr", "-q" ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
output, error = process.communicate()
|
||||
|
||||
widgets = []
|
||||
|
||||
for line in output.split("\n"):
|
||||
if not " connected" in line:
|
||||
continue
|
||||
display = line.split(" ", 2)[0]
|
||||
m = re.search(r'\d+x\d+\+(\d+)\+\d+', line)
|
||||
|
||||
widget = bumblebee.output.Widget(self, display, instance=display)
|
||||
widget.set("display", display)
|
||||
|
||||
# not optimal (add callback once per interval), but since
|
||||
# add_callback() just returns if the callback has already
|
||||
# been registered, it should be "ok"
|
||||
self._output.add_callback(module=display, button=1,
|
||||
cmd=self.toggle)
|
||||
self._output.add_callback(module=display, button=3,
|
||||
cmd=self.toggle)
|
||||
if m:
|
||||
widget.set("state", "on")
|
||||
widget.set("pos", int(m.group(1)))
|
||||
else:
|
||||
widget.set("state", "off")
|
||||
widget.set("pos", sys.maxint)
|
||||
|
||||
widgets.append(widget)
|
||||
|
||||
widgets.sort(key=lambda widget : widget.get("pos"))
|
||||
|
||||
self._widgets = widgets
|
||||
|
||||
return widgets
|
||||
|
||||
def state(self, widget):
|
||||
return widget.get("state", "off")
|
||||
|
||||
def warning(self, widget):
|
||||
return False
|
||||
|
||||
def critical(self, widget):
|
||||
return False
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
Loading…
Add table
Add a link
Reference in a new issue