#!/usr/bin/env python
"""Command-line entry point of bumblebee-status.

Loads the configured modules, hands them to the i3 output and then redraws
periodically. A background thread reads input events from stdin and from a
per-process UNIX command socket. If startup fails, the error is shown as a
single red block in the status bar.
"""

import os
import sys
import json
import time
import signal
import socket
import select
import logging
import threading

import bumblebee_status.discover

# Must run before the project imports below.
# NOTE(review): presumably extends the import path so that "core" and "util"
# can be resolved - confirm before reordering.
bumblebee_status.discover.discover()

import core.config
import core.output
import core.module
import core.input
import core.event

import util.format

# Set to True by main() once the "start" event has been triggered; the
# top-level error handler uses it to decide whether it still has to print the
# protocol header itself.
started = False


class CommandSocket(object):
    """Context manager around a listening UNIX stream socket.

    The socket is bound to "/tmp/.bumblebee-status.<pid>" on entry and is
    closed and unlinked again on exit.
    """

    def __init__(self):
        self.__name = "/tmp/.bumblebee-status.{}".format(os.getpid())
        self.__socket = None

    def __enter__(self):
        """Create, bind and start listening; return the raw socket object."""
        self.__socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self.__socket.bind(self.__name)
            self.__socket.listen(5)
        except Exception:
            # __exit__ is not invoked when __enter__ raises, so release the
            # file descriptor here instead of leaking it.
            self.__socket.close()
            raise
        return self.__socket

    def __exit__(self, type, value, traceback):
        """Close the socket and remove its filesystem entry."""
        self.__socket.close()
        os.unlink(self.__name)


def _update_and_draw(update_lock, *args, **kwargs):
    """Trigger "update" followed by "draw" unless an update is in progress.

    The lock is taken without blocking: if another caller currently holds it,
    this request is simply dropped. Extra arguments are forwarded to the
    "update" event. The lock is always released, even if a handler raises, so
    that a single failing update cannot block all future ones.
    """
    if not update_lock.acquire(blocking=False):
        return
    try:
        core.event.trigger("update", *args, **kwargs)
        core.event.trigger("draw")
    finally:
        update_lock.release()


def handle_input(output, config, update_lock):
    """Poll stdin and the command socket and dispatch incoming events.

    Intended to run in its own thread; it loops forever. Every received line
    is parsed as JSON and passed to core.input.trigger(). The "name" values of
    the handled events are collected and passed to a forced "update".

    :param output: not used by this function (kept for the caller's signature)
    :param config: configuration object; "engine.input_delay" is read from it
    :param update_lock: lock shared with the main loop to serialize updates
    """
    with CommandSocket() as cmdsocket:
        poll = select.poll()
        poll.register(sys.stdin.fileno(), select.POLLIN)
        poll.register(cmdsocket, select.POLLIN)

        while True:
            events = poll.poll()

            modules = {}
            for fileno, _mask in events:
                if fileno == cmdsocket.fileno():
                    conn, _ = cmdsocket.accept()
                    try:
                        line = conn.recv(4096).decode()
                    finally:
                        # close the connection even if recv()/decode() fails
                        conn.close()
                    logging.debug("socket event {}".format(line))
                else:
                    # skip lines starting with "[" and drop the commas and
                    # whitespace surrounding the actual event
                    line = "["
                    while line.startswith("["):
                        line = sys.stdin.readline().strip(",").strip()
                    logging.info("input event: {}".format(line))
                try:
                    event = json.loads(line)
                    core.input.trigger(event)
                    if "name" in event:
                        modules[event["name"]] = True
                except ValueError:
                    # best effort: lines that are not valid JSON are ignored
                    pass

            delay = float(config.get("engine.input_delay", 0.2))
            if delay > 0:
                time.sleep(delay)
            _update_and_draw(update_lock, modules.keys(), force=True)

        # not reached: the loop above only ends by way of an exception
        poll.unregister(sys.stdin.fileno())


def main():
    """Set up logging, theme, output and modules, then run the update loop."""
    # Without this declaration the assignment below would only create a local
    # variable and the module-level flag would stay False forever.
    global started

    config = core.config.Config(sys.argv[1:])

    level = logging.DEBUG if config.debug() else logging.ERROR
    log_format = "[%(asctime)s] %(module)-16s %(levelname)-8s %(message)s"
    if config.logfile():
        logging.basicConfig(
            level=level,
            format=log_format,
            filename=os.path.abspath(os.path.expanduser(config.logfile())),
        )
    else:
        logging.basicConfig(
            level=level,
            format=log_format,
            stream=sys.stderr,
        )

    # NOTE(review): core.theme is not imported explicitly in this file;
    # presumably one of the core.* imports above pulls it in - confirm.
    theme = core.theme.Theme(config.theme(), config.iconset())
    output = core.output.i3(theme, config)
    modules = []

    core.input.register(None, core.input.WHEEL_UP, "i3-msg workspace prev_on_output")
    core.input.register(None, core.input.WHEEL_DOWN, "i3-msg workspace next_on_output")

    update_lock = threading.Lock()
    input_thread = threading.Thread(
        target=handle_input, args=(output, config, update_lock)
    )
    input_thread.daemon = True
    input_thread.start()

    def sig_USR1_handler(signum, stack):
        """Force an immediate update and redraw when SIGUSR1 is received."""
        _update_and_draw(update_lock, force=True)

    if config.debug():
        modules.append(core.module.load("debug", config, theme))

    for module in config.modules():
        modules.append(core.module.load(module, config, theme))
        modules[-1].register_callbacks()

    if config.reverse():
        modules.reverse()

    output.modules(modules)

    if util.format.asbool(config.get("engine.collapsible", True)) == True:
        core.input.register(None, core.input.MIDDLE_MOUSE, output.toggle_minimize)

    core.event.trigger("start")
    started = True

    # use the symbolic constant: the numeric value of SIGUSR1 is
    # platform-dependent (10 is only correct on some systems)
    signal.signal(signal.SIGUSR1, sig_USR1_handler)
    while True:
        _update_and_draw(update_lock)
        output.wait(config.interval())
    # not reached: the loop above only ends by way of an exception
    core.event.trigger("stop")


if __name__ == "__main__":
    try:
        if sys.version_info.major < 3:
            raise Exception("at least Python 3.4 required (Python 2.x not supported)")
        main()
    except Exception as e:
        # really basic errors -> make sure these are shown in the status bar by minimal config
        if not started:
            # main() never got as far as the "start" event, so print the
            # header lines here before the error block
            print("{\"version\":1}")
            print("[")
        # keep re-sending the error block every five seconds
        while True:
            sys.stdout.write(
                json.dumps(
                    [
                        {
                            "full_text": " {} ".format(e),
                            "background": "#ff0000",
                            "color": "#ffffff",
                            "name": "error",
                            "instance": "the-only-one",
                        }
                    ]
                )
            )
            sys.stdout.write(",\n")
            sys.stdout.flush()
            time.sleep(5)

# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4