[core] restructure to allow PIP packaging

OK - so I have to admit I *hate* the fact that PIP seems to require a
subdirectory named like the library.

But since the PIP package is something really nifty to have (thanks to
@tony again!!!), I updated the codebase to hopefully conform with what
PIP expects. Testruns so far look promising...
This commit is contained in:
tobi-wan-kenobi 2020-05-09 21:22:00 +02:00
parent 1d25be2059
commit 320827d577
146 changed files with 2509 additions and 2 deletions

View file

@ -0,0 +1,41 @@
# pylint: disable=C0111,R0903
"""Displays CPU utilization across all CPUs.
Parameters:
* cpu.warning : Warning threshold in % of CPU usage (defaults to 70%)
* cpu.critical: Critical threshold in % of CPU usage (defaults to 80%)
* cpu.format : Format string (defaults to '{:.01f}%')
"""
import psutil
import core.module
import core.widget
import core.input
class Module(core.module.Module):
def __init__(self, config, theme):
super().__init__(config, theme, core.widget.Widget(self.utilization))
self.widget().set("theme.minwidth", self._format.format(100.0 - 10e-20))
self._utilization = psutil.cpu_percent(percpu=False)
core.input.register(
self, button=core.input.LEFT_MOUSE, cmd="gnome-system-monitor"
)
@property
def _format(self):
return self.parameter("format", "{:.01f}%")
def utilization(self, _):
return self._format.format(self._utilization)
def update(self):
self._utilization = psutil.cpu_percent(percpu=False)
def state(self, _):
return self.threshold_state(self._utilization, 70, 80)
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,23 @@
# pylint: disable=C0111,R0903
"""Displays the current date and time.
Parameters:
* date.format: strftime()-compatible formatting string
* date.locale: locale to use rather than the system default
"""
import core.decorators
from .datetime import Module
class Module(Module):
@core.decorators.every(hours=1)
def __init__(self, config, theme):
super().__init__(config, theme)
def default_format(self):
return "%x"
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,45 @@
# pylint: disable=C0111,R0903
"""Displays the current date and time.
Parameters:
* datetime.format: strftime()-compatible formatting string
* datetime.locale: locale to use rather than the system default
"""
from __future__ import absolute_import
import datetime
import locale
import core.module
import core.widget
import core.input
class Module(core.module.Module):
def __init__(self, config, theme):
super().__init__(config, theme, core.widget.Widget(self.full_text))
core.input.register(self, button=core.input.LEFT_MOUSE, cmd="calendar")
self._fmt = self.parameter("format", self.default_format())
l = locale.getdefaultlocale()
if not l or l == (None, None):
l = ("en_US", "UTF-8")
lcl = self.parameter("locale", ".".join(l))
try:
locale.setlocale(locale.LC_TIME, lcl.split("."))
except Exception as e:
locale.setlocale(locale.LC_TIME, ("en_US", "UTF-8"))
def default_format(self):
return "%x %X"
def full_text(self, widget):
enc = locale.getpreferredencoding()
retval = datetime.datetime.now().strftime(self._fmt)
if hasattr(retval, "decode"):
return retval.decode(enc)
return retval
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,24 @@
# pylint: disable=C0111,R0903
"""Shows that debug is enabled"""
import platform
import core.module
import core.widget
import core.decorators
class Module(core.module.Module):
@core.decorators.every(minutes=60)
def __init__(self, config, theme):
super().__init__(config, theme, core.widget.Widget(self.full_text))
def full_text(self, widgets):
return "debug"
def state(self, widget):
return "warning"
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,65 @@
# pylint: disable=C0111,R0903
"""Shows free diskspace, total diskspace and the percentage of free disk space.
Parameters:
* disk.warning: Warning threshold in % of disk space (defaults to 80%)
* disk.critical: Critical threshold in % of disk space (defaults ot 90%)
* disk.path: Path to calculate disk usage from (defaults to /)
* disk.open: Which application / file manager to launch (default xdg-open)
* disk.format: Format string, tags {path}, {used}, {left}, {size} and {percent} (defaults to '{path} {used}/{size} ({percent:05.02f}%)')
"""
import os
import core.module
import core.widget
import core.input
import util.format
class Module(core.module.Module):
def __init__(self, config, theme):
super().__init__(config, theme, core.widget.Widget(self.diskspace))
self._path = self.parameter("path", "/")
self._format = self.parameter("format", "{used}/{size} ({percent:05.02f}%)")
self._used = 0
self._left = 0
self._size = 0
self._percent = 0
core.input.register(
self,
button=core.input.LEFT_MOUSE,
cmd="{} {}".format(self.parameter("open", "xdg-open"), self._path),
)
def diskspace(self, widget):
used_str = util.format.byte(self._used)
size_str = util.format.byte(self._size)
left_str = util.format.byte(self._left)
percent_str = self._percent
return self._format.format(
path=self._path,
used=used_str,
left=left_str,
size=size_str,
percent=percent_str,
)
def update(self):
st = os.statvfs(self._path)
self._size = st.f_blocks * st.f_frsize
self._used = (st.f_blocks - st.f_bfree) * st.f_frsize
self._left = self._size - self._used
self._percent = 100.0 * self._used / self._size
def state(self, widget):
return self.threshold_state(self._percent, 80, 90)
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,33 @@
# pylint: disable=C0111,R0903
"""Shows bumblebee-status errors"""
import platform
import core.module
import core.widget
import core.event
class Module(core.module.Module):
def __init__(self, config, theme):
super().__init__(config, theme, core.widget.Widget(self.full_text))
self.__error = ""
self.__state = "critical"
core.event.register("error", self.__set_error)
def full_text(self, widgets):
return self.__error
def __set_error(self, error="n/a", state="critical"):
self.__error = error
self.__state = state
def state(self, widget):
if self.__error:
return [self.__state]
return []
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,77 @@
# pylint: disable=C0111,R0903
"""Print the branch and git status for the
currently focused window.
Requires:
* xcwd
* Python module 'pygit2'
"""
import os
import pygit2
import core.module
import util.cli
class Module(core.module.Module):
def __init__(self, config, theme):
super().__init__(config, theme, [])
self.__error = False
def hidden(self):
return self.__error
def update(self):
state = {}
self.clear_widgets()
try:
directory = util.cli.execute("xcwd").strip()
directory = self.__get_git_root(directory)
repo = pygit2.Repository(directory)
self.add_widget(name="git.main", full_text=repo.head.shorthand)
for filepath, flags in repo.status().items():
if (
flags == pygit2.GIT_STATUS_WT_NEW
or flags == pygit2.GIT_STATUS_INDEX_NEW
):
state["new"] = True
if (
flags == pygit2.GIT_STATUS_WT_DELETED
or flags == pygit2.GIT_STATUS_INDEX_DELETED
):
state["deleted"] = True
if (
flags == pygit2.GIT_STATUS_WT_MODIFIED
or flags == pygit2.GIT_STATUS_INDEX_MODIFIED
):
state["modified"] = True
self.__error = False
if "new" in state:
self.add_widget(name="git.new")
if "modified" in state:
self.add_widget(name="git.modified")
if "deleted" in state:
self.add_widget(name="git.deleted")
except Exception as e:
self.__error = True
def state(self, widget):
return widget.name.split(".")[1]
def __get_git_root(self, directory):
while len(directory) > 1:
if os.path.exists(os.path.join(directory, ".git")):
return directory
directory = "/".join(directory.split("/")[0:-1])
return "/"
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,71 @@
# pylint: disable=C0111,R0903
"""Displays the current keyboard layout using libX11
Requires the following library:
* libX11.so.6
and python module:
* xkbgroup
Parameters:
* layout-xkb.showname: Boolean that indicate whether the full name should be displayed. Defaults to false (only the symbol will be displayed)
* layout-xkb.show_variant: Boolean that indecates whether the variant name should be displayed. Defaults to true.
"""
from xkbgroup import *
import logging
log = logging.getLogger(__name__)
import core.module
import core.widget
import core.input
import util.format
class Module(core.module.Module):
def __init__(self, config, theme):
super().__init__(config, theme, core.widget.Widget(self.current_layout))
core.input.register(self, button=core.input.LEFT_MOUSE, cmd=self.__next_keymap)
core.input.register(self, button=core.input.RIGHT_MOUSE, cmd=self.__prev_keymap)
self.__show_variant = util.format.asbool(self.parameter("show_variant", True))
def __next_keymap(self, event):
self.__set_keymap(1)
def __prev_keymap(self, event):
self.__set_keymap(-1)
def __set_keymap(self, rotation):
xkb = XKeyboard()
if xkb.groups_count < 2:
return # nothing to do
layouts = xkb.groups_symbols
idx = layouts.index(xkb.group_symbol)
xkb.group_symbol = str(layouts[(idx + rotation) % len(layouts)])
def current_layout(self, widget):
try:
xkb = XKeyboard()
log.debug("group num: {}".format(xkb.group_num))
name = (
xkb.group_name
if util.format.asbool(self.parameter("showname"), False)
else xkb.group_symbol
)
if self.__show_variant:
return (
"{} ({})".format(name, xkb.group_variant)
if xkb.group_variant
else name
)
return name
except Exception as e:
print("got exception: {}".format(e))
return "n/a"
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,41 @@
# pylint: disable=C0111,R0903
"""Displays system load.
Parameters:
* load.warning : Warning threshold for the one-minute load average (defaults to 70% of the number of CPUs)
* load.critical: Critical threshold for the one-minute load average (defaults to 80% of the number of CPUs)
"""
import os
import multiprocessing
import core.module
import core.input
class Module(core.module.Module):
def __init__(self, config, theme):
super().__init__(config, theme, core.widget.Widget(self.load))
self._load = [0, 0, 0]
try:
self._cpus = multiprocessing.cpu_count()
except NotImplementedError as e:
self._cpus = 1
core.input.register(
self, button=core.input.LEFT_MOUSE, cmd="gnome-system-monitor"
)
def load(self, widget):
return "{:.02f}/{:.02f}/{:.02f}".format(
self._load[0], self._load[1], self._load[2]
)
def update(self):
self._load = os.getloadavg()
def state(self, widget):
return self.threshold_state(self._load[0], self._cpus * 0.7, self._cpus * 0.8)
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,77 @@
# pylint: disable=C0111,R0903
"""Displays available RAM, total amount of RAM and percentage available.
Parameters:
* memory.warning : Warning threshold in % of memory used (defaults to 80%)
* memory.critical: Critical threshold in % of memory used (defaults to 90%)
* memory.format: Format string (defaults to '{used}/{total} ({percent:05.02f}%)')
* memory.usedonly: Only show the amount of RAM in use (defaults to False). Same as memory.format='{used}'
"""
import re
import core.module
import core.widget
import core.input
import util.format
class Module(core.module.Module):
def __init__(self, config, theme):
super().__init__(config, theme, core.widget.Widget(self.memory_usage))
core.input.register(
self, button=core.input.LEFT_MOUSE, cmd="gnome-system-monitor"
)
@property
def _format(self):
if util.format.asbool(self.parameter("usedonly", False)):
return "{used}"
else:
return self.parameter("format", "{used}/{total} ({percent:05.02f}%)")
def memory_usage(self, widget):
return self._format.format(**self._mem)
def update(self):
data = {}
with open("/proc/meminfo", "r") as f:
for line in f:
tmp = re.split(r"[:\s]+", line)
value = int(tmp[1])
if tmp[2] == "kB":
value = value * 1024
if tmp[2] == "mB":
value = value * 1024 * 1024
if tmp[2] == "gB":
value = value * 1024 * 1024 * 1024
data[tmp[0]] = value
if "MemAvailable" in data:
used = data["MemTotal"] - data["MemAvailable"]
else:
used = (
data["MemTotal"]
- data["MemFree"]
- data["Buffers"]
- data["Cached"]
- data["Slab"]
)
self._mem = {
"total": util.format.byte(data["MemTotal"]),
"available": util.format.byte(data["MemAvailable"]),
"free": util.format.byte(data["MemFree"]),
"used": util.format.byte(used),
"percent": float(used) / float(data["MemTotal"]) * 100.0,
}
def state(self, widget):
if self._mem["percent"] > float(self.parameter("critical", 90)):
return "critical"
if self._mem["percent"] > float(self.parameter("warning", 80)):
return "warning"
return None
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,135 @@
# pylint: disable=C0111,R0903
"""Displays the name, IP address(es) and status of each available network interface.
Requires the following python module:
* netifaces
Parameters:
* nic.exclude: Comma-separated list of interface prefixes to exclude (defaults to 'lo,virbr,docker,vboxnet,veth,br')
* nic.include: Comma-separated list of interfaces to include
* nic.states: Comma-separated list of states to show (prefix with '^' to invert - i.e. ^down -> show all devices that are not in state down)
* nic.format: Format string (defaults to '{intf} {state} {ip} {ssid}')
"""
import shutil
import netifaces
import subprocess
import core.module
import core.decorators
import util.cli
import util.format
class Module(core.module.Module):
@core.decorators.every(seconds=10)
def __init__(self, config, theme):
widgets = []
super().__init__(config, theme, widgets)
self._exclude = tuple(
filter(
len,
self.parameter("exclude", "lo,virbr,docker,vboxnet,veth,br").split(","),
)
)
self._include = self.parameter("include", "").split(",")
self._states = {"include": [], "exclude": []}
for state in tuple(
filter(len, util.format.aslist(self.parameter("states", "")))
):
if state[0] == "^":
self._states["exclude"].append(state[1:])
else:
self._states["include"].append(state)
self._format = self.parameter("format", "{intf} {state} {ip} {ssid}")
self.iwgetid = shutil.which("iwgetid")
self._update_widgets(widgets)
def update(self):
self._update_widgets(self.widgets())
def state(self, widget):
states = []
if widget.get("state") == "down":
states.append("critical")
elif widget.get("state") != "up":
states.append("warning")
intf = widget.get("intf")
iftype = "wireless" if self._iswlan(intf) else "wired"
iftype = "tunnel" if self._istunnel(intf) else iftype
states.append("{}-{}".format(iftype, widget.get("state")))
return states
def _iswlan(self, intf):
# wifi, wlan, wlp, seems to work for me
if intf.startswith("w"):
return True
return False
def _istunnel(self, intf):
return intf.startswith("tun") or intf.startswith("wg")
def get_addresses(self, intf):
retval = []
try:
for ip in netifaces.ifaddresses(intf).get(netifaces.AF_INET, []):
if ip.get("addr", "") != "":
retval.append(ip.get("addr"))
except Exception:
return []
return retval
def _update_widgets(self, widgets):
self.clear_widgets()
interfaces = [
i for i in netifaces.interfaces() if not i.startswith(self._exclude)
]
interfaces.extend([i for i in netifaces.interfaces() if i in self._include])
for intf in interfaces:
addr = []
state = "down"
for ip in self.get_addresses(intf):
addr.append(ip)
state = "up"
if len(self._states["exclude"]) > 0 and state in self._states["exclude"]:
continue
if (
len(self._states["include"]) > 0
and state not in self._states["include"]
):
continue
widget = self.widget(intf)
if not widget:
widget = self.add_widget(name=intf)
# join/split is used to get rid of multiple whitespaces (in case SSID is not available, for instance
widget.full_text(
" ".join(
self._format.format(
ip=", ".join(addr),
intf=intf,
state=state,
ssid=self.get_ssid(intf),
).split()
)
)
widget.set("intf", intf)
widget.set("state", state)
def get_ssid(self, intf):
if self._iswlan(intf) and self.iwgetid:
return util.cli.execute(
"{} -r {}".format(self.iwgetid, intf), ignore_errors=True
)
return ""
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,9 @@
from .pulseaudio import Module
class Module(Module):
def __init__(self, config, theme):
super().__init__(config, theme, "sink")
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,9 @@
from .pulseaudio import Module
class Module(Module):
def __init__(self, config, theme):
super().__init__(config, theme, "source")
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,95 @@
# pylint: disable=C0111,R0903
"""Periodically checks the RTT of a configurable host using ICMP echos
Requires the following executable:
* ping
Parameters:
* ping.address : IP address to check
* ping.timeout : Timeout for waiting for a reply (defaults to 5.0)
* ping.probes : Number of probes to send (defaults to 5)
* ping.warning : Threshold for warning state, in seconds (defaults to 1.0)
* ping.critical: Threshold for critical state, in seconds (defaults to 2.0)
"""
import re
import time
import threading
import core.module
import core.widget
import core.event
import core.decorators
import util.cli
def get_rtt(module, widget):
try:
widget.set("rtt-unreachable", False)
res = util.cli.execute(
"ping -n -q -c {} -W {} {}".format(
widget.get("rtt-probes"),
widget.get("rtt-timeout"),
widget.get("address"),
)
)
for line in res.split("\n"):
if line.startswith(
"{} packets transmitted".format(widget.get("rtt-probes"))
):
m = re.search(r"(\d+)% packet loss", line)
widget.set("packet-loss", m.group(1))
if not line.startswith("rtt"):
continue
m = re.search(r"([0-9\.]+)/([0-9\.]+)/([0-9\.]+)/([0-9\.]+)\s+(\S+)", line)
widget.set("rtt-min", float(m.group(1)))
widget.set("rtt-avg", float(m.group(2)))
widget.set("rtt-max", float(m.group(3)))
widget.set("rtt-unit", m.group(5))
except Exception as e:
widget.set("rtt-unreachable", True)
core.event.trigger("update", [module.id], redraw_only=True)
class Module(core.module.Module):
@core.decorators.every(seconds=60)
def __init__(self, config, theme):
super().__init__(config, theme, core.widget.Widget(self.rtt))
widget = self.widget()
widget.set("address", self.parameter("address", "8.8.8.8"))
widget.set("rtt-probes", self.parameter("probes", 5))
widget.set("rtt-timeout", self.parameter("timeout", 5.0))
widget.set("rtt-avg", 0.0)
widget.set("rtt-unit", "")
widget.set("packet-loss", 0)
def rtt(self, widget):
if widget.get("rtt-unreachable"):
return "{}: unreachable".format(widget.get("address"))
return "{}: {:.1f}{} ({}%)".format(
widget.get("address"),
widget.get("rtt-avg"),
widget.get("rtt-unit"),
widget.get("packet-loss"),
)
def state(self, widget):
if widget.get("rtt-unreachable"):
return ["critical"]
return self.threshold_state(widget.get("rtt-avg"), 1000.0, 2000.0)
def update(self):
thread = threading.Thread(target=get_rtt, args=(self, self.widget(),))
thread.start()
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,200 @@
# pylint: disable=C0111,R0903
"""Displays volume and mute status and controls for PulseAudio devices. Use wheel up and down to change volume, left click mutes, right click opens pavucontrol.
Aliases: pasink (use this to control output instead of input), pasource
Parameters:
* pulseaudio.autostart: If set to 'true' (default is 'false'), automatically starts the pulseaudio daemon if it is not running
* pulseaudio.percent_change: How much to change volume by when scrolling on the module (default is 2%)
* pulseaudio.limit: Upper limit for setting the volume (default is 0%, which means 'no limit')
Note: If the left and right channels have different volumes, the limit might not be reached exactly.
* pulseaudio.showbars: 1 for showing volume bars, requires --markup=pango;
0 for not showing volume bars (default)
Requires the following executable:
* pulseaudio
* pactl
* pavucontrol
"""
import re
import logging
import core.module
import core.widget
import core.input
import util.cli
import util.graph
import util.format
class Module(core.module.Module):
def __init__(self, config, theme, channel):
super().__init__(config, theme, core.widget.Widget(self.volume))
if util.format.asbool(self.parameter("autostart", False)):
util.cli.execute("pulseaudio --start", ignore_errors=True)
self._change = util.format.asint(
self.parameter("percent_change", "2%").strip("%"), 0, 100
)
self._limit = util.format.asint(self.parameter("limit", "0%").strip("%"), 0)
self._left = 0
self._right = 0
self._mono = 0
self._mute = False
self._failed = False
self._channel = channel
self._showbars = util.format.asbool(self.parameter("showbars", 0))
self._patterns = [
{"expr": "Name:", "callback": (lambda line: False)},
{
"expr": "Mute:",
"callback": (
lambda line: self.mute(False if " no" in line.lower() else True)
),
},
{"expr": "Volume:", "callback": self.getvolume},
]
core.input.register(self, button=core.input.RIGHT_MOUSE, cmd="pavucontrol")
events = [
{"type": "mute", "action": self.toggle, "button": core.input.LEFT_MOUSE},
{
"type": "volume",
"action": self.increase_volume,
"button": core.input.WHEEL_UP,
},
{
"type": "volume",
"action": self.decrease_volume,
"button": core.input.WHEEL_DOWN,
},
]
for event in events:
core.input.register(self, button=event["button"], cmd=event["action"])
def set_volume(self, amount):
util.cli.execute(
"pactl set-{}-{} @DEFAULT_{}@ {}".format(
self._channel, "volume", self._channel.upper(), amount
)
)
def increase_volume(self, event):
if self._limit > 0: # we need to check the limit
left = int(self._left)
right = int(self._right)
if (
left + self._change >= self._limit
or right + self._change >= self._limit
):
if left == right:
# easy case, just set to limit
self.set_volume("{}%".format(self._limit))
return
else:
# don't adjust anymore, since i don't know how to update only one channel
return
self.set_volume("+{}%".format(self._change))
def decrease_volume(self, event):
self.set_volume("-{}%".format(self._change))
def toggle(self, event):
util.cli.execute(
"pactl set-{}-mute @DEFAULT_{}@ toggle".format(
self._channel, self._channel.upper()
)
)
def mute(self, value):
self._mute = value
def getvolume(self, line):
if "mono" in line:
m = re.search(r"mono:.*\s*\/\s*(\d+)%", line)
if m:
self._mono = m.group(1)
else:
m = re.search(r"left:.*\s*\/\s*(\d+)%.*right:.*\s*\/\s*(\d+)%", line)
if m:
self._left = m.group(1)
self._right = m.group(2)
def _default_device(self):
output = util.cli.execute("pactl info")
pattern = "Default {}: ".format("Sink" if self._channel == "sink" else "Source")
for line in output.split("\n"):
if line.startswith(pattern):
return line.replace(pattern, "")
logging.error("no pulseaudio device found")
return "n/a"
def volume(self, widget):
if self._failed == True:
return "n/a"
if int(self._mono) > 0:
vol = "{}%".format(self._mono)
if self._showbars:
vol = "{} {}".format(vol, util.graph.hbar(float(self._mono)))
return vol
elif self._left == self._right:
vol = "{}%".format(self._left)
if self._showbars:
vol = "{} {}".format(vol, util.graph.hbar(float(self._left)))
return vol
else:
vol = "{}%/{}%".format(self._left, self._right)
if self._showbars:
vol = "{} {}{}".format(
vol,
util.graph.hbar(float(self._left)),
util.graph.hbar(float(self._right)),
)
return vol
def update(self):
try:
self._failed = False
channel = "sinks" if self._channel == "sink" else "sources"
device = self._default_device()
result = util.cli.execute("pactl list {}".format(channel))
found = False
for line in result.split("\n"):
if "Name: {}".format(device) in line:
found = True
continue
if found is False:
continue
for pattern in self._patterns:
if not pattern["expr"] in line:
continue
if pattern["callback"](line) is False and found == True:
return
except Exception as e:
self._failed = True
logging.exception(e)
if util.format.asbool(self.parameter("autostart", False)):
util.cli.execute("pulseaudio --start", ignore_errors=True)
else:
raise e
def state(self, widget):
if self._mute:
return ["warning", "muted"]
if int(self._left) > int(100):
return ["critical", "unmuted"]
return ["unmuted"]
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,113 @@
# pylint: disable=C0111,R0903
"""Displays the current color temperature of redshift
Requires the following executable:
* redshift
Parameters:
* redshift.location : location provider, either of 'auto' (default), 'geoclue2',
'ipinfo' or 'manual'
'auto' uses whatever redshift is configured to do
* redshift.lat : latitude if location is set to 'manual'
* redshift.lon : longitude if location is set to 'manual'
* redshift.show_transition: information about the transitions (x% day) defaults to True
"""
import re
import threading
import core.module
import core.widget
import core.input
import core.decorators
import util.cli
import util.format
import util.location
def get_redshift_value(module):
widget = module.widget()
location = module.parameter("location", "auto")
lat = module.parameter("lat", None)
lon = module.parameter("lon", None)
# Even if location method is set to manual, if we have no lat or lon,
# fall back to the geoclue2 method.
if location == "manual" and (lat is None or lon is None):
location = "geoclue2"
command = ["redshift", "-p"]
if location == "manual":
command.extend(["-l", "{}:{}".format(lat, lon)])
if location == "geoclue2":
command.extend(["-l", "geoclue2"])
try:
res = util.cli.execute(" ".join(command))
except Exception:
res = ""
widget.set("temp", "n/a")
widget.set("transition", "")
widget.set("state", "day")
for line in res.split("\n"):
line = line.lower()
if "temperature" in line:
widget.set("temp", line.split(" ")[2])
if "period" in line:
state = line.split(" ")[1]
if "day" in state:
widget.set("state", "day")
elif "night" in state:
widget.set("state", "night")
else:
widget.set("state", "transition")
match = re.search(r"(\d+)\.\d+% ([a-z]+)", line)
widget.set(
"transition", "({}% {})".format(match.group(1), match.group(2))
)
core.event.trigger("update", [widget.module.id], redraw_only=True)
class Module(core.module.Module):
@core.decorators.every(seconds=10)
def __init__(self, config, theme):
super().__init__(config, theme, core.widget.Widget(self.text))
self.__thread = None
self.show_transition = util.format.asbool(
self.parameter("show_transition", True)
)
if self.parameter("location", "") == "ipinfo":
# override lon/lat with ipinfo
try:
location = util.location.coordinates()
self.set("lat", location[0])
self.set("lon", location[1])
self.set("location", "manual")
except Exception:
# Fall back to geoclue2.
self.set("location", "geoclue2")
self._text = ""
def text(self, widget):
val = widget.get("temp", "n/a")
transition = widget.get("transition", "")
if transition and self.show_transition:
val = "{} {}".format(val, transition)
return val
def update(self):
if self.__thread is not None and self.__thread.isAlive():
return
self.__thread = threading.Thread(target=get_redshift_value, args=(self,))
self.__thread.start()
def state(self, widget):
return widget.get("state", None)
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,242 @@
# -*- coding: UTF-8 -*-
"""Displays sensor temperature and CPU frequency
Parameters:
* sensors2.chip: 'sensors -u' compatible filter for chip to display (default to empty - show all chips)
* sensors2.showcpu: Enable or disable CPU frequency display (default: true)
* sensors2.showtemp: Enable or disable temperature display (default: true)
* sensors2.showfan: Enable or disable fan display (default: true)
* sensors2.showother: Enable or display 'other' sensor readings (default: false)
* sensors2.showname: Enable or disable show of sensor name (default: false)
* sensors2.chip_include: Comma-separated list of chip to include (defaults to '' will include all by default, example: 'coretemp,bat')
* sensors2.chip_exclude:Comma separated list of chip to exclude (defaults to '' will exlude none by default)
* sensors2.field_include: Comma separated list of chip to include (defaults to '' will include all by default, example: 'temp,fan')
* sensors2.field_exclude: Comma separated list of chip to exclude (defaults to '' will exclude none by default)
* sensors2.chip_field_exclude: Comma separated list of chip field to exclude (defaults to '' will exclude none by default, example: 'coretemp-isa-0000.temp1,coretemp-isa-0000.fan1')
* sensors2.chip_field_include: Comma-separated list of adaper field to include (defaults to '' will include all by default)
"""
import re
import core.module
import core.widget
import util.cli
import util.format
class Module(core.module.Module):
def __init__(self, config, theme):
super().__init__(config, theme, [])
self.__chip = self.parameter("chip", "")
self.__data = {}
self.__update()
self.__create_widgets()
def update(self):
self.__update()
for widget in self.widgets():
self.__update_widget(widget)
def state(self, widget):
widget_type = widget.get("type", "")
try:
data = self.__data[widget.get("adapter")][widget.get("package")][
widget.get("field")
]
if "crit" in data and float(data["input"]) > float(data["crit"]):
return ["critical", widget_type]
if "max" in data and float(data["input"]) > float(data["max"]):
return ["warning", widget_type]
except:
pass
return [widget_type]
def __create_widgets(self):
show_temp = util.format.asbool(self.parameter("showtemp", True))
show_fan = util.format.asbool(self.parameter("showfan", True))
show_other = util.format.asbool(self.parameter("showother", False))
include_chip = tuple(
filter(len, util.format.aslist(self.parameter("chip_include", "")))
)
exclude_chip = tuple(
filter(len, util.format.aslist(self.parameter("chip_exclude", "")))
)
include_field = tuple(
filter(len, util.format.aslist(self.parameter("field_include", "")))
)
exclude_field = tuple(
filter(len, util.format.aslist(self.parameter("field_exclude", "")))
)
include_chip_field = tuple(
filter(len, util.format.aslist(self.parameter("chip_field_include", "")))
)
exclude_chip_field = tuple(
filter(len, util.format.aslist(self.parameter("chip_field_exclude", "")))
)
if util.format.asbool(self.parameter("showcpu", True)):
widget = self.add_widget(full_text=self.__cpu)
widget.set("type", "cpu")
for adapter in self.__data:
if include_chip or exclude_chip:
if include_chip:
if all([chip not in adapter for chip in include_chip]):
continue
else:
if any([chip in adapter for chip in exclude_chip]):
continue
if include_chip_field:
try:
if all(
[i.split(".")[0] not in adapter for i in include_chip_field]
):
continue
except:
pass
for package in self.__data[adapter]:
if util.format.asbool(self.parameter("showname", False)):
widget = self.add_widget(full_text=package)
widget.set("data", self.__data[adapter][package])
widget.set("package", package)
widget.set("field", "")
widget.set("adapter", adapter)
for field in self.__data[adapter][package]:
if include_field or exclude_field:
if include_field:
if all(
[included not in field for included in include_field]
):
continue
else:
if any([excluded in field for excluded in exclude_field]):
continue
try:
if include_chip_field or exclude_chip_field:
if include_chip_field:
if all(
[
i.split(".")[1] not in field
for i in include_chip_field
if i.split(".")[0] in adapter
]
):
continue
else:
if any(
[
i.split(".")[1] in field
for i in exclude_chip_field
if i.split(".")[0] in adapter
]
):
continue
except:
pass
widget = None
if "temp" in field and show_temp:
# seems to be a temperature
widget = self.add_widget()
widget.set("type", "temp")
if "fan" in field and show_fan:
# seems to be a fan
widget = self.add_widget()
widget.set("type", "fan")
elif show_other:
# everything else
widget = self.add_widget()
widget.set("type", "other")
if widget:
widget.set("package", package)
widget.set("field", field)
widget.set("adapter", adapter)
def __update_widget(self, widget):
if widget.get("field", "") == "":
return # nothing to do
data = self.__data[widget.get("adapter")][widget.get("package")][
widget.get("field")
]
if "temp" in widget.get("field"):
widget.full_text("{:0.01f}°C".format(data["input"]))
elif "fan" in widget.get("field"):
widget.full_text("{:0.0f}RPM".format(data["input"]))
else:
widget.full_text("{:0.0f}".format(data["input"]))
def __update(self):
output = util.cli.execute(
"sensors -u {}".format(self.__chip), ignore_errors=True
)
self.__data = self.__parse(output)
def __parse(self, data):
output = {}
package = ""
adapter = None
chip = None
for line in data.split("\n"):
if "Adapter" in line:
# new adapter
line = line.replace("Adapter: ", "")
output[chip + " " + line] = {}
adapter = chip + " " + line
chip = line # default - line before adapter is always the chip
if not adapter:
continue
key, value = (line.split(":") + ["", ""])[:2]
if not line.startswith(" "):
# assume this starts a new package
if package in output[adapter] and output[adapter][package] == {}:
del output[adapter][package]
output[adapter][key] = {}
package = key
else:
# feature for this chip
try:
name, variant = (key.strip().split("_", 1) + ["", ""])[:2]
if not name in output[adapter][package]:
output[adapter][package][name] = {}
if variant:
output[adapter][package][name][variant] = {}
output[adapter][package][name][variant] = float(value)
except Exception as e:
pass
return output
def __cpu(self, _):
mhz = None
try:
output = open(
"/sys/devices/system/cpu/cpufreq/policy0/scaling_cur_freq"
).read()
mhz = int(float(output) / 1000.0)
except:
output = open("/proc/cpuinfo").read()
m = re.search(r"cpu MHz\s+:\s+(\d+)", output)
if m:
mhz = int(m.group(1))
else:
m = re.search(r"BogoMIPS\s+:\s+(\d+)", output)
if m:
return "{} BogoMIPS".format(int(m.group(1)))
if not mhz:
return "n/a"
if mhz < 1000:
return "{} MHz".format(mhz)
else:
return "{:0.01f} GHz".format(float(mhz) / 1000.0)
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,24 @@
# pylint: disable=C0111,R0903
"""Draws a widget with configurable text content.
Parameters:
* spacer.text: Widget contents (defaults to empty string)
"""
import core.module
import core.widget
import core.decorators
class Module(core.module.Module):
@core.decorators.every(minutes=60)
def __init__(self, config, theme):
super().__init__(config, theme, core.widget.Widget(self.text))
self.__text = self.parameter("text", "")
def text(self, _):
return self.__text
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,15 @@
# pylint: disable=C0111,R0903
"""Test module
"""
import core.widget
import core.module
class Module(core.module.Module):
def __init__(self, config, theme):
super().__init__(config=config, theme=theme, widgets=core.widget.Widget("test"))
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,23 @@
# pylint: disable=C0111,R0903
"""Displays the current date and time.
Parameters:
* time.format: strftime()-compatible formatting string
* time.locale: locale to use rather than the system default
"""
import core.decorators
from .datetime import Module
class Module(Module):
@core.decorators.every(seconds=59) # ensures one update per minute
def __init__(self, config, theme):
super().__init__(config, theme)
def default_format(self):
return "%X"
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,100 @@
# pylint: disable=C0111,R0903
"""Copy passwords from a password store into the clipboard (currently supports only 'pass')
Many thanks to [@bbernhard](https://github.com/bbernhard) for the idea!
Parameters:
* vault.duration: Duration until password is cleared from clipboard (defaults to 30)
* vault.location: Location of the password store (defaults to ~/.password-store)
* vault.offx: x-axis offset of popup menu (defaults to 0)
* vault.offy: y-axis offset of popup menu (defaults to 0)
Many thanks to `bbernhard <https://github.com/bbernhard>`_ for the idea!
"""
# TODO:
# - support multiple backends by abstracting the menu structure into a tree
# - build the menu and the actions based on that abstracted tree
#
import os
import time
import threading
import core.module
import core.widget
import core.input
import core.event
import util.cli
import util.popup
def build_menu(parent, current_directory, callback):
with os.scandir(current_directory) as it:
for entry in it:
if entry.name.startswith("."):
continue
if entry.is_file():
name = entry.name[: entry.name.rfind(".")]
parent.add_menuitem(
name,
callback=lambda: callback(os.path.join(current_directory, name)),
)
else:
submenu = util.popup.menu(parent, leave=False)
build_menu(
submenu, os.path.join(current_directory, entry.name), callback
)
parent.add_cascade(entry.name, submenu)
class Module(core.module.Module):
def __init__(self, config, theme):
super().__init__(config, theme, core.widget.Widget(self.text))
self.__duration = int(self.parameter("duration", 30))
self.__offx = int(self.parameter("offx", 0))
self.__offy = int(self.parameter("offy", 0))
self.__path = os.path.expanduser(
self.parameter("location", "~/.password-store/")
)
self.__reset()
core.input.register(self, button=core.input.LEFT_MOUSE, cmd=self.popup)
def popup(self, widget):
menu = util.popup.menu(leave=False)
build_menu(menu, self.__path, self.__callback)
menu.show(widget, offset_x=self.__offx, offset_y=self.__offy)
def __reset(self):
self.__timer = None
self.__text = str(self.parameter("text", "<click-for-password>"))
def __callback(self, secret_name):
secret_name = secret_name.replace(self.__path, "") # remove common path
if self.__timer:
self.__timer.cancel()
res = util.cli.execute(
"pass -c {}".format(secret_name),
wait=False,
env={"PASSWORD_STORE_CLIP_TIME": self.__duration},
)
self.__timer = threading.Timer(self.__duration, self.__reset)
self.__timer.start()
self.__start = int(time.time())
self.__text = secret_name
def text(self, widget):
if self.__timer:
return "{} ({}s)".format(
self.__text, self.__duration - (int(time.time()) - self.__start)
)
return self.__text
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,138 @@
# pylint: disable=C0111,R0903
"""Shows a widget for each connected screen and allows the user to enable/disable screens.
Parameters:
* xrandr.overwrite_i3config: If set to 'true', this module assembles a new i3 config
every time a screen is enabled or disabled by taking the file '~/.config/i3/config.template'
and appending a file '~/.config/i3/config.<screen name>' for every screen.
* xrandr.autoupdate: If set to 'false', does *not* invoke xrandr automatically. Instead, the
module will only refresh when displays are enabled or disabled (defaults to true)
Requires the following python module:
* (optional) i3 - if present, the need for updating the widget list is auto-detected
Requires the following executable:
* xrandr
"""
import os
import re
import sys
import core.module
import core.input
import core.decorators
import util.cli
import util.format
try:
import i3
except:
pass
class Module(core.module.Module):
@core.decorators.every(seconds=5) # takes up to 5s to detect a new screen
def __init__(self, config, theme):
widgets = []
super().__init__(config, theme, widgets)
self._autoupdate = util.format.asbool(self.parameter("autoupdate", True))
self._needs_update = True
try:
i3.Subscription(self._output_update, "output")
except:
pass
def _output_update(self, event, data, _):
self._needs_update = True
def update(self):
self.clear_widgets()
if self._autoupdate == False and self._needs_update == False:
return
self._needs_update = False
for line in util.cli.execute("xrandr -q").split("\n"):
if not " connected" in line:
continue
display = line.split(" ", 2)[0]
m = re.search(r"\d+x\d+\+(\d+)\+\d+", line)
widget = self.widget(display)
if not widget:
widget = self.add_widget(
full_text=display, name=display
)
core.input.register(widget, button=1, cmd=self._toggle)
core.input.register(widget, button=3, cmd=self._toggle)
widget.set("state", "on" if m else "off")
widget.set("pos", int(m.group(1)) if m else sys.maxsize)
if self._autoupdate == False:
widget = self.add_widget(full_text="")
widget.set("state", "refresh")
core.input.register(widget, button=1, cmd=self._refresh)
def state(self, widget):
return widget.get("state", "off")
def _refresh(self, event):
self._needs_update = True
def _toggle(self, event):
self._refresh(self, event)
path = os.path.dirname(os.path.abspath(__file__))
if util.format.asbool(self.parameter("overwrite_i3config", False)) == True:
toggle_cmd = "{}/../../bin/toggle-display.sh".format(path)
else:
toggle_cmd = "xrandr"
widget = self.widget_by_id(event["instance"])
if widget.get("state") == "on":
util.cli.execute("{} --output {} --off".format(toggle_cmd, widget.name))
else:
first_neighbor = next(
(widget for widget in self.widgets() if widget.get("state") == "on"),
None,
)
last_neighbor = next(
(
widget
for widget in reversed(self.widgets())
if widget.get("state") == "on"
),
None,
)
neighbor = (
first_neighbor
if event["button"] == core.input.LEFT_MOUSE
else last_neighbor
)
if neighbor is None:
util.cli.execute(
"{} --output {} --auto".format(toggle_cmd, widget.name)
)
else:
util.cli.execute(
"{} --output {} --auto --{}-of {}".format(
toggle_cmd,
widget.name,
"left"
if event.get("button") == core.input.LEFT_MOUSE
else "right",
neighbor.name,
)
)
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4