diff --git a/.codeclimate.yml b/.codeclimate.yml new file mode 100644 index 0000000..ec80874 --- /dev/null +++ b/.codeclimate.yml @@ -0,0 +1,15 @@ +engines: + duplication: + enabled: true + config: + languages: + - python + fixme: + enabled: true + radon: + enabled: true +ratings: + paths: + - "**.py" +exclude_paths: +- tests/ diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..2029c77 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,13 @@ +language: python +python: + - "2.7" + - "3.3" + - "3.4" + - "3.5" +install: + - pip install psutil + - pip install netifaces +script: nosetests -v tests/ +#addons: +# code_climate: +# repo_token: 40cb00907f7a10e04868e856570bb997ab9c42fd3b63d980f2b2269433195fdf diff --git a/README.md b/README.md index d039179..0157695 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # bumblebee-status +[![Build Status](https://travis-ci.org/tobi-wan-kenobi/bumblebee-status.svg?branch=engine-rework)](https://travis-ci.org/tobi-wan-kenobi/bumblebee-status) [![Code Climate](https://codeclimate.com/github/tobi-wan-kenobi/bumblebee-status/badges/gpa.svg)](https://codeclimate.com/github/tobi-wan-kenobi/bumblebee-status) [![Issue Count](https://codeclimate.com/github/tobi-wan-kenobi/bumblebee-status/badges/issue_count.svg)](https://codeclimate.com/github/tobi-wan-kenobi/bumblebee-status) + bumblebee-status is a modular, theme-able status line generator for the [i3 window manager](https://i3wm.org/). Focus is on: diff --git a/bin/customupdates b/bin/pacman-updates similarity index 99% rename from bin/customupdates rename to bin/pacman-updates index 3176c9d..baf0b93 100755 --- a/bin/customupdates +++ b/bin/pacman-updates @@ -1,4 +1,5 @@ #!/usr/bin/bash + if ! type -P fakeroot >/dev/null; then error 'Cannot find the fakeroot binary.' exit 1 diff --git a/bumblebee-status b/bumblebee-status index 42258a4..248ff52 100755 --- a/bumblebee-status +++ b/bumblebee-status @@ -1,16 +1,36 @@ #!/usr/bin/env python import sys -import bumblebee.config +import bumblebee.theme import bumblebee.engine +import bumblebee.config +import bumblebee.output +import bumblebee.input def main(): config = bumblebee.config.Config(sys.argv[1:]) - - engine = bumblebee.engine.Engine(config) - engine.load_modules() + theme = bumblebee.theme.Theme(config.theme()) + output = bumblebee.output.I3BarOutput(theme=theme) + inp = bumblebee.input.I3BarInput() + engine = bumblebee.engine.Engine( + config=config, + output=output, + inp=inp, + ) engine.run() +# try: +# except KeyboardInterrupt as error: +# inp.stop() +# sys.exit(0) +# except bumblebee.error.BaseError as error: +# inp.stop() +# sys.stderr.write("fatal: {}\n".format(error)) +# sys.exit(1) +# except Exception as error: +# inp.stop() +# sys.stderr.write("fatal: {}\n".format(error)) +# sys.exit(2) if __name__ == "__main__": main() diff --git a/bumblebee/config.py b/bumblebee/config.py index 501393d..2a72599 100644 --- a/bumblebee/config.py +++ b/bumblebee/config.py @@ -1,116 +1,50 @@ -import os +"""Configuration handling + +This module provides configuration information (loaded modules, +module parameters, etc.) to all other components +""" + import argparse -import textwrap +import bumblebee.store -import bumblebee.theme -import bumblebee.module +MODULE_HELP = "" +THEME_HELP = "" +PARAMETER_HELP = "" -class print_usage(argparse.Action): - def __init__(self, option_strings, dest, nargs=None, **kwargs): - argparse.Action.__init__(self, option_strings, dest, nargs, **kwargs) - self._indent = " "*4 +def create_parser(): + """Create the argument parser""" + parser = argparse.ArgumentParser(description="display system data in the i3bar") + parser.add_argument("-m", "--modules", nargs="+", default=[], + help=MODULE_HELP) + parser.add_argument("-t", "--theme", default="default", help=THEME_HELP) + parser.add_argument("-p", "--parameters", nargs="+", default=[], + help=PARAMETER_HELP) + return parser - def __call__(self, parser, namespace, value, option_string=None): - if value == "modules": - self.print_modules() - elif value == "themes": - self.print_themes() - else: - parser.print_help() - parser.exit() +class Config(bumblebee.store.Store): + """Top-level configuration class - def print_themes(self): - print(textwrap.fill(", ".join(bumblebee.theme.themes()), - 80, initial_indent = self._indent, subsequent_indent = self._indent - )) + Parses commandline arguments and provides non-module + specific configuration information. + """ + def __init__(self, args=None): + super(Config, self).__init__() + parser = create_parser() + self._args = parser.parse_args(args if args else []) - def print_modules(self): - for m in bumblebee.module.modules(): - print(textwrap.fill("{}: {}".format(m.name(), m.description()), - 80, initial_indent=self._indent*2, subsequent_indent=self._indent*3)) - print("{}Parameters:".format(self._indent*2)) - for p in m.parameters(): - print(textwrap.fill("* {}".format(p), - 80, initial_indent=self._indent*3, subsequent_indent=self._indent*4)) - print("") - -class ModuleConfig(object): - def __init__(self, config, prefix): - self._prefix = prefix - self._config = config - - def set(self, name, value): - name = self._prefix + name - return self._config.set(name, value) - - def parameter(self, name, default=None): - name = self._prefix + name - return self._config.parameter(name, default) - - def increase(self, name, limit, default): - name = self._prefix + name - return self._config.increase(name, limit, default) - -class Config(object): - def __init__(self, args): - self._parser = self._parser() - self._store = {} - - if len(args) == 0: - self._parser.print_help() - self._parser.exit() - - self._args = self._parser.parse_args(args) - - for p in self._args.parameters: - key, value = p.split("=") - self.parameter(key, value) - - def set(self, name, value): - self._store[name] = value - - def parameter(self, name, default=None): - if not name in self._store: - self.set(name, default) - return self._store.get(name, default) - - def increase(self, name, limit, default): - self._store[name] += 1 - if self._store[name] >= limit: - self._store[name] = default - return self._store[name] - - def theme(self): - return self._args.theme + for param in self._args.parameters: + key, value = param.split("=") + self.set(key, value) def modules(self): - result = [] - for m in self._args.modules: - items = m.split(":") - result.append({ "name": items[0], "alias": items[1] if len(items) > 1 else None }) - return result + """Return a list of all activated modules""" + return [{ + "module": x.split(":")[0], + "name": x if not ":" in x else x.split(":")[1], + } for x in self._args.modules] - def _parser(self): - parser = argparse.ArgumentParser(description="display system data in the i3bar") - parser.add_argument("-m", "--modules", nargs="+", - help="List of modules to load. The order of the list determines " - "their order in the i3bar (from left to right)", - default=[], - ) - parser.add_argument("-l", "--list", - help="List: 'modules', 'themes' ", - choices = [ "modules", "themes" ], - action=print_usage, - ) - parser.add_argument("-p", "--parameters", nargs="+", - help="Provide configuration parameters to individual modules.", - default=[] - ) - parser.add_argument("-t", "--theme", help="Specify which theme to use for " - "drawing the modules", - default="default", - ) - - return parser + def theme(self): + """Return the name of the selected theme""" + return self._args.theme # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/engine.py b/bumblebee/engine.py index 1625ee7..0698171 100644 --- a/bumblebee/engine.py +++ b/bumblebee/engine.py @@ -1,34 +1,145 @@ +"""Core application engine""" + +import os +import time +import pkgutil import importlib -import bumblebee.theme -import bumblebee.output +import bumblebee.error import bumblebee.modules -class Engine: - def __init__(self, config): - self._modules = [] +def all_modules(): + """Return a list of available modules""" + result = [] + path = os.path.dirname(bumblebee.modules.__file__) + for mod in [name for _, name, _ in pkgutil.iter_modules([path])]: + result.append({ + "name": mod + }) + return result + +class Module(object): + """Module instance base class + + Objects of this type represent the modules that + the user configures. Concrete module implementations + (e.g. CPU utilization, disk usage, etc.) derive from + this base class. + """ + def __init__(self, engine, config={}, widgets=[]): + self.name = config.get("name", self.__module__.split(".")[-1]) self._config = config - self._theme = bumblebee.theme.Theme(config) - self._output = bumblebee.output.output(config) + self.id = self.name + self._widgets = [] + if widgets: + self._widgets = widgets if isinstance(widgets, list) else [widgets] - def load_module(self, modulespec): - name = modulespec["name"] - module = importlib.import_module("bumblebee.modules.{}".format(name)) - return getattr(module, "Module")(self._output, self._config, modulespec["alias"]) + def widgets(self): + """Return the widgets to draw for this module""" + return self._widgets - def load_modules(self): - for m in self._config.modules(): - self._modules.append(self.load_module(m)) + def widget(self, name): + for widget in self._widgets: + if widget.name == name: + return widget + + def widget_by_id(self, uid): + for widget in self._widgets: + if widget.id == uid: + return widget + return None + + def update(self, widgets): + """By default, update() is a NOP""" + pass + + def parameter(self, name, default=None): + """Return the config parameter 'name' for this module""" + name = "{}.{}".format(self.name, name) + return self._config["config"].get(name, default) + + def threshold_state(self, value, warn, crit): + if value > float(self.parameter("critical", crit)): + return "critical" + if value > float(self.parameter("warning", warn)): + return "warning" + return None + +class Engine(object): + """Engine for driving the application + + This class connects input/output, instantiates all + required modules and drives the "event loop" + """ + def __init__(self, config, output=None, inp=None): + self._output = output + self._config = config + self._running = True + self._modules = [] + self.input = inp + self._aliases = self._read_aliases() + self.load_modules(config.modules()) + + self.input.register_callback(None, bumblebee.input.WHEEL_UP, + "i3-msg workspace prev_on_output") + self.input.register_callback(None, bumblebee.input.WHEEL_DOWN, + "i3-msg workspace next_on_output") + + self.input.start() + + def load_modules(self, modules): + """Load specified modules and return them as list""" + for module in modules: + self._modules.append(self._load_module(module["module"], module["name"])) + return self._modules + + def _read_aliases(self): + result = {} + for module in all_modules(): + mod = importlib.import_module("bumblebee.modules.{}".format(module["name"])) + for alias in getattr(mod, "ALIASES", []): + result[alias] = module["name"] + return result + + def _load_module(self, module_name, config_name=None): + """Load specified module and return it as object""" + if module_name in self._aliases: + config_name is config_name if config_name else module_name + module_name = self._aliases[module_name] + if config_name is None: + config_name = module_name + try: + module = importlib.import_module("bumblebee.modules.{}".format(module_name)) + except ImportError as error: + raise bumblebee.error.ModuleLoadError(error) + return getattr(module, "Module")(self, { + "name": config_name, + "config": self._config + }) + + def running(self): + """Check whether the event loop is running""" + return self._running + + def stop(self): + """Stop the event loop""" + self._running = False def run(self): + """Start the event loop""" self._output.start() - - while True: - self._theme.begin() - for m in self._modules: - self._output.draw(m.widgets(), self._theme) + while self.running(): + self._output.begin() + for module in self._modules: + module.update(module.widgets()) + for widget in module.widgets(): + widget.link_module(module) + self._output.draw(widget=widget, module=module, engine=self) self._output.flush() - self._output.wait() + self._output.end() + if self.running(): + self.input.wait(self._config.get("interval", 1)) self._output.stop() + self.input.stop() # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/error.py b/bumblebee/error.py new file mode 100644 index 0000000..129f02d --- /dev/null +++ b/bumblebee/error.py @@ -0,0 +1,15 @@ +"""Collection of all exceptions raised by this tool""" + +class BaseError(Exception): + """Base class for all exceptions generated by this tool""" + pass + +class ModuleLoadError(BaseError): + """Raised whenever loading a module fails""" + pass + +class ThemeLoadError(BaseError): + """Raised whenever loading a theme fails""" + pass + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/input.py b/bumblebee/input.py new file mode 100644 index 0000000..ba41362 --- /dev/null +++ b/bumblebee/input.py @@ -0,0 +1,122 @@ +"""Input classes""" + +import sys +import json +import uuid +import time +import select +import threading +import bumblebee.util + +LEFT_MOUSE = 1 +RIGHT_MOUSE = 3 +WHEEL_UP = 4 +WHEEL_DOWN = 5 + +def read_input(inp): + """Read i3bar input and execute callbacks""" + while inp.running: + for thread in threading.enumerate(): + if thread.name == "MainThread" and not thread.is_alive(): + return + + rlist, _, _ = select.select([sys.stdin], [], [], 1) + if not rlist: + continue + line = sys.stdin.readline().strip(",").strip() + inp.has_event = True + try: + event = json.loads(line) + inp.callback(event) + inp.redraw() + except ValueError: + pass + inp.has_event = True + inp.clean_exit = True + +class I3BarInput(object): + """Process incoming events from the i3bar""" + def __init__(self): + self.running = True + self._callbacks = {} + self.clean_exit = False + self.global_id = str(uuid.uuid4()) + self.need_event = False + self.has_event = False + self._condition = threading.Condition() + + def start(self): + """Start asynchronous input processing""" + self.has_event = False + self.running = True + self._condition.acquire() + self._thread = threading.Thread(target=read_input, args=(self,)) + self._thread.start() + + def redraw(self): + self._condition.acquire() + self._condition.notify() + self._condition.release() + + def alive(self): + """Check whether the input processing is still active""" + return self._thread.is_alive() + + def wait(self, timeout): + self._condition.wait(timeout) + + def _wait(self): + while not self.has_event: + time.sleep(0.1) + self.has_event = False + + def stop(self): + """Stop asynchronous input processing""" + self._condition.release() + if self.need_event: + self._wait() + self.running = False + self._thread.join() + return self.clean_exit + + def _uuidstr(self, name, button): + return "{}::{}".format(name, button) + + def _uid(self, obj, button): + uid = self.global_id + if obj: + uid = obj.id + return self._uuidstr(uid, button) + + def deregister_callbacks(self, obj): + to_delete = [] + uid = obj.id if obj else self.global_id + for key in self._callbacks: + if uid in key: + to_delete.append(key) + for key in to_delete: + del self._callbacks[key] + + def register_callback(self, obj, button, cmd): + """Register a callback function or system call""" + uid = self._uid(obj, button) + if uid not in self._callbacks: + self._callbacks[uid] = {} + self._callbacks[uid] = cmd + + def callback(self, event): + """Execute callback action for an incoming event""" + button = event["button"] + + cmd = self._callbacks.get(self._uuidstr(self.global_id, button), None) + cmd = self._callbacks.get(self._uuidstr(event["name"], button), cmd) + cmd = self._callbacks.get(self._uuidstr(event["instance"], button), cmd) + + if cmd is None: + return + if callable(cmd): + cmd(event) + else: + bumblebee.util.execute(cmd, False) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/module.py b/bumblebee/module.py deleted file mode 100644 index 6152777..0000000 --- a/bumblebee/module.py +++ /dev/null @@ -1,63 +0,0 @@ -import os -import pkgutil -import importlib - -import bumblebee.config -import bumblebee.modules - -def modules(): - result = [] - path = os.path.dirname(bumblebee.modules.__file__) - for mod in [ name for _, name, _ in pkgutil.iter_modules([path])]: - result.append(ModuleDescription(mod)) - return result - -class ModuleDescription(object): - def __init__(self, name): - self._name = name - self._mod =importlib.import_module("bumblebee.modules.{}".format(name)) - - def name(self): - return str(self._name) - - def description(self): - return getattr(self._mod, "description", lambda: "n/a")() - - def parameters(self): - return getattr(self._mod, "parameters", lambda: [ "n/a" ])() - -class Module(object): - def __init__(self, output, config, alias=None): - self._output = output - self._alias = alias - name = "{}.".format(alias if alias else self.__module__.split(".")[-1]) - self._config = bumblebee.config.ModuleConfig(config, name) - - buttons = [ - { "name": "left-click", "id": 1 }, - { "name": "middle-click", "id": 2 }, - { "name": "right-click", "id": 3 }, - { "name": "wheel-up", "id": 4 }, - { "name": "wheel-down", "id": 5 }, - ] - for button in buttons: - if self._config.parameter(button["name"], None): - output.add_callback( - module=self.instance(), - button=button["id"], - cmd=self._config.parameter(button["name"]) - ) - - def critical(self, widget): - return False - - def warning(self, widget): - return False - - def state(self, widget): - return "default" - - def instance(self, widget=None): - return self._alias if self._alias else self.__module__.split(".")[-1] - -# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/battery.py b/bumblebee/modules/battery.py index 2a570df..90e58f6 100644 --- a/bumblebee/modules/battery.py +++ b/bumblebee/modules/battery.py @@ -1,52 +1,76 @@ -import datetime -import bumblebee.module -import os.path +# pylint: disable=C0111,R0903 -def description(): - return "Displays battery status, percentage and whether it's charging or discharging." +"""Displays battery status, remaining percentage and charging information. -def parameters(): - return [ "battery.device: The device to read from (defaults to BAT0)" ] +Parameters: + * battery.device : The device to read information from (defaults to BAT0) + * battery.warning : Warning threshold in % of remaining charge (defaults to 20) + * battery.critical: Critical threshold in % of remaining charge (defaults to 10) +""" -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - self._battery = config.parameter("device", "BAT0") +import os + +import bumblebee.input +import bumblebee.output +import bumblebee.engine + +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + super(Module, self).__init__(engine, config, + bumblebee.output.Widget(full_text=self.capacity) + ) + battery = self.parameter("device", "BAT0") + self._path = "/sys/class/power_supply/{}".format(battery) self._capacity = 100 - self._status = "Unknown" + self._ac = False - def widgets(self): - self._AC = False; - self._path = "/sys/class/power_supply/{}".format(self._battery) + def capacity(self, widget): + if self._ac: + return "ac" + if self._capacity == -1: + return "n/a" + return "{:03d}%".format(self._capacity) + + def update(self, widgets): + widget = widgets[0] + self._ac = False if not os.path.exists(self._path): - self._AC = True; - return bumblebee.output.Widget(self,"AC") + self._ac = True + self._capacity = 100 + return - with open(self._path + "/capacity") as f: - self._capacity = int(f.read()) + try: + with open(self._path + "/capacity") as f: + self._capacity = int(f.read()) + except IOError: + self._capacity = -1 self._capacity = self._capacity if self._capacity < 100 else 100 - return bumblebee.output.Widget(self,"{:02d}%".format(self._capacity)) - - def warning(self, widget): - return self._capacity < self._config.parameter("warning", 20) - - def critical(self, widget): - return self._capacity < self._config.parameter("critical", 10) - def state(self, widget): - if self._AC: - return "AC" + state = [] - with open(self._path + "/status") as f: - self._status = f.read().strip() + if self._capacity < 0: + return ["critical", "unknown"] - if self._status == "Discharging": - status = "discharging-{}".format(min([ 10, 25, 50, 80, 100] , key=lambda i:abs(i-self._capacity))) - return status + if self._capacity < int(self.parameter("critical", 10)): + state.append("critical") + elif self._capacity < int(self.parameter("warning", 20)): + state.append("warning") + + if self._ac: + state.append("AC") else: - if self._capacity > 95: - return "charged" - return "charging" + charge = "" + with open(self._path + "/status") as f: + charge = f.read().strip() + if charge == "Discharging": + state.append("discharging-{}".format(min([10, 25, 50, 80, 100] , key=lambda i:abs(i-self._capacity)))) + else: + if self._capacity > 95: + state.append("charged") + else: + state.append("charging") + + return state # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/brightness.py b/bumblebee/modules/brightness.py index 114c11e..99712f0 100644 --- a/bumblebee/modules/brightness.py +++ b/bumblebee/modules/brightness.py @@ -1,33 +1,33 @@ -import bumblebee.module +# pylint: disable=C0111,R0903 -def description(): - return "Displays brightness percentage" +"""Displays the brightness of a display -def parameters(): - return [ - "brightness.step: Steps (in percent) to increase/decrease brightness on scroll (defaults to 2)", - ] +Parameters: + * brightness.step: The amount of increase/decrease on scroll in % (defaults to 2) +""" -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) +import bumblebee.input +import bumblebee.output +import bumblebee.engine + +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + super(Module, self).__init__(engine, config, + bumblebee.output.Widget(full_text=self.brightness) + ) self._brightness = 0 - self._max = 0 - self._percent = 0 - step = self._config.parameter("step", 2) + step = self.parameter("step", 2) - output.add_callback(module=self.instance(), button=4, cmd="xbacklight +{}%".format(step)) - output.add_callback(module=self.instance(), button=5, cmd="xbacklight -{}%".format(step)) + engine.input.register_callback(self, button=bumblebee.input.WHEEL_UP, + cmd="xbacklight +{}%".format(step)) + engine.input.register_callback(self, button=bumblebee.input.WHEEL_DOWN, + cmd="xbacklight -{}%".format(step)) - def widgets(self): - with open("/sys/class/backlight/intel_backlight/brightness") as f: - self._brightness = int(f.read()) - with open("/sys/class/backlight/intel_backlight/max_brightness") as f: - self._max = int(f.read()) - self._brightness = self._brightness if self._brightness < self._max else self._max - self._percent = int(round(self._brightness * 100 / self._max)) + def brightness(self, widget): + return "{:03.0f}%".format(self._brightness) - return bumblebee.output.Widget(self, "{:02d}%".format(self._percent)) + def update(self, widgets): + self._brightness = float(bumblebee.util.execute("xbacklight -get")) # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/caffeine.py b/bumblebee/modules/caffeine.py index 39f20d5..bf57be6 100644 --- a/bumblebee/modules/caffeine.py +++ b/bumblebee/modules/caffeine.py @@ -1,38 +1,44 @@ -import subprocess -import shlex +# pylint: disable=C0111,R0903 -import bumblebee.module +"""Enable/disable automatic screen locking. +""" -def description(): - return "Enable/disable auto screen lock." +import bumblebee.input +import bumblebee.output +import bumblebee.engine -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - self._activated = 0 - output.add_callback(module="caffeine.activate", button=1, cmd=[ 'notify-send "Consuming caffeine"', 'xset s off' ]) - output.add_callback(module="caffeine.deactivate", button=1, cmd=[ 'notify-send "Out of coffee"', 'xset s default' ]) +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + super(Module, self).__init__(engine, config, + bumblebee.output.Widget(full_text=self.caffeine) + ) + engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE, + cmd=self._toggle + ) - def widgets(self): - output = subprocess.check_output(shlex.split("xset q")) - xset_out = output.decode().split("\n") - for line in xset_out: - if line.startswith(" timeout"): - timeout = int(line.split(" ")[4]) - if timeout == 0: - self._activated = 1; - else: - self._activated = 0; - break - - if self._activated == 0: - return bumblebee.output.Widget(self, "", instance="caffeine.activate") - elif self._activated == 1: - return bumblebee.output.Widget(self, "", instance="caffeine.deactivate") + def caffeine(self, widget): + return "" def state(self, widget): - if self._activated == 1: + if self._active(): return "activated" - else: - return "deactivated" + return "deactivated" + def _active(self): + for line in bumblebee.util.execute("xset q").split("\n"): + if "timeout" in line: + timeout = int(line.split(" ")[4]) + if timeout == 0: + return True + return False + return False + + def _toggle(self, widget): + if self._active(): + bumblebee.util.execute("xset s default") + bumblebee.util.execute("notify-send \"Out of coffee\"") + else: + bumblebee.util.execute("xset s off") + bumblebee.util.execute("notify-send \"Consuming caffeine\"") + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/cmus.py b/bumblebee/modules/cmus.py index b847ad2..991a1a1 100644 --- a/bumblebee/modules/cmus.py +++ b/bumblebee/modules/cmus.py @@ -1,78 +1,85 @@ -import string -import datetime -import subprocess +# pylint: disable=C0111,R0903 + +"""Displays information about the current song in cmus. + +Parameters: + * cmus.format: Format string for the song information. Tag values can be put in curly brackets (i.e. {artist}) +""" + from collections import defaultdict +import string + import bumblebee.util -import bumblebee.module +import bumblebee.input +import bumblebee.output +import bumblebee.engine -def description(): - return "Displays the current song and artist playing in cmus" - -def parameters(): - return [ - "cmus.format: Format of the displayed song information, arbitrary tags (as available from cmus-remote -Q) can be used (defaults to {artist} - {title} {position}/{duration})" - ] - -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - self._status = "default" - self._fmt = self._config.parameter("format", "{artist} - {title} {position}/{duration}") - - output.add_callback(module="cmus.prev", button=1, cmd="cmus-remote -r") - output.add_callback(module="cmus.next", button=1, cmd="cmus-remote -n") - output.add_callback(module="cmus.shuffle", button=1, cmd="cmus-remote -S") - output.add_callback(module="cmus.repeat", button=1, cmd="cmus-remote -R") - output.add_callback(module=self.instance(), button=1, cmd="cmus-remote -u") - - def _loadsong(self): - process = subprocess.Popen(["cmus-remote", "-Q"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - self._query, self._error = process.communicate() - self._query = self._query.decode("utf-8").split("\n") - self._status = "default" - - def _tags(self): - tags = defaultdict(lambda: '') - self._repeat = False - self._shuffle = False - for line in self._query: - if line.startswith("status"): - status = line.split(" ", 2)[1] - self._status = status - if line.startswith("tag"): - key, value = line.split(" ", 2)[1:] - tags.update({ key: value }) - if line.startswith("duration"): - sec = line.split(" ")[1] - tags.update({ "duration": bumblebee.util.durationfmt(int(sec)) }) - if line.startswith("position"): - sec = line.split(" ")[1] - tags.update({ "position": bumblebee.util.durationfmt(int(sec)) }) - if line.startswith("set repeat "): - self._repeat = False if line.split(" ")[2] == "false" else True - if line.startswith("set shuffle "): - self._shuffle = False if line.split(" ")[2] == "false" else True - - return tags - - def widgets(self): - self._loadsong() - tags = self._tags() - - return [ - bumblebee.output.Widget(self, "", instance="cmus.prev"), - bumblebee.output.Widget(self, string.Formatter().vformat(self._fmt, (), tags)), - bumblebee.output.Widget(self, "", instance="cmus.next"), - bumblebee.output.Widget(self, "", instance="cmus.shuffle"), - bumblebee.output.Widget(self, "", instance="cmus.repeat"), +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + widgets = [ + bumblebee.output.Widget(name="cmus.prev"), + bumblebee.output.Widget(name="cmus.main", full_text=self.description), + bumblebee.output.Widget(name="cmus.next"), + bumblebee.output.Widget(name="cmus.shuffle"), + bumblebee.output.Widget(name="cmus.repeat"), ] + super(Module, self).__init__(engine, config, widgets) + + engine.input.register_callback(widgets[0], button=bumblebee.input.LEFT_MOUSE, + cmd="cmus-remote -r") + engine.input.register_callback(widgets[1], button=bumblebee.input.LEFT_MOUSE, + cmd="cmus-remote -u") + engine.input.register_callback(widgets[2], button=bumblebee.input.LEFT_MOUSE, + cmd="cmus-remote -n") + engine.input.register_callback(widgets[3], button=bumblebee.input.LEFT_MOUSE, + cmd="cmus-remote -S") + engine.input.register_callback(widgets[4], button=bumblebee.input.LEFT_MOUSE, + cmd="cmus-remote -R") + + self._fmt = self.parameter("format", "{artist} - {title} {position}/{duration}") + self._status = None + self._shuffle = False + self._repeat = False + self._tags = defaultdict(lambda: '') + + def description(self, widget): + return string.Formatter().vformat(self._fmt, (), self._tags) + + def update(self, widgets): + self._load_song() def state(self, widget): - if widget.instance() == "cmus.shuffle": - return "on" if self._shuffle else "off" - if widget.instance() == "cmus.repeat": - return "on" if self._repeat else "off" + if widget.name == "cmus.shuffle": + return "shuffle-on" if self._shuffle else "shuffle-off" + if widget.name == "cmus.repeat": + return "repeat-on" if self._repeat else "repeat-off" + if widget.name == "cmus.prev": + return "prev" + if widget.name == "cmus.next": + return "next" return self._status + def _load_song(self): + info = "" + try: + info = bumblebee.util.execute("cmus-remote -Q") + except RuntimeError: + pass + self._tags = defaultdict(lambda: '') + for line in info.split("\n"): + if line.startswith("status"): + self._status = line.split(" ", 2)[1] + if line.startswith("tag"): + key, value = line.split(" ", 2)[1:] + self._tags.update({ key: value }) + for key in ["duration", "position"]: + if line.startswith(key): + dur = int(line.split(" ")[1]) + self._tags.update({key:bumblebee.util.durationfmt(dur)}) + if line.startswith("set repeat "): + self._repeat = False if "false" in line else True + if line.startswith("set shuffle "): + self._shuffle = False if "false" in line else True + # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/cpu.py b/bumblebee/modules/cpu.py index b156b37..79e9229 100644 --- a/bumblebee/modules/cpu.py +++ b/bumblebee/modules/cpu.py @@ -1,30 +1,33 @@ -import bumblebee.module +# pylint: disable=C0111,R0903 + +"""Displays CPU utilization across all CPUs. + +Parameters: + * cpu.warning : Warning threshold in % of CPU usage (defaults to 70%) + * cpu.critical: Critical threshold in % of CPU usage (defaults to 80%) +""" + import psutil +import bumblebee.input +import bumblebee.output +import bumblebee.engine -def description(): - return "Displays CPU utilization across all CPUs." +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + super(Module, self).__init__(engine, config, + bumblebee.output.Widget(full_text=self.utilization) + ) + self._utilization = psutil.cpu_percent(percpu=False) + engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE, + cmd="gnome-system-monitor") -def parameters(): - return [ - "cpu.warning: Warning threshold in % of disk usage (defaults to 70%)", - "cpu.critical: Critical threshold in % of disk usage (defaults to 80%)", - ] + def utilization(self, widget): + return "{:06.02f}%".format(self._utilization) -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - self._perc = psutil.cpu_percent(percpu=False) + def update(self, widgets): + self._utilization = psutil.cpu_percent(percpu=False) - output.add_callback(module=self.instance(), button=1, cmd="gnome-system-monitor") - - def widgets(self): - self._perc = psutil.cpu_percent(percpu=False) - return bumblebee.output.Widget(self, "{:05.02f}%".format(self._perc)) - - def warning(self, widget): - return self._perc > self._config.parameter("warning", 70) - - def critical(self, widget): - return self._perc > self._config.parameter("critical", 80) + def state(self, widget): + return self.threshold_state(self._utilization, 70, 80) # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/date.py b/bumblebee/modules/date.py deleted file mode 120000 index bde6404..0000000 --- a/bumblebee/modules/date.py +++ /dev/null @@ -1 +0,0 @@ -datetime.py \ No newline at end of file diff --git a/bumblebee/modules/datetime.py b/bumblebee/modules/datetime.py deleted file mode 120000 index 5a370f9..0000000 --- a/bumblebee/modules/datetime.py +++ /dev/null @@ -1 +0,0 @@ -time.py \ No newline at end of file diff --git a/bumblebee/modules/datetime.py b/bumblebee/modules/datetime.py new file mode 100644 index 0000000..4141a53 --- /dev/null +++ b/bumblebee/modules/datetime.py @@ -0,0 +1,35 @@ +# pylint: disable=C0111,R0903 + +"""Displays the current date and time. + +Parameters: + * datetime.format: strftime()-compatible formatting string + * date.format : alias for datetime.format + * time.format : alias for datetime.format +""" + +from __future__ import absolute_import +import datetime +import bumblebee.engine + +ALIASES = [ "date", "time" ] + +def default_format(module): + default = "%x %X" + if module == "date": + default = "%x" + if module == "time": + default = "%X" + return default + +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + super(Module, self).__init__(engine, config, + bumblebee.output.Widget(full_text=self.get_time) + ) + self._fmt = self.parameter("format", default_format(self.name)) + + def get_time(self, widget): + return datetime.datetime.now().strftime(self._fmt) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/disk.py b/bumblebee/modules/disk.py index 679e271..e052b88 100644 --- a/bumblebee/modules/disk.py +++ b/bumblebee/modules/disk.py @@ -1,40 +1,45 @@ +# pylint: disable=C0111,R0903 + +"""Shows free diskspace, total diskspace and the percentage of free disk space. + +Parameters: + * disk.warning: Warning threshold in % of disk space (defaults to 80%) + * disk.critical: Critical threshold in % of disk space (defaults ot 90%) + * disk.path: Path to calculate disk usage from (defaults to /) +""" + import os -import bumblebee.util -import bumblebee.module -def description(): - return "Shows free diskspace, total diskspace and the percentage of free disk space." +import bumblebee.input +import bumblebee.output +import bumblebee.engine -def parameters(): - return [ - "disk.warning: Warning threshold in % (defaults to 80%)", - "disk.critical: Critical threshold in % (defaults to 90%)" - ] +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + super(Module, self).__init__(engine, config, + bumblebee.output.Widget(full_text=self.diskspace) + ) + self._path = self.parameter("path", "/") + self._perc = 0 + self._used = 0 + self._size = 0 -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - self._path = self._config.parameter("path", "/") + engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE, + cmd="nautilus {}".format(self._path)) - output.add_callback(module=self.instance(), button=1, cmd="nautilus {}".format(self._path)) + def diskspace(self, widget): + return "{} {}/{} ({:05.02f}%)".format(self._path, + bumblebee.util.bytefmt(self._used), + bumblebee.util.bytefmt(self._size), self._perc + ) - def widgets(self): + def update(self, widgets): st = os.statvfs(self._path) - self._size = st.f_frsize*st.f_blocks self._used = self._size - st.f_frsize*st.f_bavail self._perc = 100.0*self._used/self._size - return bumblebee.output.Widget(self, - "{} {}/{} ({:05.02f}%)".format(self._path, - bumblebee.util.bytefmt(self._used), - bumblebee.util.bytefmt(self._size), self._perc) - ) - - def warning(self, widget): - return self._perc > self._config.parameter("warning", 80) - - def critical(self, widget): - return self._perc > self._config.parameter("critical", 90) + def state(self, widget): + return self.threshold_state(self._perc, 80, 90) # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/dnf.py b/bumblebee/modules/dnf.py index ead5065..f6acf2a 100644 --- a/bumblebee/modules/dnf.py +++ b/bumblebee/modules/dnf.py @@ -1,98 +1,79 @@ -from __future__ import absolute_import +# pylint: disable=C0111,R0903 + +"""Displays DNF package update information (///) + +Parameters: + * dnf.interval: Time in seconds between two consecutive update checks (defaulst to 1800) + +""" import time -import shlex import threading -import subprocess -import bumblebee.module import bumblebee.util +import bumblebee.input +import bumblebee.output +import bumblebee.engine -def description(): - return "Checks DNF for updated packages and displays the number of /// pending updates." +def get_dnf_info(widget): + try: + res = bumblebee.util.execute("dnf updateinfo") + except RuntimeError: + pass -def parameters(): - return [ "dnf.interval: Time in seconds between two checks for updates (defaults to 1800)" ] + security = 0 + bugfixes = 0 + enhancements = 0 + other = 0 + for line in res.decode().split("\n"): -def get_dnf_info(obj): - loops = obj.interval() + if not line.startswith(" "): continue + elif "ecurity" in line: + for s in line.split(): + if s.isdigit(): security += int(s) + elif "ugfix" in line: + for s in line.split(): + if s.isdigit(): bugfixes += int(s) + elif "hancement" in line: + for s in line.split(): + if s.isdigit(): enhancements += int(s) + else: + for s in line.split(): + if s.isdigit(): other += int(s) - for thread in threading.enumerate(): - if thread.name == "MainThread": - main = thread + widget.set("security", security) + widget.set("bugfixes", bugfixes) + widget.set("enhancements", enhancements) + widget.set("other", other) - while main.is_alive(): - loops += 1 - if loops < obj.interval(): - time.sleep(1) - continue +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + widget = bumblebee.output.Widget(full_text=self.updates) + super(Module, self).__init__(engine, config, widget) - loops = 0 - try: - res = subprocess.check_output(shlex.split("dnf updateinfo")) - except Exception as e: - break + self._next_check = 0 + widget - security = 0 - bugfixes = 0 - enhancements = 0 - other = 0 - for line in res.decode().split("\n"): - - if not line.startswith(" "): continue - elif "ecurity" in line: - for s in line.split(): - if s.isdigit(): security += int(s) - elif "ugfix" in line: - for s in line.split(): - if s.isdigit(): bugfixes += int(s) - elif "hancement" in line: - for s in line.split(): - if s.isdigit(): enhancements += int(s) - else: - for s in line.split(): - if s.isdigit(): other += int(s) - - obj.set("security", security) - obj.set("bugfixes", bugfixes) - obj.set("enhancements", enhancements) - obj.set("other", other) - -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - - self._counter = {} - self._thread = threading.Thread(target=get_dnf_info, args=(self,)) - self._thread.start() - - def interval(self): - return self._config.parameter("interval", 30*60) - - def set(self, what, value): - self._counter[what] = value - - def get(self, what): - return self._counter.get(what, 0) - - def widgets(self): + def updates(self, widget): result = [] - for t in [ "security", "bugfixes", "enhancements", "other" ]: - result.append(str(self.get(t))) + for t in ["security", "bugfixes", "enhancements", "other"]: + result.append(str(widget.get(t, 0))) + return "/".join(result) - return bumblebee.output.Widget(self, "/".join(result)) + def update(self, widgets): + if int(time.time()) < self._next_check: + return + thread = threading.Thread(target=get_dnf_info, args=(widgets[0],)) + thread.start() + self._next_check = int(time.time()) + self.parameter("interval", 30*60) def state(self, widget): - total = sum(self._counter.values()) - if total == 0: return "good" - return "default" - - def warning(self, widget): - total = sum(self._counter.values()) - return total > 0 - - def critical(self, widget): - total = sum(self._counter.values()) - return total > 50 or self._counter.get("security", 0) > 0 + cnt = 0 + for t in ["security", "bugfixes", "enhancements", "other"]: + cnt += widget.get(t, 0) + if cnt == 0: + return "good" + if cnt > 50 or widget.get("security", 0) > 0: + return "critical" # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/layout.py b/bumblebee/modules/layout.py deleted file mode 100644 index 084d27d..0000000 --- a/bumblebee/modules/layout.py +++ /dev/null @@ -1,67 +0,0 @@ -import subprocess -import shlex -import bumblebee.module -import bumblebee.util - -def description(): - return "Showws current keyboard layout and change it on click." - -def parameters(): - return [ - "layout.lang: pipe-separated list of languages to cycle through (e.g. us|rs|de). Default: en" - ] - - -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - - self._languages = self._config.parameter("lang", "en").split("|") - self._idx = 0 - - output.add_callback(module=self.instance(), button=1, cmd=self.next_keymap) - output.add_callback(module=self.instance(), button=3, cmd=self.prev_keymap) - - def next_keymap(self, event, widget): - self._idx = self._idx + 1 if self._idx < len(self._languages) - 1 else 0 - self.set_keymap() - - def prev_keymap(self, event, widget): - self._idx = self._idx - 1 if self._idx > 0 else len(self._languages) - 1 - self.set_keymap() - - def set_keymap(self): - tmp = self._languages[self._idx].split(":") - layout = tmp[0] - variant = "" - if len(tmp) > 1: - variant = "-variant {}".format(tmp[1]) - bumblebee.util.execute("setxkbmap -layout {} {}".format(layout, variant)) - - def widgets(self): - res = bumblebee.util.execute("setxkbmap -query") - layout = None - variant = None - for line in res.split("\n"): - if not line: - continue - if "layout" in line: - layout = line.split(":")[1].strip() - if "variant" in line: - variant = line.split(":")[1].strip() - if variant: - layout += ":" + variant - - lang = self._languages[self._idx] - - if lang != layout: - if layout in self._languages: - self._idx = self._languages.index(layout) - else: - self._languages.append(layout) - self._idx = len(self._languages) - 1 - lang = layout - - return bumblebee.output.Widget(self, lang) - -# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/load.py b/bumblebee/modules/load.py index fba5dbd..c44ae5a 100644 --- a/bumblebee/modules/load.py +++ b/bumblebee/modules/load.py @@ -1,37 +1,41 @@ -import bumblebee.module -import multiprocessing +# pylint: disable=C0111,R0903 + +"""Displays system load. + +Parameters: + * load.warning : Warning threshold for the one-minute load average (defaults to 70% of the number of CPUs) + * load.critical: Critical threshold for the one-minute load average (defaults to 80% of the number of CPUs) +""" + import os +import multiprocessing -def description(): - return "Displays system load." +import bumblebee.input +import bumblebee.output +import bumblebee.engine -def parameters(): - return [ - "load.warning: Warning threshold for the one-minute load average (defaults to 70% of the number of CPUs)", - "load.critical: Critical threshold for the one-minute load average (defaults 80% of the number of CPUs)" - ] - -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - self._cpus = 1 +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + super(Module, self).__init__(engine, config, + bumblebee.output.Widget(full_text=self.load) + ) + self._load = [0, 0, 0] try: self._cpus = multiprocessing.cpu_count() except multiprocessing.NotImplementedError as e: - pass + self._cpus = 1 + engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE, + cmd="gnome-system-monitor") - output.add_callback(module=self.instance(), button=1, cmd="gnome-system-monitor") + def load(self, widget): + return "{:.02f}/{:.02f}/{:.02f}".format( + self._load[0], self._load[1], self._load[2] + ) - def widgets(self): + def update(self, widgets): self._load = os.getloadavg() - return bumblebee.output.Widget(self, "{:.02f}/{:.02f}/{:.02f}".format( - self._load[0], self._load[1], self._load[2])) - - def warning(self, widget): - return self._load[0] > self._config.parameter("warning", self._cpus*0.7) - - def critical(self, widget): - return self._load[0] > self._config.parameter("critical", self._cpus*0.8) + def state(self, widget): + return self.threshold_state(self._load[0], self._cpus*0.7, self._cpus*0.8) # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/memory.py b/bumblebee/modules/memory.py index 70b10b2..0ab1176 100644 --- a/bumblebee/modules/memory.py +++ b/bumblebee/modules/memory.py @@ -1,38 +1,44 @@ +# pylint: disable=C0111,R0903 + +"""Displays available RAM, total amount of RAM and percentage available. + +Parameters: + * cpu.warning : Warning threshold in % of memory used (defaults to 80%) + * cpu.critical: Critical threshold in % of memory used (defaults to 90%) +""" + import psutil -import bumblebee.module + import bumblebee.util +import bumblebee.input +import bumblebee.output +import bumblebee.engine -def description(): - return "Shows available RAM, total amount of RAM and the percentage of available RAM." - -def parameters(): - return [ - "memory.warning: Warning threshold in % of memory used (defaults to 80%)", - "memory.critical: Critical threshold in % of memory used (defaults to 90%)", - ] - -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - self._mem = psutil.virtual_memory() - - output.add_callback(module=self.instance(), button=1, cmd="gnome-system-monitor") - - def widgets(self): +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + super(Module, self).__init__(engine, config, + bumblebee.output.Widget(full_text=self.memory_usage) + ) self._mem = psutil.virtual_memory() + engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE, + cmd="gnome-system-monitor") + def memory_usage(self, widget): used = self._mem.total - self._mem.available - - return bumblebee.output.Widget(self, "{}/{} ({:05.02f}%)".format( + return "{}/{} ({:05.02f}%)".format( bumblebee.util.bytefmt(used), bumblebee.util.bytefmt(self._mem.total), - self._mem.percent) + self._mem.percent ) - def warning(self, widget): - return self._mem.percent > self._config.parameter("warning", 80) + def update(self, widgets): + self._mem = psutil.virtual_memory() - def critical(self, widget): - return self._mem.percent > self._config.parameter("critical", 90) + def state(self, widget): + if self._mem.percent > float(self.parameter("critical", 90)): + return "critical" + if self._mem.percent > float(self.parameter("warning", 80)): + return "warning" + return None # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/nic.py b/bumblebee/modules/nic.py index 94f8bc4..bd94f94 100644 --- a/bumblebee/modules/nic.py +++ b/bumblebee/modules/nic.py @@ -1,24 +1,58 @@ +#pylint: disable=C0111,R0903 + +"""Displays the name, IP address(es) and status of each available network interface. + +Parameters: + * nic.exclude: Comma-separated list of interface prefixes to exclude (defaults to "lo,virbr,docker,vboxnet,veth") +""" + import netifaces -import bumblebee.module -def description(): - return "Displays the names, IP addresses and status of each available interface." +import bumblebee.util +import bumblebee.input +import bumblebee.output +import bumblebee.engine -def parameters(): - return [ - "nic.exclude: Comma-separated list of interface prefixes to exlude (defaults to: \"lo,virbr,docker,vboxnet,veth\")" - ] +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + widgets = [] + super(Module, self).__init__(engine, config, widgets) + self._exclude = tuple(filter(len, self.parameter("exclude", "lo,virbr,docker,vboxnet,veth").split(","))) + self._update_widgets(widgets) -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - self._exclude = tuple(filter(len, self._config.parameter("exclude", "lo,virbr,docker,vboxnet,veth").split(","))) - self._state = "down" - self._typecache = {} + def update(self, widgets): + self._update_widgets(widgets) - def widgets(self): - result = [] + def state(self, widget): + states = [] + + if widget.get("state") == "down": + states.append("critical") + elif widget.get("state") != "up": + states.append("warning") + + intf = widget.get("intf") + iftype = "wireless" if self._iswlan(intf) else "wired" + iftype = "tunnel" if self._istunnel(intf) else iftype + + states.append("{}-{}".format(iftype, widget.get("state"))) + + return states + + def _iswlan(self, intf): + # wifi, wlan, wlp, seems to work for me + if intf.startswith("w"): return True + return False + + def _istunnel(self, intf): + return intf.startswith("tun") + + def _update_widgets(self, widgets): interfaces = [ i for i in netifaces.interfaces() if not i.startswith(self._exclude) ] + + for widget in widgets: + widget.set("visited", False) + for intf in interfaces: addr = [] state = "down" @@ -30,37 +64,17 @@ class Module(bumblebee.module.Module): state = "up" except Exception as e: addr = [] - widget = bumblebee.output.Widget(self, "{} {} {}".format( - intf, state, ", ".join(addr) - )) + widget = self.widget(intf) + if not widget: + widget = bumblebee.output.Widget(name=intf) + widgets.append(widget) + widget.full_text("{} {} {}".format(intf, state, ", ".join(addr))) widget.set("intf", intf) widget.set("state", state) - result.append(widget) + widget.set("visited", True) - return result - - def _iswlan(self, intf): - # wifi, wlan, wlp, seems to work for me - if intf.startswith("w"): return True - return False - - def _istunnel(self, intf): - return intf.startswith("tun") - - def state(self, widget): - intf = widget.get("intf") - - if not intf in self._typecache: - t = "wireless" if self._iswlan(intf) else "wired" - t = "tunnel" if self._istunnel(intf) else t - self._typecache[intf] = t - - return "{}-{}".format(self._typecache[intf], widget.get("state")) - - def warning(self, widget): - return widget.get("state") != "up" - - def critical(self, widget): - return widget.get("state") == "down" + for widget in widgets: + if widget.get("visited") == False: + widgets.remove(widget) # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/pacman.py b/bumblebee/modules/pacman.py index 66ece38..da3e66f 100644 --- a/bumblebee/modules/pacman.py +++ b/bumblebee/modules/pacman.py @@ -1,58 +1,60 @@ -import bumblebee.module -import subprocess +# pylint: disable=C0111,R0903 + +"""Displays update information per repository for pacman." +""" + import os +import bumblebee.input +import bumblebee.output +import bumblebee.engine -def description(): - return "Displays available updates per repository for pacman." - -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + super(Module, self).__init__(engine, config, + bumblebee.output.Widget(full_text=self.updates) + ) self._count = 0 + self._out = "" - def widgets(self): + def updates(self, widget): + return self._out + + def update(self, widgets): path = os.path.dirname(os.path.abspath(__file__)) if self._count == 0: self._out = "?/?/?/?" - process = subprocess.Popen([ "{}/../../bin/customupdates".format(path) ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - - self._query, self._error = process.communicate() - - if not process.returncode == 0: - self._out = "?/?/?/?" - else: + try: + result = bumblebee.util.execute("{}/../../bin/pacman-updates".format(path)) self._community = 0 self._core = 0 self._extra = 0 self._other = 0 - for line in self._query.splitlines(): - if line.startswith(b'http'): - if b"community" in line: + for line in result.splitlines(): + if line.startswith("http"): + if "community" in line: self._community += 1 continue - if b"core" in line: + if "core" in line: self._core += 1; continue - if b"extra" in line: + if "extra" in line: self._extra += 1 continue self._other += 1 self._out = str(self._core)+"/"+str(self._extra)+"/"+str(self._community)+"/"+str(self._other) - + except RuntimeError: + self._out = "?/?/?/?" + + # TODO: improve this waiting mechanism a bit self._count += 1 self._count = 0 if self._count > 300 else self._count - return bumblebee.output.Widget(self, "{}".format(self._out)) def sumUpdates(self): return self._core + self._community + self._extra + self._other - def critical(self, widget): - #return self._sumUpdates(self) - return self.sumUpdates() > 0 - - - - + def state(self, widget): + if self.sumUpdates() > 0: + return "critical" # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/pasink.py b/bumblebee/modules/pasink.py deleted file mode 120000 index 85ea20d..0000000 --- a/bumblebee/modules/pasink.py +++ /dev/null @@ -1 +0,0 @@ -pulseaudio.py \ No newline at end of file diff --git a/bumblebee/modules/pasource.py b/bumblebee/modules/pasource.py deleted file mode 120000 index 85ea20d..0000000 --- a/bumblebee/modules/pasource.py +++ /dev/null @@ -1 +0,0 @@ -pulseaudio.py \ No newline at end of file diff --git a/bumblebee/modules/ping.py b/bumblebee/modules/ping.py index 20fec39..6217438 100644 --- a/bumblebee/modules/ping.py +++ b/bumblebee/modules/ping.py @@ -1,97 +1,74 @@ -from __future__ import absolute_import +# pylint: disable=C0111,R0903 + +"""Periodically checks the RTT of a configurable host using ICMP echos + +Parameters: + * ping.interval: Time in seconds between two RTT checks (defaults to 60) + * ping.address : IP address to check + * ping.timeout : Timeout for waiting for a reply (defaults to 5.0) + * ping.probes : Number of probes to send (defaults to 5) + * ping.warning : Threshold for warning state, in seconds (defaults to 1.0) + * ping.critical: Threshold for critical state, in seconds (defaults to 2.0) +""" import re import time -import shlex import threading -import subprocess -import bumblebee.module -import bumblebee.util +import bumblebee.input +import bumblebee.output +import bumblebee.engine -def description(): - return "Periodically checks the RTT of a configurable IP" +def get_rtt(module, widget): + try: + widget.set("rtt-unreachable", False) + res = bumblebee.util.execute("ping -n -q -c {} -W {} {}".format( + widget.get("rtt-probes"), widget.get("rtt-timeout"), widget.get("address") + )) -def parameters(): - return [ - "ping.interval: Time in seconds between two RTT checks (defaults to 60)", - "ping.address: IP address to check", - "ping.warning: Threshold for warning state, in seconds (defaults to 1.0)", - "ping.critical: Threshold for critical state, in seconds (defaults to 2.0)", - "ping.timeout: Timeout for waiting for a reply (defaults to 5.0)", - "ping.probes: Number of probes to send (defaults to 5)", - ] + for line in res.split("\n"): + if not line.startswith("rtt"): continue + m = re.search(r'([0-9\.]+)/([0-9\.]+)/([0-9\.]+)/([0-9\.]+)\s+(\S+)', line) -def get_rtt(obj): - loops = obj.get("interval") + widget.set("rtt-min", float(m.group(1))) + widget.set("rtt-avg", float(m.group(2))) + widget.set("rtt-max", float(m.group(3))) + widget.set("rtt-unit", m.group(5)) + except Exception as e: + widget.set("rtt-unreachable", True) - for thread in threading.enumerate(): - if thread.name == "MainThread": - main = thread +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + widget = bumblebee.output.Widget(full_text=self.rtt) + super(Module, self).__init__(engine, config, widget) - interval = obj.get("interval") - while main.is_alive(): - loops += 1 - if loops < interval: - time.sleep(1) - continue + widget.set("address", self.parameter("address", "8.8.8.8")) + widget.set("interval", self.parameter("interval", 60)) + widget.set("rtt-probes", self.parameter("probes", 5)) + widget.set("rtt-timeout", self.parameter("timeout", 5.0)) + widget.set("rtt-avg", 0.0) + widget.set("rtt-unit", "") - loops = 0 - try: - res = subprocess.check_output(shlex.split("ping -n -q -c {} -W {} {}".format( - obj.get("rtt-probes"), obj.get("rtt-timeout"), obj.get("address") - ))) - obj.set("rtt-unreachable", False) + self._next_check = 0 - for line in res.decode().split("\n"): - if not line.startswith("rtt"): continue - m = re.search(r'([0-9\.]+)/([0-9\.]+)/([0-9\.]+)/([0-9\.]+)\s+(\S+)', line) - - obj.set("rtt-min", float(m.group(1))) - obj.set("rtt-avg", float(m.group(2))) - obj.set("rtt-max", float(m.group(3))) - obj.set("rtt-unit", m.group(5)) - except Exception as e: - obj.set("rtt-unreachable", True) - - -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - - self._counter = {} - - self.set("address", self._config.parameter("address", "8.8.8.8")) - self.set("interval", self._config.parameter("interval", 60)) - self.set("rtt-probes", self._config.parameter("probes", 5)) - self.set("rtt-timeout", self._config.parameter("timeout", 5.0)) - - self._thread = threading.Thread(target=get_rtt, args=(self,)) - self._thread.start() - - def set(self, what, value): - self._counter[what] = value - - def get(self, what): - return self._counter.get(what, 0) - - def widgets(self): - text = "{}: {:.1f}{}".format( - self.get("address"), - self.get("rtt-avg"), - self.get("rtt-unit") + def rtt(self, widget): + if widget.get("rtt-unreachable"): + return "{}: unreachable".format(widget.get("address")) + return "{}: {:.1f}{}".format( + widget.get("address"), + widget.get("rtt-avg"), + widget.get("rtt-unit") ) - if self.get("rtt-unreachable"): - text = "{}: unreachable".format(self.get("address")) + def state(self, widget): + if widget.get("rtt-unreachable"): return ["critical"] + return self.threshold_state(widget.get("rtt-avg"), 1000.0, 2000.0) - return bumblebee.output.Widget(self, text) - - def warning(self, widget): - return self.get("rtt-avg") > float(self._config.parameter("warning", 1.0))*1000.0 - - def critical(self, widget): - if self.get("rtt-unreachable"): return True - return self.get("rtt-avg") > float(self._config.parameter("critical", 2.0))*1000.0 + def update(self, widgets): + if int(time.time()) < self._next_check: + return + thread = threading.Thread(target=get_rtt, args=(self,widgets[0],)) + thread.start() + self._next_check = int(time.time()) + widgets[0].get("interval") # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/pulseaudio.py b/bumblebee/modules/pulseaudio.py index a8cda45..b13eda3 100644 --- a/bumblebee/modules/pulseaudio.py +++ b/bumblebee/modules/pulseaudio.py @@ -1,60 +1,68 @@ +# pylint: disable=C0111,R0903 + +"""Displays volume and mute status of PulseAudio devices. + +Aliases: pasink, pasource +""" + import re -import shlex -import subprocess -import bumblebee.module import bumblebee.util +import bumblebee.input +import bumblebee.output +import bumblebee.engine -def description(): - module = __name__.split(".")[-1] - if module == "pasink": - return "Shows volume and mute status of the default PulseAudio Sink." - if module == "pasource": - return "Shows volume and mute status of the default PulseAudio Source." - return "See 'pasource'." +ALIASES = [ "pasink", "pasource" ] -def parameters(): - return [ "none" ] - +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + super(Module, self).__init__(engine, config, + bumblebee.output.Widget(full_text=self.volume) + ) -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - - self._module = self.__module__.split(".")[-1] self._left = 0 self._right = 0 self._mono = 0 self._mute = False - channel = "sink" if self._module == "pasink" else "source" + channel = "sink" if self.name == "pasink" else "source" - output.add_callback(module=self.instance(), button=3, - cmd="pavucontrol") - output.add_callback(module=self.instance(), button=1, + engine.input.register_callback(self, button=bumblebee.input.RIGHT_MOUSE, cmd="pavucontrol") + engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE, cmd="pactl set-{}-mute @DEFAULT_{}@ toggle".format(channel, channel.upper())) - output.add_callback(module=self.instance(), button=4, + engine.input.register_callback(self, button=bumblebee.input.WHEEL_UP, cmd="pactl set-{}-volume @DEFAULT_{}@ +2%".format(channel, channel.upper())) - output.add_callback(module=self.instance(), button=5, + engine.input.register_callback(self, button=bumblebee.input.WHEEL_DOWN, cmd="pactl set-{}-volume @DEFAULT_{}@ -2%".format(channel, channel.upper())) - def widgets(self): - res = subprocess.check_output(shlex.split("pactl info")) - channel = "sinks" if self._module == "pasink" else "sources" - name = None - for line in res.decode().split("\n"): - if line.startswith("Default Sink: ") and channel == "sinks": - name = line[14:] - if line.startswith("Default Source: ") and channel == "sources": - name = line[16:] - - res = subprocess.check_output(shlex.split("pactl list {}".format(channel))) + def _default_device(self): + output = bumblebee.util.execute("pactl info") + pattern = "Default Sink: " if self.name == "pasink" else "Default Source: " + for line in output.split("\n"): + if line.startswith(pattern): + return line.replace(pattern, "") + return "n/a" + def volume(self, widget): + if int(self._mono) > 0: + return "{}%".format(self._mono) + elif self._left == self._right: + return "{}%".format(self._left) + else: + return "{}%/{}%".format(self._left, self._right) + return "n/a" + + def update(self, widgets): + channel = "sinks" if self.name == "pasink" else "sources" + device = self._default_device() + + result = bumblebee.util.execute("pactl list {}".format(channel)) found = False - for line in res.decode().split("\n"): + for line in result.split("\n"): if "Name:" in line and found == True: break - if name in line: + if device in line: found = True + if "Mute:" in line and found == True: self._mute = False if " no" in line.lower() else True @@ -71,22 +79,10 @@ class Module(bumblebee.module.Module): else: self._left = m.group(1) self._right = m.group(2) - result = "" - if int(self._mono) > 0: - result = "{}%".format(self._mono) - elif self._left == self._right: - result = "{}%".format(self._left) - else: - result="{}%/{}%".format(self._left, self._right) - return bumblebee.output.Widget(self, result) def state(self, widget): - return "muted" if self._mute is True else "unmuted" - - def warning(self, widget): - return self._mute - - def critical(self, widget): - return False + if self._mute: + return [ "warning", "muted" ] + return [ "unmuted" ] # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/spacer.py b/bumblebee/modules/spacer.py index 798338e..b89c233 100644 --- a/bumblebee/modules/spacer.py +++ b/bumblebee/modules/spacer.py @@ -1,17 +1,23 @@ -import bumblebee.module -import bumblebee.util +# pylint: disable=C0111,R0903 -def description(): - return "Draws a widget with configurable content." +"""Draws a widget with configurable text content. -def parameters(): - return [ "spacer.text: Text to draw (defaults to '')" ] +Parameters: + * spacer.text: Widget contents (defaults to empty string) +""" -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) +import bumblebee.input +import bumblebee.output +import bumblebee.engine - def widgets(self): - return bumblebee.output.Widget(self, self._config.parameter("text", "")) +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + super(Module, self).__init__(engine, config, + bumblebee.output.Widget(full_text=self.text) + ) + self._text = self.parameter("text", "") + + def text(self, widget): + return self._text # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/test.py b/bumblebee/modules/test.py new file mode 100644 index 0000000..9f7485f --- /dev/null +++ b/bumblebee/modules/test.py @@ -0,0 +1,13 @@ +# pylint: disable=C0111,R0903 + +"""Test module""" + +import bumblebee.engine + +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): + super(Module, self).__init__(engine, config, + bumblebee.output.Widget(full_text="test") + ) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/time.py b/bumblebee/modules/time.py deleted file mode 100644 index 4a19649..0000000 --- a/bumblebee/modules/time.py +++ /dev/null @@ -1,34 +0,0 @@ -from __future__ import absolute_import - -import datetime -import bumblebee.module - -def description(): - return "Displays the current time, using the optional format string as input for strftime." - -def parameters(): - module = __name__.split(".")[-1] - return [ - "{}.format: strftime specification (defaults to {})".format(module, default_format(module)) - ] - -def default_format(module): - default = "%x %X" - if module == "date": - default = "%x" - if module == "time": - default = "%X" - return default - -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - - module = self.__module__.split(".")[-1] - - self._fmt = self._config.parameter("format", default_format(module)) - - def widgets(self): - return bumblebee.output.Widget(self, datetime.datetime.now().strftime(self._fmt)) - -# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/modules/xrandr.py b/bumblebee/modules/xrandr.py index 51998a9..def9235 100644 --- a/bumblebee/modules/xrandr.py +++ b/bumblebee/modules/xrandr.py @@ -1,89 +1,72 @@ -import bumblebee.module -import bumblebee.util -import re +# pylint: disable=C0111,R0903 + +"""Shows a widget for each connected screen and allows the user to enable/disable screens. + +""" + import os +import re import sys -import subprocess -def description(): - return "Shows all connected screens" - -def parameters(): - return [ - ] - -class Module(bumblebee.module.Module): - def __init__(self, output, config, alias): - super(Module, self).__init__(output, config, alias) - - self._widgets = [] - - def toggle(self, event, widget): - path = os.path.dirname(os.path.abspath(__file__)) - toggle_cmd = "{}/../../bin/toggle-display.sh".format(path) - - if widget.get("state") == "on": - bumblebee.util.execute("{} --output {} --off".format(toggle_cmd, widget.get("display"))) - else: - neighbor = None - for w in self._widgets: - if w.get("state") == "on": - neighbor = w - if event.get("button") == 1: - break - - if neighbor == None: - bumblebee.util.execute("{} --output {} --auto".format(toggle_cmd, - widget.get("display"))) - else: - bumblebee.util.execute("{} --output {} --auto --{}-of {}".format(toggle_cmd, - widget.get("display"), "left" if event.get("button") == 1 else "right", - neighbor.get("display"))) - - def widgets(self): - process = subprocess.Popen([ "xrandr", "-q" ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - output, error = process.communicate() +import bumblebee.util +import bumblebee.input +import bumblebee.output +import bumblebee.engine +class Module(bumblebee.engine.Module): + def __init__(self, engine, config): widgets = [] + self._engine = engine + super(Module, self).__init__(engine, config, widgets) + self.update_widgets(widgets) - for line in output.split("\n"): + def update_widgets(self, widgets): + new_widgets = [] + for line in bumblebee.util.execute("xrandr -q").split("\n"): if not " connected" in line: continue display = line.split(" ", 2)[0] m = re.search(r'\d+x\d+\+(\d+)\+\d+', line) - widget = bumblebee.output.Widget(self, display, instance=display) - widget.set("display", display) - - # not optimal (add callback once per interval), but since - # add_callback() just returns if the callback has already - # been registered, it should be "ok" - self._output.add_callback(module=display, button=1, - cmd=self.toggle) - self._output.add_callback(module=display, button=3, - cmd=self.toggle) - if m: - widget.set("state", "on") - widget.set("pos", int(m.group(1))) - else: - widget.set("state", "off") - widget.set("pos", sys.maxint) + widget = self.widget(display) + if not widget: + widget = bumblebee.output.Widget(full_text=display, name=display) + self._engine.input.register_callback(widget, button=1, cmd=self._toggle) + self._engine.input.register_callback(widget, button=3, cmd=self._toggle) + new_widgets.append(widget) + widget.set("state", "on" if m else "off") + widget.set("pos", int(m.group(1)) if m else sys.maxint) + while len(widgets) > 0: + del widgets[0] + for widget in new_widgets: widgets.append(widget) - widgets.sort(key=lambda widget : widget.get("pos")) - - self._widgets = widgets - - return widgets + def update(self, widgets): + self.update_widgets(widgets) def state(self, widget): return widget.get("state", "off") - def warning(self, widget): - return False + def _toggle(self, event): + path = os.path.dirname(os.path.abspath(__file__)) + toggle_cmd = "{}/../../bin/toggle-display.sh".format(path) - def critical(self, widget): - return False + widget = self.widget_by_id(event["instance"]) + + if widget.get("state") == "on": + bumblebee.util.execute("{} --output {} --off".format(toggle_cmd, widget.name)) + else: + first_neighbor = next((widget for widget in self.widgets() if widget.get("state") == "on"), None) + last_neighbor = next((widget for widget in reversed(self.widgets()) if widget.get("state") == "on"), None) + + neighbor = first_neighbor if event["button"] == bumblebee.input.LEFT_MOUSE else last_neighbor + + if neighbor == None: + bumblebee.util.execute("{} --output {} --auto".format(toggle_cmd, widget.name)) + else: + bumblebee.util.execute("{} --output {} --auto --{}-of {}".format(toggle_cmd, widget.name, + "left" if event.get("button") == bumblebee.input.LEFT_MOUSE else "right", + neighbor.name)) # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/output.py b/bumblebee/output.py index 1f494ad..974e651 100644 --- a/bumblebee/output.py +++ b/bumblebee/output.py @@ -1,123 +1,105 @@ -import os -import shlex -import inspect -import threading -import subprocess +# pylint: disable=R0201 -def output(args): - import bumblebee.outputs.i3 - return bumblebee.outputs.i3.Output(args) +"""Output classes""" -class Widget(object): - def __init__(self, obj, text, instance=None): - self._obj = obj - self._text = text - self._store = {} - self._instance = instance +import sys +import json +import uuid - obj._output.register_widget(self.instance(), self) +import bumblebee.store - def set(self, key, value): - self._store[key] = value +class Widget(bumblebee.store.Store): + """Represents a single visible block in the status bar""" + def __init__(self, full_text="", name=""): + super(Widget, self).__init__() + self._full_text = full_text + self.module = None + self._module = None + self.name = name + self.id = str(uuid.uuid4()) - def get(self, key, default=None): - return self._store.get(key, default) + def link_module(self, module): + """Set the module that spawned this widget + + This is done outside the constructor to avoid having to + pass in the module name in every concrete module implementation""" + self.module = module.name + self._module = module def state(self): - return self._obj.state(self) + """Return the widget's state""" + if self._module and hasattr(self._module, "state"): + states = self._module.state(self) + if not isinstance(states, list): + return [states] + return states + return [] - def warning(self): - return self._obj.warning(self) - - def critical(self): - return self._obj.critical(self) - - def module(self): - return self._obj.__module__.split(".")[-1] - - def instance(self): - return self._instance if self._instance else getattr(self._obj, "instance")(self) - - def text(self): - return self._text - -class Command(object): - def __init__(self, command, event, widget): - self._command = command - self._event = event - self._widget = widget - - def __call__(self, *args, **kwargs): - if not isinstance(self._command, list): - self._command = [ self._command ] - - for cmd in self._command: - if not cmd: continue - if inspect.ismethod(cmd): - cmd(self._event, self._widget) + def full_text(self, value=None): + """Set or retrieve the full text to display in the widget""" + if value: + self._full_text = value + else: + if callable(self._full_text): + return self._full_text(self) else: - c = cmd.format(*args, **kwargs) - DEVNULL = open(os.devnull, 'wb') - subprocess.Popen(shlex.split(c), stdout=DEVNULL, stderr=DEVNULL) + return self._full_text -class Output(object): - def __init__(self, config): - self._config = config - self._callbacks = {} - self._wait = threading.Condition() - self._wait.acquire() - self._widgets = {} - - def register_widget(self, identity, widget): - self._widgets[identity] = widget - - def redraw(self): - self._wait.acquire() - self._wait.notify() - self._wait.release() - - def add_callback(self, cmd, button, module=None): - if module: - module = module.replace("bumblebee.modules.", "") - - if self._callbacks.get((button, module)): return - - self._callbacks[( - button, - module, - )] = cmd - - def callback(self, event): - cb = self._callbacks.get(( - event.get("button", -1), - None, - ), None) - cb = self._callbacks.get(( - event.get("button", -1), - event.get("instance", event.get("module", None)), - ), cb) - - identity = event.get("instance", event.get("module", None)) - return Command(cb, event, self._widgets.get(identity, None)) - - def wait(self): - self._wait.wait(self._config.parameter("interval", 1)) +class I3BarOutput(object): + """Manage output according to the i3bar protocol""" + def __init__(self, theme): + self._theme = theme + self._widgets = [] def start(self): - pass - - def draw(self, widgets, theme): - if not type(widgets) is list: - widgets = [ widgets ] - self._draw(widgets, theme) - - def _draw(self, widgets, theme): - pass - - def flush(self): - pass + """Print start preamble for i3bar protocol""" + sys.stdout.write(json.dumps({"version": 1, "click_events": True}) + "[\n") def stop(self): - pass + """Finish i3bar protocol""" + sys.stdout.write("]\n") + + def draw(self, widget, module=None, engine=None): + """Draw a single widget""" + full_text = widget.full_text() + padding = self._theme.padding(widget) + prefix = self._theme.prefix(widget, padding) + suffix = self._theme.suffix(widget, padding) + if prefix: + full_text = u"{}{}".format(prefix, full_text) + if suffix: + full_text = u"{}{}".format(full_text, suffix) + separator = self._theme.separator(widget) + if separator: + self._widgets.append({ + u"full_text": separator, + "separator": False, + "color": self._theme.separator_fg(widget), + "background": self._theme.separator_bg(widget), + "separator_block_width": self._theme.separator_block_width(widget), + }) + self._widgets.append({ + u"full_text": full_text, + "color": self._theme.fg(widget), + "background": self._theme.bg(widget), + "separator_block_width": self._theme.separator_block_width(widget), + "separator": True if separator is None else False, + "instance": widget.id, + "name": module.id, + }) + + def begin(self): + """Start one output iteration""" + self._widgets = [] + self._theme.reset() + + def flush(self): + """Flushes output""" + sys.stdout.write(json.dumps(self._widgets)) + + def end(self): + """Finalizes output""" + sys.stdout.write(",\n") + sys.stdout.flush() # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/outputs/__init__.py b/bumblebee/outputs/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/bumblebee/outputs/i3.py b/bumblebee/outputs/i3.py deleted file mode 100644 index c0800dd..0000000 --- a/bumblebee/outputs/i3.py +++ /dev/null @@ -1,82 +0,0 @@ -from __future__ import unicode_literals - -import os -import sys -import json -import shlex -import threading -import subprocess -import bumblebee.output - -def read_input(output): - while True: - line = sys.stdin.readline().strip(",").strip() - if line == "[": continue - if line == "]": break - - DEVNULL = open(os.devnull, 'wb') - - event = json.loads(line) - cb = output.callback(event) - if cb: - cb( - name=event.get("name", ""), - instance=event.get("instance", ""), - button=event.get("button", -1) - ) - output.redraw() - -class Output(bumblebee.output.Output): - def __init__(self, args): - super(Output, self).__init__(args) - self._data = [] - - self.add_callback("i3-msg workspace prev_on_output", 4) - self.add_callback("i3-msg workspace next_on_output", 5) - - self._thread = threading.Thread(target=read_input, args=(self,)) - self._thread.start() - - def start(self): - print(json.dumps({ "version": 1, "click_events": True }) + "[") - - def _draw(self, widgets, theme): - for widget in widgets: - if theme.separator(widget): - self._data.append({ - u"full_text": theme.separator(widget), - "color": theme.separator_color(widget), - "background": theme.separator_background(widget), - "separator": False, - "separator_block_width": 0, - }) - - sep = theme.default_separators(widget) - sep = sep if sep else False - width = theme.separator_block_width(widget) - width = width if width else 0 - self._data.append({ - u"full_text": " {} {} {}".format( - theme.prefix(widget), - widget.text(), - theme.suffix(widget) - ), - "color": theme.color(widget), - "background": theme.background(widget), - "name": widget.module(), - "instance": widget.instance(), - "separator": sep, - "separator_block_width": width, - }) - theme.next_widget() - - def flush(self): - data = json.dumps(self._data) - self._data = [] - print(data + ",") - sys.stdout.flush() - - def stop(self): - return "]" - -# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/store.py b/bumblebee/store.py new file mode 100644 index 0000000..8fac71b --- /dev/null +++ b/bumblebee/store.py @@ -0,0 +1,21 @@ +"""Store interface + +Allows arbitrary classes to offer a simple get/set +store interface by deriving from the Store class in +this module +""" + +class Store(object): + """Interface for storing and retrieving simple values""" + def __init__(self): + self._data = {} + + def set(self, key, value): + """Set 'key' to 'value', overwriting 'key' if it exists already""" + self._data[key] = value + + def get(self, key, default=None): + """Return the current value of 'key', or 'default' if 'key' is not set""" + return self._data.get(key, default) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/theme.py b/bumblebee/theme.py index cd8d860..3486f70 100644 --- a/bumblebee/theme.py +++ b/bumblebee/theme.py @@ -1,46 +1,154 @@ +# pylint: disable=C0103 + +"""Theme support""" + import os import copy import json -import yaml -import glob -def getpath(): +import bumblebee.error + +def theme_path(): + """Return the path of the theme directory""" return os.path.dirname("{}/../themes/".format(os.path.dirname(os.path.realpath(__file__)))) -def themes(): - d = getpath() - return [ os.path.basename(f).replace(".json", "") for f in glob.iglob("{}/*.json".format(d)) ] +class Theme(object): + """Represents a collection of icons and colors""" + def __init__(self, name): + self._init(self.load(name)) + self._widget = None + self._cycle_idx = 0 + self._cycle = {} + self._prevbg = None -class Theme: - def __init__(self, config): - self._config = config + def _init(self, data): + """Initialize theme from data structure""" + for iconset in data.get("icons", []): + self._merge(data, self._load_icons(iconset)) + self._theme = data + self._defaults = data.get("defaults", {}) + self._cycles = self._theme.get("cycle", []) + self.reset() - self._data = self.get_theme(config.theme()) + def data(self): + """Return the raw theme data""" + return self._theme - for iconset in self._data.get("icons", []): - self.merge(self._data, self.get_theme(iconset)) + def reset(self): + """Reset theme to initial state""" + self._cycle = self._cycles[0] if len(self._cycles) > 0 else {} + self._cycle_idx = 0 + self._widget = None + self._prevbg = None - self._defaults = self._data.get("defaults", {}) - self._cycles = self._defaults.get("cycle", []) - self.begin() + def padding(self, widget): + """Return padding for widget""" + return self._get(widget, "padding", "") - def get_theme(self, name): - for path in [ getpath(), "{}/icons/".format(getpath()) ]: - if os.path.isfile("{}/{}.yaml".format(path, name)): - with open("{}/{}.yaml".format(path, name)) as f: - return yaml.load(f) - if os.path.isfile("{}/{}.json".format(path, name)): - with open("{}/{}.json".format(path, name)) as f: - return json.load(f) - return None + def prefix(self, widget, default=None): + """Return the theme prefix for a widget's full text""" + padding = self.padding(widget) + pre = self._get(widget, "prefix", None) + return u"{}{}{}".format(padding, pre, padding) if pre else default + + def suffix(self, widget, default=None): + """Return the theme suffix for a widget's full text""" + padding = self._get(widget, "padding", "") + suf = self._get(widget, "suffix", None) + return u"{}{}{}".format(padding, suf, padding) if suf else default + + def fg(self, widget): + """Return the foreground color for this widget""" + return self._get(widget, "fg", None) + + def bg(self, widget): + """Return the background color for this widget""" + return self._get(widget, "bg", None) + + def separator(self, widget): + """Return the separator between widgets""" + return self._get(widget, "separator", None) + + def separator_fg(self, widget): + """Return the separator's foreground/text color""" + return self.bg(widget) + + def separator_bg(self, widget): + """Return the separator's background color""" + return self._prevbg + + def separator_block_width(self, widget): + """Return the SBW""" + return self._get(widget, "separator-block-width", None) + + def loads(self, data): + """Initialize the theme from a JSON string""" + theme = json.loads(data) + self._init(theme) + + def _load_icons(self, name): + """Load icons for a theme""" + path = "{}/icons/".format(theme_path()) + return self.load(name, path=path) + + def load(self, name, path=theme_path()): + """Load and parse a theme file""" + themefile = "{}/{}.json".format(path, name) + + if os.path.isfile(themefile): + try: + with open(themefile) as data: + return json.load(data) + except ValueError as exception: + raise bumblebee.error.ThemeLoadError("JSON error: {}".format(exception)) + else: + raise bumblebee.error.ThemeLoadError("no such theme: {}".format(name)) + + def _get(self, widget, name, default=None): + """Return the config value 'name' for 'widget'""" + + if not self._widget: + self._widget = widget + + if self._widget != widget: + self._prevbg = self.bg(self._widget) + self._widget = widget + if len(self._cycles) > 0: + self._cycle_idx = (self._cycle_idx + 1) % len(self._cycles) + self._cycle = self._cycles[self._cycle_idx] + + module_theme = self._theme.get(widget.module, {}) + + state_themes = [] + # avoid infinite recursion + states = widget.state() + if name not in states: + for state in states: + state_themes.append(self._get(widget, state, {})) + + value = self._defaults.get(name, default) + value = self._cycle.get(name, value) + value = module_theme.get(name, value) + + for theme in state_themes: + value = theme.get(name, value) + + if isinstance(value, list): + key = "{}-idx".format(name) + idx = widget.get(key, 0) + widget.set(key, (idx + 1) % len(value)) + value = value[idx] + + return value # algorithm copied from # http://blog.impressiver.com/post/31434674390/deep-merge-multiple-python-dicts # nicely done :) - def merge(self, target, *args): + def _merge(self, target, *args): + """Merge two arbitrarily nested data structures""" if len(args) > 1: for item in args: - self.merge(item) + self._merge(item) return target item = args[0] @@ -48,83 +156,9 @@ class Theme: return item for key, value in item.items(): if key in target and isinstance(target[key], dict): - self.merge(target[key], value) + self._merge(target[key], value) else: target[key] = copy.deepcopy(value) return target - def begin(self): - self._config.set("theme.cycleidx", 0) - self._cycle = self._cycles[0] if len(self._cycles) > 0 else {} - self._background = [ None, None ] - - def next_widget(self): - self._background[1] = self._background[0] - idx = self._config.increase("theme.cycleidx", len(self._cycles), 0) - self._cycle = self._cycles[idx] if len(self._cycles) > idx else {} - - def prefix(self, widget): - return self._get(widget, "prefix", "") - - def suffix(self, widget): - return self._get(widget, "suffix", "") - - def color(self, widget): - result = self._get(widget, "fg") - if widget.warning(): - result = self._get(widget, "fg-warning") - if widget.critical(): - result = self._get(widget, "fg-critical") - return result - - def background(self, widget): - result = self._get(widget, "bg") - if widget.warning(): - result = self._get(widget, "bg-warning") - if widget.critical(): - result = self._get(widget, "bg-critical") - self._background[0] = result - return result - - def separator(self, widget): - return self._get(widget, "separator") - - def default_separators(self, widget): - return self._get(widget, "default-separators") - - def separator_color(self, widget): - return self.background(widget) - - def separator_background(self, widget): - return self._background[1] - - def separator_block_width(self, widget): - return self._get(widget, "separator-block-width") - - def _get(self, widget, name, default = None): - module = widget.module() - state = widget.state() - inst = widget.instance() - inst = inst.replace("{}.".format(module), "") - module_theme = self._data.get(module, {}) - state_theme = module_theme.get("states", {}).get(state, {}) - instance_theme = module_theme.get(inst, {}) - instance_state_theme = instance_theme.get("states", {}).get(state, {}) - - value = None - value = self._defaults.get(name, value) - value = self._cycle.get(name, value) - value = module_theme.get(name, value) - value = state_theme.get(name, value) - value = instance_theme.get(name, value) - value = instance_state_theme.get(name, value) - - if type(value) is list: - key = "{}{}".format(repr(widget), value) - idx = self._config.parameter(key, 0) - self._config.increase(key, len(value), 0) - value = value[idx] - - return value if value else default - # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/bumblebee/util.py b/bumblebee/util.py index c38bb79..f35ce89 100644 --- a/bumblebee/util.py +++ b/bumblebee/util.py @@ -1,11 +1,23 @@ import shlex import subprocess + try: from exceptions import RuntimeError except ImportError: # Python3 doesn't require this anymore pass +def execute(cmd, wait=True): + args = shlex.split(cmd) + proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + if wait: + out, _ = proc.communicate() + if proc.returncode != 0: + raise RuntimeError("{} exited with {}".format(cmd, proc.returncode)) + return out.decode("utf-8") + return None + def bytefmt(num): for unit in [ "", "Ki", "Mi", "Gi" ]: if num < 1024.0: @@ -21,12 +33,4 @@ def durationfmt(duration): return res -def execute(cmd): - args = shlex.split(cmd) - p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - out, err = p.communicate() - - if p.returncode != 0: - raise RuntimeError("{} exited with {}".format(cmd, p.returncode)) - - return out.decode("utf-8") +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/runlint.sh b/runlint.sh new file mode 100755 index 0000000..6902ce9 --- /dev/null +++ b/runlint.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +find . -name "*.py"|xargs pylint --disable=R0903,R0201,C0330 diff --git a/runtests.sh b/runtests.sh index 58521b5..cbc5a2a 100755 --- a/runtests.sh +++ b/runtests.sh @@ -1,3 +1,13 @@ #!/bin/sh -nosetests --rednose -v tests/ +test=$(which nosetests) + +echo "testing with $(python2 -V 2>&1)" +python2 $test --rednose -v tests/ + +if [ $? == 0 ]; then + echo + + echo "testing with $(python3 -V 2>&1)" + python3 $test --rednose -v tests/ +fi diff --git a/screenshots/cpu.png b/screenshots/cpu.png index f160ba2..0b5527c 100644 Binary files a/screenshots/cpu.png and b/screenshots/cpu.png differ diff --git a/testjson.sh b/testjson.sh new file mode 100755 index 0000000..ce473ea --- /dev/null +++ b/testjson.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +find themes/ -name "*.json"|xargs cat|json_verify -s diff --git a/tests/modules/test_battery.py b/tests/modules/test_battery.py new file mode 100644 index 0000000..499a0fa --- /dev/null +++ b/tests/modules/test_battery.py @@ -0,0 +1,68 @@ +# pylint: disable=C0103,C0111 + +import sys +import json +import unittest +import mock + +from contextlib import contextmanager + +import bumblebee.input +from bumblebee.input import I3BarInput +from bumblebee.modules.battery import Module +from tests.util import MockEngine, MockConfig, assertPopen + +class MockOpen(object): + def __init__(self): + self._value = "" + + def returns(self, value): + self._value = value + + def __enter__(self): + return self + + def __exit__(self, a, b, c): + pass + + def read(self): + return self._value + +class TestBatteryModule(unittest.TestCase): + def setUp(self): + self.engine = MockEngine() + self.config = MockConfig() + self.module = Module(engine=self.engine, config={ "config": self.config }) + for widget in self.module.widgets(): + widget.link_module(self.module) + + @mock.patch("sys.stdout") + def test_format(self, mock_output): + for widget in self.module.widgets(): + self.assertEquals(len(widget.full_text()), len("100%")) + + @mock.patch("os.path.exists") + @mock.patch("{}.open".format("__builtin__" if sys.version_info[0] < 3 else "builtins")) + @mock.patch("subprocess.Popen") + def test_critical(self, mock_output, mock_open, mock_exists): + mock_open.return_value = MockOpen() + mock_open.return_value.returns("19") + mock_exists.return_value = True + self.config.set("battery.critical", "20") + self.config.set("battery.warning", "25") + self.module.update(self.module.widgets()) + self.assertTrue("critical" in self.module.widgets()[0].state()) + + @mock.patch("os.path.exists") + @mock.patch("{}.open".format("__builtin__" if sys.version_info[0] < 3 else "builtins")) + @mock.patch("subprocess.Popen") + def test_warning(self, mock_output, mock_open, mock_exists): + mock_open.return_value = MockOpen() + mock_exists.return_value = True + mock_open.return_value.returns("22") + self.config.set("battery.critical", "20") + self.config.set("battery.warning", "25") + self.module.update(self.module.widgets()) + self.assertTrue("warning" in self.module.widgets()[0].state()) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/modules/test_brightness.py b/tests/modules/test_brightness.py new file mode 100644 index 0000000..0594fc3 --- /dev/null +++ b/tests/modules/test_brightness.py @@ -0,0 +1,56 @@ +# pylint: disable=C0103,C0111 + +import json +import unittest +import mock + +import bumblebee.input +from bumblebee.input import I3BarInput +from bumblebee.modules.brightness import Module +from tests.util import MockEngine, MockConfig, assertPopen, assertMouseEvent + +class TestBrightnessModule(unittest.TestCase): + def setUp(self): + self.engine = MockEngine() + self.engine.input = I3BarInput() + self.engine.input.need_event = True + self.config = MockConfig() + self.module = Module(engine=self.engine, config={ "config": self.config }) + for widget in self.module.widgets(): + widget.link_module(self.module) + + @mock.patch("sys.stdout") + def test_format(self, mock_output): + for widget in self.module.widgets(): + self.assertEquals(len(widget.full_text()), len("100%")) + + @mock.patch("select.select") + @mock.patch("subprocess.Popen") + @mock.patch("sys.stdin") + def test_wheel_up(self, mock_input, mock_output, mock_select): + assertMouseEvent(mock_input, mock_output, mock_select, self.engine, + self.module, bumblebee.input.WHEEL_UP, + "xbacklight +2%" + ) + + @mock.patch("select.select") + @mock.patch("subprocess.Popen") + @mock.patch("sys.stdin") + def test_wheel_down(self, mock_input, mock_output, mock_select): + assertMouseEvent(mock_input, mock_output, mock_select, self.engine, + self.module, bumblebee.input.WHEEL_DOWN, + "xbacklight -2%" + ) + + @mock.patch("select.select") + @mock.patch("subprocess.Popen") + @mock.patch("sys.stdin") + def test_custom_step(self, mock_input, mock_output, mock_select): + self.config.set("brightness.step", "10") + module = Module(engine=self.engine, config={ "config": self.config }) + assertMouseEvent(mock_input, mock_output, mock_select, self.engine, + module, bumblebee.input.WHEEL_DOWN, + "xbacklight -10%" + ) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/modules/test_caffeine.py b/tests/modules/test_caffeine.py new file mode 100644 index 0000000..995eece --- /dev/null +++ b/tests/modules/test_caffeine.py @@ -0,0 +1,21 @@ +# pylint: disable=C0103,C0111 + +import json +import unittest +import mock + +import bumblebee.input +from bumblebee.input import I3BarInput +from bumblebee.modules.caffeine import Module +from tests.util import MockEngine, MockConfig, assertPopen + +class TestCaffeineModule(unittest.TestCase): + def setUp(self): + self.engine = MockEngine() + self.engine.input = I3BarInput() + self.engine.input.need_event = True + self.engine.input.need_valid_event = True + self.config = MockConfig() + self.module = Module(engine=self.engine, config={ "config": self.config }) + for widget in self.module.widgets(): + widget.link_module(self.module) diff --git a/tests/modules/test_cmus.py b/tests/modules/test_cmus.py new file mode 100644 index 0000000..1bc8b67 --- /dev/null +++ b/tests/modules/test_cmus.py @@ -0,0 +1,57 @@ +# pylint: disable=C0103,C0111 + +import json +import unittest +import mock + +import bumblebee.input +from bumblebee.input import I3BarInput +from bumblebee.modules.cmus import Module +from tests.util import MockEngine, MockConfig, assertPopen + +class TestCmusModule(unittest.TestCase): + def setUp(self): + self.engine = MockEngine() + self.engine.input = I3BarInput() + self.engine.input.need_event = True + self.module = Module(engine=self.engine, config={"config": MockConfig()}) + + @mock.patch("subprocess.Popen") + def test_read_song(self, mock_output): + rv = mock.Mock() + rv.configure_mock(**{ + "communicate.return_value": ("out", None) + }) + mock_output.return_value = rv + self.module.update(self.module.widgets()) + assertPopen(mock_output, "cmus-remote -Q") + + def test_widgets(self): + self.assertTrue(len(self.module.widgets()), 5) + + @mock.patch("select.select") + @mock.patch("subprocess.Popen") + @mock.patch("sys.stdin") + def test_interaction(self, mock_input, mock_output, mock_select): + events = [ + {"widget": "cmus.shuffle", "action": "cmus-remote -S"}, + {"widget": "cmus.repeat", "action": "cmus-remote -R"}, + {"widget": "cmus.next", "action": "cmus-remote -n"}, + {"widget": "cmus.prev", "action": "cmus-remote -r"}, + {"widget": "cmus.main", "action": "cmus-remote -u"}, + ] + + mock_select.return_value = (1,2,3) + + for event in events: + mock_input.readline.return_value = json.dumps({ + "name": self.module.id, + "button": bumblebee.input.LEFT_MOUSE, + "instance": self.module.widget(event["widget"]).id + }) + self.engine.input.start() + self.engine.input.stop() + mock_input.readline.assert_any_call() + assertPopen(mock_output, event["action"]) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/modules/test_cpu.py b/tests/modules/test_cpu.py index 3b3fa65..a00d2c2 100644 --- a/tests/modules/test_cpu.py +++ b/tests/modules/test_cpu.py @@ -1,26 +1,48 @@ +# pylint: disable=C0103,C0111 + +import json import unittest +import mock -import bumblebee.config -import bumblebee.modules.cpu +import bumblebee.input +from bumblebee.input import I3BarInput +from bumblebee.modules.cpu import Module +from tests.util import MockEngine, MockConfig, assertPopen, assertMouseEvent, assertStateContains -class FakeOutput(object): - def add_callback(self, cmd, button, module=None): - pass - -class TestCpuModule(unittest.TestCase): +class TestCPUModule(unittest.TestCase): def setUp(self): - output = FakeOutput() - config = bumblebee.config.Config(["-m", "cpu"]) - self.cpu = bumblebee.modules.cpu.Module(output, config, None) + self.engine = MockEngine() + self.engine.input = I3BarInput() + self.engine.input.need_event = True + self.config = MockConfig() + self.module = Module(engine=self.engine, config={ "config": self.config }) - def test_documentation(self): - self.assertTrue(hasattr(bumblebee.modules.cpu, "description")) - self.assertTrue(hasattr(bumblebee.modules.cpu, "parameters")) + @mock.patch("sys.stdout") + def test_format(self, mock_output): + for widget in self.module.widgets(): + self.assertEquals(len(widget.full_text()), len("100.00%")) - def test_warning(self): - self.assertTrue(hasattr(self.cpu, "warning")) + @mock.patch("select.select") + @mock.patch("subprocess.Popen") + @mock.patch("sys.stdin") + def test_leftclick(self, mock_input, mock_output, mock_select): + assertMouseEvent(mock_input, mock_output, mock_select, self.engine, + self.module, bumblebee.input.LEFT_MOUSE, + "gnome-system-monitor" + ) - def test_critical(self): - self.assertTrue(hasattr(self.cpu, "critical")) + @mock.patch("psutil.cpu_percent") + def test_warning(self, mock_psutil): + self.config.set("cpu.critical", "20") + self.config.set("cpu.warning", "18") + mock_psutil.return_value = 19.0 + assertStateContains(self, self.module, "warning") + + @mock.patch("psutil.cpu_percent") + def test_critical(self, mock_psutil): + self.config.set("cpu.critical", "20") + self.config.set("cpu.warning", "19") + mock_psutil.return_value = 21.0 + assertStateContains(self, self.module, "critical") # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/modules/test_disk.py b/tests/modules/test_disk.py new file mode 100644 index 0000000..f8fee4f --- /dev/null +++ b/tests/modules/test_disk.py @@ -0,0 +1,56 @@ +# pylint: disable=C0103,C0111 + +import json +import unittest +import mock + +import bumblebee.input +from bumblebee.input import I3BarInput +from bumblebee.modules.disk import Module +from tests.util import MockEngine, MockConfig, assertPopen, assertStateContains + +class MockVFS(object): + def __init__(self, perc): + self.f_blocks = 1024*1024 + self.f_frsize = 1 + self.f_bavail = self.f_blocks - self.f_blocks*(perc/100.0) + +class TestDiskModule(unittest.TestCase): + def setUp(self): + self.engine = MockEngine() + self.engine.input = I3BarInput() + self.engine.input.need_event = True + self.config = MockConfig() + self.config.set("disk.path", "somepath") + self.module = Module(engine=self.engine, config={"config": self.config}) + + @mock.patch("select.select") + @mock.patch("subprocess.Popen") + @mock.patch("sys.stdin") + def test_leftclick(self, mock_input, mock_output, mock_select): + mock_input.readline.return_value = json.dumps({ + "name": self.module.id, + "button": bumblebee.input.LEFT_MOUSE, + "instance": None + }) + mock_select.return_value = (1,2,3) + self.engine.input.start() + self.engine.input.stop() + mock_input.readline.assert_any_call() + assertPopen(mock_output, "nautilus {}".format(self.module.parameter("path"))) + + @mock.patch("os.statvfs") + def test_warning(self, mock_stat): + self.config.set("disk.critical", "80") + self.config.set("disk.warning", "70") + mock_stat.return_value = MockVFS(75.0) + assertStateContains(self, self.module, "warning") + + @mock.patch("os.statvfs") + def test_critical(self, mock_stat): + self.config.set("disk.critical", "80") + self.config.set("disk.warning", "70") + mock_stat.return_value = MockVFS(85.0) + assertStateContains(self, self.module, "critical") + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/modules/test_load.py b/tests/modules/test_load.py new file mode 100644 index 0000000..d8120e2 --- /dev/null +++ b/tests/modules/test_load.py @@ -0,0 +1,47 @@ +# pylint: disable=C0103,C0111 + +import json +import unittest +import mock + +import bumblebee.input +from bumblebee.input import I3BarInput +from bumblebee.modules.load import Module +from tests.util import MockEngine, MockConfig, assertStateContains, assertMouseEvent + +class TestLoadModule(unittest.TestCase): + def setUp(self): + self.engine = MockEngine() + self.engine.input = I3BarInput() + self.engine.input.need_event = True + self.config = MockConfig() + self.module = Module(engine=self.engine, config={ "config": self.config }) + + @mock.patch("select.select") + @mock.patch("subprocess.Popen") + @mock.patch("sys.stdin") + def test_leftclick(self, mock_input, mock_output, mock_select): + assertMouseEvent(mock_input, mock_output, mock_select, self.engine, + self.module, bumblebee.input.LEFT_MOUSE, + "gnome-system-monitor" + ) + + @mock.patch("multiprocessing.cpu_count") + @mock.patch("os.getloadavg") + def test_warning(self, mock_loadavg, mock_cpucount): + self.config.set("load.critical", "1") + self.config.set("load.warning", "0.8") + mock_cpucount.return_value = 1 + mock_loadavg.return_value = [ 0.9, 0, 0 ] + assertStateContains(self, self.module, "warning") + + @mock.patch("multiprocessing.cpu_count") + @mock.patch("os.getloadavg") + def test_critical(self, mock_loadavg, mock_cpucount): + self.config.set("load.critical", "1") + self.config.set("load.warning", "0.8") + mock_cpucount.return_value = 1 + mock_loadavg.return_value = [ 1.1, 0, 0 ] + assertStateContains(self, self.module, "critical") + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/modules/test_memory.py b/tests/modules/test_memory.py new file mode 100644 index 0000000..4533357 --- /dev/null +++ b/tests/modules/test_memory.py @@ -0,0 +1,47 @@ +# pylint: disable=C0103,C0111 + +import json +import unittest +import mock + +import bumblebee.input +from bumblebee.input import I3BarInput +from bumblebee.modules.memory import Module +from tests.util import MockEngine, MockConfig, assertPopen, assertMouseEvent, assertStateContains + +class VirtualMemory(object): + def __init__(self, percent): + self.percent = percent + +class TestMemoryModule(unittest.TestCase): + def setUp(self): + self.engine = MockEngine() + self.engine.input = I3BarInput() + self.engine.input.need_event = True + self.config = MockConfig() + self.module = Module(engine=self.engine, config={ "config": self.config }) + + @mock.patch("select.select") + @mock.patch("subprocess.Popen") + @mock.patch("sys.stdin") + def test_leftclick(self, mock_input, mock_output, mock_select): + assertMouseEvent(mock_input, mock_output, mock_select, self.engine, + self.module, bumblebee.input.LEFT_MOUSE, + "gnome-system-monitor" + ) + + @mock.patch("psutil.virtual_memory") + def test_warning(self, mock_vmem): + self.config.set("memory.critical", "80") + self.config.set("memory.warning", "70") + mock_vmem.return_value = VirtualMemory(75) + assertStateContains(self, self.module, "warning") + + @mock.patch("psutil.virtual_memory") + def test_critical(self, mock_vmem): + self.config.set("memory.critical", "80") + self.config.set("memory.warning", "70") + mock_vmem.return_value = VirtualMemory(85) + assertStateContains(self, self.module, "critical") + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/modules/test_modules.py b/tests/modules/test_modules.py new file mode 100644 index 0000000..4c54a93 --- /dev/null +++ b/tests/modules/test_modules.py @@ -0,0 +1,57 @@ +# pylint: disable=C0103,C0111 + +import unittest +import importlib +import mock + +from bumblebee.engine import all_modules +from bumblebee.config import Config +from tests.util import assertWidgetAttributes, MockEngine + +class MockCommunicate(object): + def __init__(self): + self.returncode = 0 + + def communicate(self): + return (str.encode("1"), "error") + +class TestGenericModules(unittest.TestCase): + @mock.patch("subprocess.Popen") + def setUp(self, mock_output): + mock_output.return_value = MockCommunicate() + engine = MockEngine() + config = Config() + self.objects = {} + for mod in all_modules(): + cls = importlib.import_module("bumblebee.modules.{}".format(mod["name"])) + self.objects[mod["name"]] = getattr(cls, "Module")(engine, {"config": config}) + for widget in self.objects[mod["name"]].widgets(): + self.assertEquals(widget.get("variable", None), None) + + @mock.patch("subprocess.Popen") + def test_widgets(self, mock_output): + mock_output.return_value = MockCommunicate() + for mod in self.objects: + widgets = self.objects[mod].widgets() + for widget in widgets: + widget.link_module(self.objects[mod]) + self.assertEquals(widget.module, mod) + assertWidgetAttributes(self, widget) + widget.set("variable", "value") + self.assertEquals(widget.get("variable", None), "value") + self.assertTrue(isinstance(widget.full_text(), str)) + + @mock.patch("subprocess.Popen") + def test_update(self, mock_output): + mock_output.return_value = MockCommunicate() + rv = mock.Mock() + rv.configure_mock(**{ + "communicate.return_value": ("out", None) + }) + for mod in self.objects: + widgets = self.objects[mod].widgets() + self.objects[mod].update(widgets) + self.test_widgets() + self.assertEquals(widgets, self.objects[mod].widgets()) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/modules/test_pulseaudio.py b/tests/modules/test_pulseaudio.py new file mode 100644 index 0000000..6515589 --- /dev/null +++ b/tests/modules/test_pulseaudio.py @@ -0,0 +1,56 @@ +# pylint: disable=C0103,C0111 + +import json +import unittest +import mock + +import bumblebee.input +from bumblebee.input import I3BarInput +from bumblebee.modules.pulseaudio import Module +from tests.util import MockEngine, MockConfig, assertPopen, assertMouseEvent, assertStateContains + +class TestPulseAudioModule(unittest.TestCase): + def setUp(self): + self.engine = MockEngine() + self.engine.input = I3BarInput() + self.engine.input.need_event = True + self.config = MockConfig() + self.module = Module(engine=self.engine, config={ "config": self.config }) + + @mock.patch("select.select") + @mock.patch("subprocess.Popen") + @mock.patch("sys.stdin") + def test_leftclick(self, mock_input, mock_output, mock_select): + assertMouseEvent(mock_input, mock_output, mock_select, self.engine, + self.module, bumblebee.input.LEFT_MOUSE, + "pactl set-source-mute @DEFAULT_SOURCE@ toggle" + ) + + @mock.patch("select.select") + @mock.patch("subprocess.Popen") + @mock.patch("sys.stdin") + def test_rightclick(self, mock_input, mock_output, mock_select): + assertMouseEvent(mock_input, mock_output, mock_select, self.engine, + self.module, bumblebee.input.RIGHT_MOUSE, + "pavucontrol" + ) + + @mock.patch("select.select") + @mock.patch("subprocess.Popen") + @mock.patch("sys.stdin") + def test_wheelup(self, mock_input, mock_output, mock_select): + assertMouseEvent(mock_input, mock_output, mock_select, self.engine, + self.module, bumblebee.input.WHEEL_UP, + "pactl set-source-volume @DEFAULT_SOURCE@ +2%" + ) + + @mock.patch("select.select") + @mock.patch("subprocess.Popen") + @mock.patch("sys.stdin") + def test_wheeldown(self, mock_input, mock_output, mock_select): + assertMouseEvent(mock_input, mock_output, mock_select, self.engine, + self.module, bumblebee.input.WHEEL_DOWN, + "pactl set-source-volume @DEFAULT_SOURCE@ -2%" + ) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/test_config.py b/tests/test_config.py index 6891eab..90e9816 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,10 +1,28 @@ +# pylint: disable=C0103,C0111 import unittest -import bumblebee.config +from bumblebee.config import Config -class TestConfigCreation(unittest.TestCase): +class TestConfig(unittest.TestCase): def setUp(self): - pass + self.defaultConfig = Config() + self.someSimpleModules = ["foo", "bar", "baz"] + self.someAliasModules = ["foo:a", "bar:b", "baz:c"] + def test_no_modules_by_default(self): + self.assertEquals(self.defaultConfig.modules(), []) + + def test_simple_modules(self): + cfg = Config(["-m"] + self.someSimpleModules) + self.assertEquals(cfg.modules(), [{ + "name": x, "module": x + } for x in self.someSimpleModules]) + + def test_alias_modules(self): + cfg = Config(["-m"] + self.someAliasModules) + self.assertEquals(cfg.modules(), [{ + "module": x.split(":")[0], + "name": x.split(":")[1], + } for x in self.someAliasModules]) # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/test_engine.py b/tests/test_engine.py new file mode 100644 index 0000000..069a71d --- /dev/null +++ b/tests/test_engine.py @@ -0,0 +1,55 @@ +# pylint: disable=C0103,C0111,W0703,W0212 + +import unittest + +from bumblebee.error import ModuleLoadError +from bumblebee.engine import Engine +from bumblebee.config import Config + +from tests.util import MockOutput, MockInput + +class TestEngine(unittest.TestCase): + def setUp(self): + self.engine = Engine(config=Config(), output=MockOutput(), inp=MockInput()) + self.singleWidgetModule = [{"module": "test", "name": "a"}] + self.testModule = "test" + self.invalidModule = "no-such-module" + self.testModuleSpec = "bumblebee.modules.{}".format(self.testModule) + self.testModules = [ + {"module": "test", "name": "a"}, + {"module": "test", "name": "b"}, + ] + + def test_stop(self): + self.assertTrue(self.engine.running()) + self.engine.stop() + self.assertFalse(self.engine.running()) + + def test_load_module(self): + module = self.engine._load_module(self.testModule) + self.assertEquals(module.__module__, self.testModuleSpec) + + def test_load_invalid_module(self): + with self.assertRaises(ModuleLoadError): + self.engine._load_module(self.invalidModule) + + def test_load_none(self): + with self.assertRaises(ModuleLoadError): + self.engine._load_module(None) + + def test_load_modules(self): + modules = self.engine.load_modules(self.testModules) + self.assertEquals(len(modules), len(self.testModules)) + self.assertEquals( + [module.__module__ for module in modules], + [self.testModuleSpec for module in modules] + ) + + def test_run(self): + self.engine.load_modules(self.singleWidgetModule) + try: + self.engine.run() + except Exception as e: + self.fail(e) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/test_i3barinput.py b/tests/test_i3barinput.py new file mode 100644 index 0000000..e2758d6 --- /dev/null +++ b/tests/test_i3barinput.py @@ -0,0 +1,115 @@ +# pylint: disable=C0103,C0111 + +import unittest +import json +import subprocess +import mock + +import bumblebee.input +from bumblebee.input import I3BarInput +from tests.util import MockWidget, MockModule, assertPopen, assertMouseEvent + +class TestI3BarInput(unittest.TestCase): + def setUp(self): + self.input = I3BarInput() + self.input.need_event = True + self.anyModule = MockModule() + self.anyWidget = MockWidget("test") + self.anyModule.id = "test-module" + self._called = 0 + + def callback(self, event): + self._called += 1 + + @mock.patch("select.select") + @mock.patch("sys.stdin") + def test_basic_read_event(self, mock_input, mock_select): + mock_select.return_value = (1,2,3) + mock_input.readline.return_value = "" + self.input.start() + self.input.stop() + mock_input.readline.assert_any_call() + + @mock.patch("select.select") + @mock.patch("sys.stdin") + def test_ignore_invalid_data(self, mock_input, mock_select): + mock_select.return_value = (1,2,3) + mock_input.readline.return_value = "garbage" + self.input.start() + self.assertEquals(self.input.alive(), True) + self.assertEquals(self.input.stop(), True) + mock_input.readline.assert_any_call() + + @mock.patch("select.select") + @mock.patch("sys.stdin") + def test_ignore_invalid_event(self, mock_input, mock_select): + mock_select.return_value = (1,2,3) + mock_input.readline.return_value = json.dumps({ + "name": None, + "instance": None, + "button": None, + }) + self.input.start() + self.assertEquals(self.input.alive(), True) + self.assertEquals(self.input.stop(), True) + mock_input.readline.assert_any_call() + + @mock.patch("select.select") + @mock.patch("sys.stdin") + def test_global_callback(self, mock_input, mock_select): + self.input.register_callback(None, button=1, cmd=self.callback) + assertMouseEvent(mock_input, None, mock_select, self, None, + bumblebee.input.LEFT_MOUSE, None, "someinstance") + self.assertTrue(self._called > 0) + + @mock.patch("select.select") + @mock.patch("sys.stdin") + def test_remove_global_callback(self, mock_input, mock_select): + self.input.register_callback(None, button=1, cmd=self.callback) + self.input.deregister_callbacks(None) + assertMouseEvent(mock_input, None, mock_select, self, None, + bumblebee.input.LEFT_MOUSE, None, "someinstance") + self.assertTrue(self._called == 0) + + @mock.patch("select.select") + @mock.patch("sys.stdin") + def test_global_callback_button_missmatch(self, mock_input, mock_select): + self.input.register_callback(self.anyModule, button=1, cmd=self.callback) + assertMouseEvent(mock_input, None, mock_select, self, None, + bumblebee.input.RIGHT_MOUSE, None, "someinstance") + self.assertTrue(self._called == 0) + + @mock.patch("select.select") + @mock.patch("sys.stdin") + def test_module_callback(self, mock_input, mock_select): + self.input.register_callback(self.anyModule, button=1, cmd=self.callback) + assertMouseEvent(mock_input, None, mock_select, self, self.anyModule, + bumblebee.input.LEFT_MOUSE, None) + self.assertTrue(self._called > 0) + + @mock.patch("select.select") + @mock.patch("sys.stdin") + def test_remove_module_callback(self, mock_input, mock_select): + self.input.register_callback(self.anyModule, button=1, cmd=self.callback) + self.input.deregister_callbacks(self.anyModule) + assertMouseEvent(mock_input, None, mock_select, self, None, + bumblebee.input.LEFT_MOUSE, None, self.anyWidget.id) + self.assertTrue(self._called == 0) + + @mock.patch("select.select") + @mock.patch("sys.stdin") + def test_widget_callback(self, mock_input, mock_select): + self.input.register_callback(self.anyWidget, button=1, cmd=self.callback) + assertMouseEvent(mock_input, None, mock_select, self, None, + bumblebee.input.LEFT_MOUSE, None, self.anyWidget.id) + self.assertTrue(self._called > 0) + + @mock.patch("select.select") + @mock.patch("subprocess.Popen") + @mock.patch("sys.stdin") + def test_widget_cmd_callback(self, mock_input, mock_output, mock_select): + self.input.register_callback(self.anyWidget, button=1, cmd="echo") + assertMouseEvent(mock_input, mock_output, mock_select, self, None, + bumblebee.input.LEFT_MOUSE, "echo", self.anyWidget.id) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/test_i3baroutput.py b/tests/test_i3baroutput.py new file mode 100644 index 0000000..f4c4cca --- /dev/null +++ b/tests/test_i3baroutput.py @@ -0,0 +1,104 @@ +# pylint: disable=C0103,C0111 + +import json +import unittest +import mock +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + +from bumblebee.output import I3BarOutput +from tests.util import MockWidget, MockTheme, MockModule + +class TestI3BarOutput(unittest.TestCase): + def setUp(self): + self.theme = MockTheme() + self.output = I3BarOutput(self.theme) + self.expectedStart = json.dumps({"version": 1, "click_events": True}) + "[\n" + self.expectedStop = "]\n" + self.someWidget = MockWidget("foo bar baz") + self.anyModule = MockModule(None, None) + self.anyColor = "#ababab" + self.anotherColor = "#cccccc" + + @mock.patch("sys.stdout", new_callable=StringIO) + def test_start(self, stdout): + self.output.start() + self.assertEquals(self.expectedStart, stdout.getvalue()) + + @mock.patch("sys.stdout", new_callable=StringIO) + def test_stop(self, stdout): + self.output.stop() + self.assertEquals(self.expectedStop, stdout.getvalue()) + + @mock.patch("sys.stdout", new_callable=StringIO) + def test_draw_single_widget(self, stdout): + self.output.draw(self.someWidget, self.anyModule) + self.output.flush() + result = json.loads(stdout.getvalue())[0] + self.assertEquals(result["full_text"], self.someWidget.full_text()) + + @mock.patch("sys.stdout", new_callable=StringIO) + def test_draw_multiple_widgets(self, stdout): + for widget in [self.someWidget, self.someWidget]: + self.output.draw(widget, self.anyModule) + self.output.flush() + result = json.loads(stdout.getvalue()) + for res in result: + self.assertEquals(res["full_text"], self.someWidget.full_text()) + + @mock.patch("sys.stdout", new_callable=StringIO) + def test_begin(self, stdout): + self.output.begin() + self.assertEquals("", stdout.getvalue()) + + @mock.patch("sys.stdout", new_callable=StringIO) + def test_end(self, stdout): + self.output.end() + self.assertEquals(",\n", stdout.getvalue()) + + @mock.patch("sys.stdout", new_callable=StringIO) + def test_prefix(self, stdout): + self.theme.attr_prefix = " - " + self.output.draw(self.someWidget, self.anyModule) + self.output.flush() + result = json.loads(stdout.getvalue())[0] + self.assertEquals(result["full_text"], "{}{}".format( + self.theme.prefix(self.someWidget), self.someWidget.full_text()) + ) + + @mock.patch("sys.stdout", new_callable=StringIO) + def test_suffix(self, stdout): + self.theme.attr_suffix = " - " + self.output.draw(self.someWidget, self.anyModule) + self.output.flush() + result = json.loads(stdout.getvalue())[0] + self.assertEquals(result["full_text"], "{}{}".format( + self.someWidget.full_text(), self.theme.suffix(self.someWidget)) + ) + + @mock.patch("sys.stdout", new_callable=StringIO) + def test_bothfix(self, stdout): + self.theme.attr_suffix = " - " + self.theme.attr_prefix = " * " + self.output.draw(self.someWidget, self.anyModule) + self.output.flush() + result = json.loads(stdout.getvalue())[0] + self.assertEquals(result["full_text"], "{}{}{}".format( + self.theme.prefix(self.someWidget), + self.someWidget.full_text(), + self.theme.suffix(self.someWidget) + )) + + @mock.patch("sys.stdout", new_callable=StringIO) + def test_colors(self, stdout): + self.theme.attr_fg = self.anyColor + self.theme.attr_bg = self.anotherColor + self.output.draw(self.someWidget, self.anyModule) + self.output.flush() + result = json.loads(stdout.getvalue())[0] + self.assertEquals(result["color"], self.anyColor) + self.assertEquals(result["background"], self.anotherColor) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/test_module.py b/tests/test_module.py new file mode 100644 index 0000000..0cc95ad --- /dev/null +++ b/tests/test_module.py @@ -0,0 +1,50 @@ +# pylint: disable=C0103,C0111,W0703 + +import unittest + +from bumblebee.engine import Module +from bumblebee.config import Config +from tests.util import MockWidget + +class TestModule(unittest.TestCase): + def setUp(self): + self.widget = MockWidget("foo") + self.config = Config() + self.moduleWithoutWidgets = Module(engine=None, widgets=None) + self.moduleWithOneWidget = Module(engine=None, widgets=self.widget) + self.moduleWithMultipleWidgets = Module(engine=None, + widgets=[self.widget, self.widget, self.widget] + ) + + self.anyConfigName = "cfg" + self.anotherConfigName = "cfg2" + self.anyModule = Module(engine=None, widgets=self.widget, config={ + "name": self.anyConfigName, "config": self.config + }) + self.anotherModule = Module(engine=None, widgets=self.widget, config={ + "name": self.anotherConfigName, "config": self.config + }) + self.anyKey = "some-parameter" + self.anyValue = "value" + self.anotherValue = "another-value" + self.emptyKey = "i-do-not-exist" + self.config.set("{}.{}".format(self.anyConfigName, self.anyKey), self.anyValue) + self.config.set("{}.{}".format(self.anotherConfigName, self.anyKey), self.anotherValue) + + def test_empty_widgets(self): + self.assertEquals(self.moduleWithoutWidgets.widgets(), []) + + def test_single_widget(self): + self.assertEquals(self.moduleWithOneWidget.widgets(), [self.widget]) + + def test_multiple_widgets(self): + for widget in self.moduleWithMultipleWidgets.widgets(): + self.assertEquals(widget, self.widget) + + def test_parameters(self): + self.assertEquals(self.anyModule.parameter(self.anyKey), self.anyValue) + self.assertEquals(self.anotherModule.parameter(self.anyKey), self.anotherValue) + + def test_default_parameters(self): + self.assertEquals(self.anyModule.parameter(self.emptyKey), None) + self.assertEquals(self.anyModule.parameter(self.emptyKey, self.anyValue), self.anyValue) diff --git a/tests/test_store.py b/tests/test_store.py new file mode 100644 index 0000000..0b712f0 --- /dev/null +++ b/tests/test_store.py @@ -0,0 +1,24 @@ +# pylint: disable=C0103,C0111,W0703 + +import unittest + +from bumblebee.store import Store + +class TestStore(unittest.TestCase): + def setUp(self): + self.store = Store() + self.anyKey = "some-key" + self.anyValue = "some-value" + self.unsetKey = "invalid-key" + + def test_set_value(self): + self.store.set(self.anyKey, self.anyValue) + self.assertEquals(self.store.get(self.anyKey), self.anyValue) + + def test_get_invalid_value(self): + result = self.store.get(self.unsetKey) + self.assertEquals(result, None) + + def test_get_invalid_with_default_value(self): + result = self.store.get(self.unsetKey, self.anyValue) + self.assertEquals(result, self.anyValue) diff --git a/tests/test_theme.py b/tests/test_theme.py new file mode 100644 index 0000000..8d9dccc --- /dev/null +++ b/tests/test_theme.py @@ -0,0 +1,115 @@ +# pylint: disable=C0103,C0111,W0703 + +import unittest +from bumblebee.theme import Theme +from bumblebee.error import ThemeLoadError +from tests.util import MockWidget + +class TestTheme(unittest.TestCase): + def setUp(self): + self.nonexistentThemeName = "no-such-theme" + self.invalidThemeName = "invalid" + self.validThemeName = "test" + self.themedWidget = MockWidget("bla") + self.theme = Theme(self.validThemeName) + self.cycleTheme = Theme("test_cycle") + self.anyWidget = MockWidget("bla") + self.anotherWidget = MockWidget("blub") + + data = self.theme.data() + self.widgetTheme = "test-widget" + self.themedWidget.module = self.widgetTheme + self.defaultColor = data["defaults"]["fg"] + self.defaultBgColor = data["defaults"]["bg"] + self.widgetColor = data[self.widgetTheme]["fg"] + self.widgetBgColor = data[self.widgetTheme]["bg"] + self.defaultPrefix = data["defaults"]["prefix"] + self.defaultSuffix = data["defaults"]["suffix"] + self.widgetPrefix = data[self.widgetTheme]["prefix"] + self.widgetSuffix = data[self.widgetTheme]["suffix"] + + def test_load_valid_theme(self): + try: + Theme(self.validThemeName) + except Exception as e: + self.fail(e) + + def test_load_nonexistent_theme(self): + with self.assertRaises(ThemeLoadError): + Theme(self.nonexistentThemeName) + + def test_load_invalid_theme(self): + with self.assertRaises(ThemeLoadError): + Theme(self.invalidThemeName) + + def test_default_prefix(self): + self.assertEquals(self.theme.prefix(self.anyWidget), self.defaultPrefix) + + def test_default_suffix(self): + self.assertEquals(self.theme.suffix(self.anyWidget), self.defaultSuffix) + + def test_widget_prefix(self): + self.assertEquals(self.theme.prefix(self.themedWidget), self.widgetPrefix) + + def test_widget_fg(self): + self.assertEquals(self.theme.fg(self.anyWidget), self.defaultColor) + self.anyWidget.module = self.widgetTheme + self.assertEquals(self.theme.fg(self.anyWidget), self.widgetColor) + + def test_widget_bg(self): + self.assertEquals(self.theme.bg(self.anyWidget), self.defaultBgColor) + self.anyWidget.module = self.widgetTheme + self.assertEquals(self.theme.bg(self.anyWidget), self.widgetBgColor) + + def test_absent_cycle(self): + theme = self.theme + try: + theme.fg(self.anyWidget) + theme.fg(self.anotherWidget) + except Exception as e: + self.fail(e) + + def test_reset(self): + theme = self.cycleTheme + data = theme.data() + theme.reset() + self.assertEquals(theme.fg(self.anyWidget), data["cycle"][0]["fg"]) + self.assertEquals(theme.fg(self.anotherWidget), data["cycle"][1]["fg"]) + theme.reset() + self.assertEquals(theme.fg(self.anyWidget), data["cycle"][0]["fg"]) + + def test_separator_block_width(self): + theme = self.theme + data = theme.data() + + self.assertEquals(theme.separator_block_width(self.anyWidget), + data["defaults"]["separator-block-width"] + ) + + def test_separator(self): + for theme in [self.theme, self.cycleTheme]: + theme.reset() + prev_bg = theme.bg(self.anyWidget) + theme.bg(self.anotherWidget) + + self.assertEquals(theme.separator_fg(self.anotherWidget), theme.bg(self.anotherWidget)) + self.assertEquals(theme.separator_bg(self.anotherWidget), prev_bg) + + def test_state(self): + theme = self.theme + data = theme.data() + + self.assertEquals(theme.fg(self.anyWidget), data["defaults"]["fg"]) + self.assertEquals(theme.bg(self.anyWidget), data["defaults"]["bg"]) + + self.anyWidget.attr_state = ["critical"] + self.assertEquals(theme.fg(self.anyWidget), data["defaults"]["critical"]["fg"]) + self.assertEquals(theme.bg(self.anyWidget), data["defaults"]["critical"]["bg"]) + + self.themedWidget.attr_state = ["critical"] + self.assertEquals(theme.fg(self.themedWidget), data[self.widgetTheme]["critical"]["fg"]) + # if elements are missing in the state theme, they are taken from the + # widget theme instead (i.e. no fallback to a more general state theme) + self.assertEquals(theme.bg(self.themedWidget), data[self.widgetTheme]["bg"]) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests/util.py b/tests/util.py new file mode 100644 index 0000000..0b46d62 --- /dev/null +++ b/tests/util.py @@ -0,0 +1,138 @@ +# pylint: disable=C0103,C0111,W0613 + +import json +import shlex +import subprocess + +from bumblebee.output import Widget + +def assertWidgetAttributes(test, widget): + test.assertTrue(isinstance(widget, Widget)) + test.assertTrue(hasattr(widget, "full_text")) + +def assertPopen(output, cmd): + res = shlex.split(cmd) + output.assert_any_call(res, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT + ) + +def assertStateContains(test, module, state): + for widget in module.widgets(): + widget.link_module(module) + module.update(module.widgets()) + test.assertTrue(state in module.widgets()[0].state()) + +def assertMouseEvent(mock_input, mock_output, mock_select, engine, module, button, cmd, instance_id=None): + mock_input.readline.return_value = json.dumps({ + "name": module.id if module else "test", + "button": button, + "instance": instance_id + }) + mock_select.return_value = (1, 2, 3) + engine.input.start() + engine.input.stop() + mock_input.readline.assert_any_call() + if cmd: + assertPopen(mock_output, cmd) + +class MockInput(object): + def start(self): + pass + + def stop(self): + pass + + def register_callback(self, obj, button, cmd): + pass + +class MockEngine(object): + def __init__(self): + self.input = MockInput() + +class MockConfig(object): + def __init__(self): + self._data = {} + + def get(self, name, default): + if name in self._data: + return self._data[name] + return default + + def set(self, name, value): + self._data[name] = value + +class MockOutput(object): + def start(self): + pass + + def stop(self): + pass + + def draw(self, widget, engine, module): + engine.stop() + + def begin(self): + pass + + def flush(self): + pass + + def end(self): + pass + +class MockModule(object): + def __init__(self, engine=None, config=None): + self.id = None + +class MockWidget(Widget): + def __init__(self, text): + super(MockWidget, self).__init__(text) + self._text = text + self.module = None + self.attr_state = ["state-default"] + self.id = "none" + + def state(self): + return self.attr_state + + def update(self, widgets): + pass + + def full_text(self): + return self._text + +class MockTheme(object): + def __init__(self): + self.attr_prefix = None + self.attr_suffix = None + self.attr_fg = None + self.attr_bg = None + self.attr_separator = None + self.attr_separator_block_width = 0 + + def padding(self, widget): + return "" + + def reset(self): + pass + + def separator_block_width(self, widget): + return self.attr_separator_block_width + + def separator(self, widget): + return self.attr_separator + + def prefix(self, widget, default=None): + return self.attr_prefix + + def suffix(self, widget, default=None): + return self.attr_suffix + + def fg(self, widget): + return self.attr_fg + + def bg(self, widget): + return self.attr_bg + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/themes/gruvbox-powerline.json b/themes/gruvbox-powerline.json index f04bbc7..e8921c3 100644 --- a/themes/gruvbox-powerline.json +++ b/themes/gruvbox-powerline.json @@ -3,42 +3,41 @@ "defaults": { "prefix": " ", "suffix" : " ", - "cycle": [ - { - "fg": "#ebdbb2", - "bg": "#1d2021" - }, - { - "fg": "#fbf1c7", - "bg": "#282828" - } - ], - "fg-critical": "#fbf1c7", - "bg-critical": "#cc241d", - "fg-warning": "#1d2021", - "bg-warning": "#d79921", - + "warning": { + "fg": "#1d2021", + "bg": "#d79921" + }, + "critical": { + "fg": "#fbf1c7", + "bg": "#cc241d" + }, "default-separators": false, "separator-block-width": 0 }, + "cycle": [ + { + "fg": "#ebdbb2", + "bg": "#1d2021" + }, + { + "fg": "#fbf1c7", + "bg": "#282828" + } + ], "dnf": { - "states": { - "good": { - "fg": "#002b36", - "bg": "#859900" - } + "good": { + "fg": "#002b36", + "bg": "#859900" } }, "battery": { - "states": { - "charged": { - "fg": "#1d2021", - "bg": "#b8bb26" - }, - "AC": { - "fg": "#1d2021", - "bg": "#b8bb26" - } + "charged": { + "fg": "#1d2021", + "bg": "#b8bb26" + }, + "AC": { + "fg": "#1d2021", + "bg": "#b8bb26" } } } diff --git a/themes/icons/ascii.json b/themes/icons/ascii.json index bd051ce..613f133 100644 --- a/themes/icons/ascii.json +++ b/themes/icons/ascii.json @@ -1,119 +1,56 @@ { - "memory": { - "prefix": "ram" - }, - "cpu": { - "prefix": "cpu" - }, - "disk": { - "prefix": "hdd" - }, - "dnf": { - "prefix": "dnf" - }, - "brightness": { - "prefix": "o" + "defaults": { + "padding": " " }, + "memory": { "prefix": "ram" }, + "cpu": { "prefix": "cpu" }, + "disk": { "prefix": "hdd" }, + "dnf": { "prefix": "dnf" }, + "brightness": { "prefix": "o" }, "cmus": { - "states": { - "playing": { - "prefix": ">" - }, - "paused": { - "prefix": "||" - }, - "stopped": { - "prefix": "[]" - } - }, - "prev": { - "prefix": "|<" - }, - "next": { - "prefix": ">|" - }, - "shuffle": { - "states": { "on": { "prefix": "S" }, "off": { "prefix": "[s]" } } - }, - "repeat": { - "states": { "on": { "prefix": "R" }, "off": { "prefix": "[r]" } } - } + "playing": { "prefix": ">" }, + "paused": { "prefix": "||" }, + "stopped": { "prefix": "[]" }, + "prev": { "prefix": "|<" }, + "next": { "prefix": ">|" }, + "shuffle-on": { "prefix": "S" }, + "shuffle-off": { "prefix": "[s]" }, + "repeat-on": { "prefix": "R" }, + "repeat-off": { "prefix": "[r]" } }, "pasink": { - "states": { - "muted": { - "prefix": "audio(mute)" - }, - "unmuted": { - "prefix": "audio" - } - } + "muted": { "prefix": "audio(mute)" }, + "unmuted": { "prefix": "audio" } }, "pasource": { - "states": { - "muted": { - "prefix": "mic(mute)" - }, - "unmuted": { - "prefix": "mic" - } - } + "muted": { "prefix": "mic(mute)" }, + "unmuted": { "prefix": "mic" } }, "nic": { - "states": { - "wireless-up": { - "prefix": "wifi" - }, - "wireless-down": { - "prefix": "wifi" - }, - "wired-up": { - "prefix": "lan" - }, - "wired-down": { - "prefix": "lan" - }, - "tunnel-up": { - "prefix": "tun" - }, - "tunnel-down": { - "prefix": "tun" - } - } + "wireless-up": { "prefix": "wifi" }, + "wireless-down": { "prefix": "wifi" }, + "wired-up": { "prefix": "lan" }, + "wired-down": { "prefix": "lan" }, + "tunnel-up": { "prefix": "tun" }, + "tunnel-down": { "prefix": "tun" } }, "battery": { - "states": { - "charged": { - "suffix": "full" - }, - "charging": { - "suffix": "chr" - }, - "AC": { - "suffix": "ac" - }, - "discharging-10": { - "prefix": "!", - "suffix": "dis" - }, - "discharging-25": { - "suffix": "dis" - }, - "discharging-50": { - "suffix": "dis" - }, - "discharging-80": { - "suffix": "dis" - }, - "discharging-100": { - "suffix": "dis" - } - } + "charged": { "suffix": "full" }, + "charging": { "suffix": "chr" }, + "AC": { "suffix": "ac" }, + "discharging-10": { + "prefix": "!", + "suffix": "dis" + }, + "discharging-25": { "suffix": "dis" }, + "discharging-50": { "suffix": "dis" }, + "discharging-80": { "suffix": "dis" }, + "discharging-100": { "suffix": "dis" } }, "caffeine": { - "states": { "activated": {"prefix": "caf-on" }, "deactivated": { "prefix": "caf-off " } } + "activated": {"prefix": "caf-on" }, "deactivated": { "prefix": "caf-off " } }, "xrandr": { - "states": { "on": { "prefix": " off "}, "off": { "prefix": " on "} } + "on": { "prefix": " off "}, "off": { "prefix": " on "} } } diff --git a/themes/icons/awesome-fonts.json b/themes/icons/awesome-fonts.json index 62d9294..b3784eb 100644 --- a/themes/icons/awesome-fonts.json +++ b/themes/icons/awesome-fonts.json @@ -1,134 +1,60 @@ { "defaults": { - "separator": "" - }, - "date": { - "prefix": "" - }, - "time": { - "prefix": "" - }, - "memory": { - "prefix": "" - }, - "cpu": { - "prefix": "" - }, - "disk": { - "prefix": "" - }, - "dnf": { - "prefix": "" - }, - "brightness": { - "prefix": "" + "separator": "", "padding": " ", + "unknown": { "prefix": "" } }, + "date": { "prefix": "" }, + "time": { "prefix": "" }, + "memory": { "prefix": "" }, + "cpu": { "prefix": "" }, + "disk": { "prefix": "" }, + "dnf": { "prefix": "" }, + "brightness": { "prefix": "" }, + "load": { "prefix": "" }, "cmus": { - "states": { - "playing": { - "prefix": "" - }, - "paused": { - "prefix": "" - }, - "stopped": { - "prefix": "" - } - }, - "prev": { - "prefix": "" - }, - "next": { - "prefix": "" - }, - "shuffle": { - "states": { "on": { "prefix": "" }, "off": { "prefix": "" } } - }, - "repeat": { - "states": { "on": { "prefix": "" }, "off": { "prefix": "" } } - } + "playing": { "prefix": "" }, + "paused": { "prefix": "" }, + "stopped": { "prefix": "" }, + "prev": { "prefix": "" }, + "next": { "prefix": "" }, + "shuffle-on": { "prefix": "" }, + "shuffle-off": { "prefix": "" }, + "repeat-on": { "prefix": "" }, + "repeat-off": { "prefix": "" } }, "pasink": { - "states": { - "muted": { - "prefix": "" - }, - "unmuted": { - "prefix": "" - } - } + "muted": { "prefix": "" }, + "unmuted": { "prefix": "" } }, "pasource": { - "states": { - "muted": { - "prefix": "" - }, - "unmuted": { - "prefix": "" - } - } + "muted": { "prefix": "" }, + "unmuted": { "prefix": "" } }, "nic": { - "states": { - "wireless-up": { - "prefix": "" - }, - "wireless-down": { - "prefix": "" - }, - "wired-up": { - "prefix": "" - }, - "wired-down": { - "prefix": "" - }, - "tunnel-up": { - "prefix": "" - }, - "tunnel-down": { - "prefix": "" - } - } + "wireless-up": { "prefix": "" }, + "wireless-down": { "prefix": "" }, + "wired-up": { "prefix": "" }, + "wired-down": { "prefix": "" }, + "tunnel-up": { "prefix": "" }, + "tunnel-down": { "prefix": "" } }, "battery": { - "states": { - "charged": { - "prefix": "", - "suffix": "" - }, - "AC": { - "suffix": "" - }, - "charging": { - "prefix": [ "", "", "", "", "" ], - "suffix": "" - }, - "discharging-10": { - "prefix": "", - "suffix": "" - }, - "discharging-25": { - "prefix": "", - "suffix": "" - }, - "discharging-50": { - "prefix": "", - "suffix": "" - }, - "discharging-80": { - "prefix": "", - "suffix": "" - }, - "discharging-100": { - "prefix": "", - "suffix": "" - } - } + "charged": { "prefix": "", "suffix": "" }, + "AC": { "suffix": "" }, + "charging": { + "prefix": [ "", "", "", "", "" ], + "suffix": "" + }, + "discharging-10": { "prefix": "", "suffix": "" }, + "discharging-25": { "prefix": "", "suffix": "" }, + "discharging-50": { "prefix": "", "suffix": "" }, + "discharging-80": { "prefix": "", "suffix": "" }, + "discharging-100": { "prefix": "", "suffix": "" } }, "caffeine": { - "states": { "activated": {"prefix": " " }, "deactivated": { "prefix": " " } } + "activated": {"prefix": " " }, "deactivated": { "prefix": " " } }, "xrandr": { - "states": { "on": { "prefix": " "}, "off": { "prefix": " "} } + "on": { "prefix": " "}, "off": { "prefix": " "} } } diff --git a/themes/icons/test.json b/themes/icons/test.json new file mode 100644 index 0000000..ad17178 --- /dev/null +++ b/themes/icons/test.json @@ -0,0 +1,6 @@ +{ + "test-widget": { + "prefix": "widget-prefix", + "suffix": "widget-suffix" + } +} diff --git a/themes/invalid.json b/themes/invalid.json new file mode 100644 index 0000000..d510f27 --- /dev/null +++ b/themes/invalid.json @@ -0,0 +1 @@ +this is really not json diff --git a/themes/powerline.json b/themes/powerline.json index bfec776..afea669 100644 --- a/themes/powerline.json +++ b/themes/powerline.json @@ -1,41 +1,40 @@ { "icons": [ "awesome-fonts" ], "defaults": { - "cycle": [ - { - "fg": "#ffd700", - "bg": "#d75f00" - }, - { - "fg": "#ffffff", - "bg": "#0087af" - } - ], - "fg-critical": "#ffffff", - "bg-critical": "#ff0000", - "fg-warning": "#d75f00", - "bg-warning": "#ffd700", - + "critical": { + "fg": "#ffffff", + "bg": "#ff0000" + }, + "warning": { + "fg": "#d75f00", + "bg": "#ffd700" + }, "default_separators": false }, + "cycle": [ + { + "fg": "#ffd700", + "bg": "#d75f00" + }, + { + "fg": "#ffffff", + "bg": "#0087af" + } + ], "dnf": { - "states": { - "good": { - "fg": "#494949", - "bg": "#41db00" - } + "good": { + "fg": "#494949", + "bg": "#41db00" } }, "battery": { - "states": { - "charged": { - "fg": "#494949", - "bg": "#41db00" - }, - "AC": { - "fg": "#494949", - "bg": "#41db00" - } + "charged": { + "fg": "#494949", + "bg": "#41db00" + }, + "AC": { + "fg": "#494949", + "bg": "#41db00" } } } diff --git a/themes/solarized-powerline.json b/themes/solarized-powerline.json index 998b7f4..cb9049b 100644 --- a/themes/solarized-powerline.json +++ b/themes/solarized-powerline.json @@ -1,42 +1,34 @@ { "icons": [ "awesome-fonts" ], "defaults": { - "cycle": [ - { - "fg": "#93a1a1", - "bg": "#002b36" - }, - { - "fg": "#eee8d5", - "bg": "#586e75" - } - ], - "fg-critical": "#002b36", - "bg-critical": "#dc322f", - "fg-warning": "#002b36", - "bg-warning": "#b58900", - - "default-separators": false, - "separator-block-width": 0 + "separator-block-width": 0, + "warning": { + "fg": "#002b36", + "bg": "#b58900" + }, + "critical": { + "fg": "#002b36", + "bg": "#dc322f" + } }, + "cycle": [ + { "fg": "#93a1a1", "bg": "#002b36" }, + { "fg": "#eee8d5", "bg": "#586e75" } + ], "dnf": { - "states": { - "good": { - "fg": "#002b36", - "bg": "#859900" - } + "good": { + "fg": "#002b36", + "bg": "#859900" } }, "battery": { - "states": { - "charged": { - "fg": "#002b36", - "bg": "#859900" - }, - "AC": { - "fg": "#002b36", - "bg": "#859900" - } + "charged": { + "fg": "#002b36", + "bg": "#859900" + }, + "AC": { + "fg": "#002b36", + "bg": "#859900" } } } diff --git a/themes/solarized.json b/themes/solarized.json index 5cc8f86..2ea76c4 100644 --- a/themes/solarized.json +++ b/themes/solarized.json @@ -1,41 +1,41 @@ { "icons": [ "ascii" ], "defaults": { - "cycle": [ - { - "fg": "#93a1a1", - "bg": "#002b36" - }, - { - "fg": "#eee8d5", - "bg": "#586e75" - } - ], - "fg-critical": "#002b36", - "bg-critical": "#dc322f", - "fg-warning": "#002b36", - "bg-warning": "#b58900", - + "critical": { + "fg": "#002b36", + "bg": "#dc322f" + }, + "warning": { + "fg": "#002b36", + "bg": "#b58900" + }, "default_separators": false, "separator": "" }, + "cycle": [ + { + "fg": "#93a1a1", + "bg": "#002b36" + }, + { + "fg": "#eee8d5", + "bg": "#586e75" + } + ], "dnf": { - "states": { - "good": { - "fg": "#002b36", - "bg": "#859900" - } + "good": { + "fg": "#002b36", + "bg": "#859900" } }, "battery": { - "states": { - "charged": { - "fg": "#002b36", - "bg": "#859900" - }, - "AC": { - "fg": "#002b36", - "bg": "#859900" - } + "charged": { + "fg": "#002b36", + "bg": "#859900" + }, + "AC": { + "fg": "#002b36", + "bg": "#859900" + } } } diff --git a/themes/test.json b/themes/test.json new file mode 100644 index 0000000..401a234 --- /dev/null +++ b/themes/test.json @@ -0,0 +1,22 @@ +{ + "icons": [ "test" ], + "defaults": { + "prefix": "default-prefix", + "suffix": "default-suffix", + "fg": "#000000", + "bg": "#111111", + "separator": " * ", + "separator-block-width": 10, + "critical": { + "fg": "#ffffff", + "bg": "#010101" + } + }, + "test-widget": { + "fg": "#ababab", + "bg": "#222222", + "critical": { + "fg": "#bababa" + } + } +} diff --git a/themes/test_cycle.json b/themes/test_cycle.json new file mode 100644 index 0000000..5fd7e1a --- /dev/null +++ b/themes/test_cycle.json @@ -0,0 +1,18 @@ +{ + "icons": [ "test" ], + "defaults": { + "prefix": "default-prefix", + "suffix": "default-suffix", + "fg": "#000000", + "bg": "#111111" + }, + "cycle": [ + { "fg": "#aa0000" }, + { "fg": "#00aa00" }, + { "fg": "#0000aa" } + ], + "test-widget": { + "fg": "#ababab", + "bg": "#222222" + } +}