"""Input classes""" import os import sys import json import uuid import time import socket import select import logging import threading import bumblebee.util log = logging.getLogger(__name__) LEFT_MOUSE = 1 MIDDLE_MOUSE = 2 RIGHT_MOUSE = 3 WHEEL_UP = 4 WHEEL_DOWN = 5 def is_terminated(): for thread in threading.enumerate(): if thread.name == "MainThread" and not thread.is_alive(): return True return False class CommandSocket(object): def __init__(self): self._name = "/tmp/.bumblebee-status.{}".format(os.getpid()) self._socket = None def __enter__(self): self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self._socket.bind(self._name) self._socket.listen(5) return self._socket def __exit__(self, type, value, traceback): self._socket.close() os.unlink(self._name) def read_input(inp): """Read i3bar input and execute callbacks""" with CommandSocket() as cmdsocket: poll = select.poll() poll.register(sys.stdin, select.POLLIN) poll.register(cmdsocket, select.POLLIN) log.debug("starting click event processing") while inp.running: if is_terminated(): return try: events = poll.poll(1000) except Exception: continue for fileno, event in events: if fileno == cmdsocket.fileno(): tmp, _ = cmdsocket.accept() line = tmp.recv(4096).decode() tmp.close() else: line = "[" while line.startswith("["): line = sys.stdin.readline().strip(",").strip() log.debug("new event: {}".format(line)) inp.has_event = True try: event = json.loads(line) if "instance" in event: inp.callback(event) inp.redraw() else: log.debug("field 'instance' missing in input, not processing the event") except ValueError as e: log.debug("failed to parse event: {}".format(e)) log.debug("exiting click event processing") poll.unregister(sys.stdin.fileno()) inp.has_event = True inp.clean_exit = True class I3BarInput(object): """Process incoming events from the i3bar""" def __init__(self): self.running = True self._callbacks = {} self.clean_exit = False self.global_id = str(uuid.uuid4()) self.need_event = False self.has_event = False self._condition = threading.Condition() def start(self): """Start asynchronous input processing""" self.has_event = False self.running = True self._condition.acquire() self._thread = threading.Thread(target=read_input, args=(self,)) self._thread.start() def redraw(self): self._condition.acquire() self._condition.notify() self._condition.release() def alive(self): """Check whether the input processing is still active""" return self._thread.is_alive() def wait(self, timeout): self._condition.wait(timeout) def _wait(self): while not self.has_event: time.sleep(0.1) self.has_event = False def stop(self): """Stop asynchronous input processing""" self._condition.release() if self.need_event: self._wait() self.running = False self._thread.join() return self.clean_exit def _uuidstr(self, name, button): return "{}::{}".format(name, button) def _uid(self, obj, button): uid = self.global_id if obj: uid = obj.id return self._uuidstr(uid, button) def deregister_callbacks(self, obj): to_delete = [] uid = obj.id if obj else self.global_id for key in self._callbacks: if uid in key: to_delete.append(key) for key in to_delete: del self._callbacks[key] def register_callback(self, obj, button, cmd): """Register a callback function or system call""" uid = self._uid(obj, button) if uid not in self._callbacks: self._callbacks[uid] = {} self._callbacks[uid] = cmd def callback(self, event): """Execute callback action for an incoming event""" button = event["button"] cmd = self._callbacks.get(self._uuidstr(self.global_id, button), None) cmd = self._callbacks.get(self._uuidstr(event["name"], button), cmd) cmd = self._callbacks.get(self._uuidstr(event["instance"], button), cmd) if cmd is None: return try: if callable(cmd): cmd(event) else: bumblebee.util.execute(cmd, False) except Exception: # fall back to global default if not "__fallback" in event: return self.callback({"name": None, "instance": None, "__fallback": True, "button": event["button"]}) # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4