bumblebee-status/bumblebee/input.py
Tobi-wan Kenobi e72c25b0bc [core] Add input processing
Create infrastructure for input event handling and add i3bar event
processing. For each event, callbacks can be registered in the input
module.
Modules and widgets both identify themselves using a unique ID (the
module name for modules, a generated UUID for the widgets). This ID is
then used for registering the callbacks. This is possible since both
widgets and modules are statically allocated & do not change their IDs.

Callback actions can be either callable Python objects (in which case
the event is passed as parameter), or strings, in which case the string
is interpreted as a shell command.

see #23
2016-12-09 19:29:16 +01:00

77 lines
2.1 KiB
Python

"""Input classes"""
import sys
import json
import uuid
import time
import threading
import bumblebee.util
# Mouse button codes. Presumably meant to be passed as the `button`
# argument of I3BarInput.register_callback(), where they are matched
# against the "button" field of incoming events -- TODO confirm against
# callers and the i3bar click-event numbering.
LEFT_MOUSE = 1
RIGHT_MOUSE = 3
def read_input(inp):
    """Read i3bar input and execute callbacks.

    Reads stdin line by line until ``inp.running`` becomes false or stdin
    reaches end-of-file. Every line is stripped of surrounding whitespace
    and commas, parsed as JSON and handed to ``inp.callback``. Lines that
    are not valid JSON are skipped silently.

    Args:
        inp: Object providing a ``running`` flag and a ``callback(event)``
            method. ``inp.has_event`` is set to True after every line that
            was read; ``inp.has_event`` and ``inp.clean_exit`` are both set
            to True once the loop has ended.
    """
    while inp.running:
        raw = sys.stdin.readline()
        if not raw:
            # readline() only returns an empty string at end-of-file (a
            # blank line still carries its newline). Without this check the
            # loop would spin at full speed once stdin has been closed.
            break
        # Strip whitespace first so that a comma directly in front of the
        # newline is removed as well as a leading one.
        line = raw.strip().strip(",").strip()
        inp.has_event = True
        try:
            event = json.loads(line)
            inp.callback(event)
        except ValueError:
            # Not a JSON document (e.g. the opening "[" of the event
            # stream or an empty line): ignore and keep reading.
            pass
    # Signal the end of the loop to anybody polling these flags.
    inp.has_event = True
    inp.clean_exit = True
class I3BarInput(object):
    """Process incoming events from the i3bar.

    Events are read on a background thread running ``read_input`` and are
    dispatched to callbacks registered per ID and button.

    Attributes set in ``__init__``:
        running: loop flag checked by the reader thread; cleared by stop().
        clean_exit: set to True by the reader once its loop has ended.
        global_id: random UUID string used as the ID for callbacks that
            are registered without an object.
        need_event: when True, stop() waits until ``has_event`` is set.
        has_event: set to True by the reader after it has read a line.
    """

    def __init__(self):
        self.running = True
        self._thread = threading.Thread(target=read_input, args=(self,))
        # Maps ID -> {button: command}.
        self._callbacks = {}
        self.clean_exit = False
        self.global_id = str(uuid.uuid4())
        self.need_event = False
        self.has_event = False

    def start(self):
        """Start asynchronous input processing"""
        self._thread.start()

    def alive(self):
        """Check whether the input processing is still active"""
        return self._thread.is_alive()

    def stop(self):
        """Stop asynchronous input processing.

        If ``need_event`` is set, first polls (every 0.1 s) until the
        reader has flagged ``has_event``. Then clears ``running`` and joins
        the reader thread.

        Returns:
            The value of ``clean_exit`` after the thread has been joined.
        """
        if self.need_event:
            while not self.has_event:
                time.sleep(0.1)
        self.running = False
        self._thread.join()
        return self.clean_exit

    def register_callback(self, obj, button, cmd):
        """Register a callback function or system call.

        Args:
            obj: Object whose ``id`` attribute identifies the event source.
                A falsy value registers the callback under ``global_id``.
            button: Button value the callback is bound to.
            cmd: Either a callable (invoked with the event) or a command
                that is passed to ``bumblebee.util.execute``.
        """
        uid = self.global_id
        if obj:
            uid = obj.id
        self._callbacks.setdefault(uid, {})[button] = cmd

    def callback(self, event):
        """Execute callback action for an incoming event.

        Handlers are chosen in increasing order of precedence: those
        registered under ``global_id``, then those under the event's
        "name", then those under its "instance". Within the chosen set, the
        entry for the event's "button" is executed, if there is one.

        Events that are not dictionaries, or that lack some of these
        fields, are tolerated: missing fields are simply not used for the
        lookup instead of raising and terminating the reader thread.
        """
        if not isinstance(event, dict):
            # Valid JSON that is not an object (number, string, list, ...)
            # cannot describe a click; nothing to dispatch.
            return

        handlers = self._callbacks.get(self.global_id, {})
        for field in ("name", "instance"):
            if field in event:
                handlers = self._callbacks.get(event[field], handlers)

        cmd = handlers.get(event.get("button"))
        if cmd is None:
            return
        if callable(cmd):
            cmd(event)
        else:
            bumblebee.util.execute(cmd)
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4