ef35c957b2
Allow passing in commands / events via a unix socket. This should allow for "emulating" input events. see #547
175 lines
5.2 KiB
Python
175 lines
5.2 KiB
Python
"""Input classes"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import uuid
|
|
import time
|
|
import socket
|
|
import select
|
|
import logging
|
|
import threading
|
|
import bumblebee.util
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
LEFT_MOUSE = 1
|
|
MIDDLE_MOUSE = 2
|
|
RIGHT_MOUSE = 3
|
|
WHEEL_UP = 4
|
|
WHEEL_DOWN = 5
|
|
|
|
def is_terminated():
|
|
for thread in threading.enumerate():
|
|
if thread.name == "MainThread" and not thread.is_alive():
|
|
return True
|
|
return False
|
|
|
|
class CommandSocket(object):
|
|
def __init__(self):
|
|
self._name = "/tmp/.bumblebee-status.{}".format(os.getpid())
|
|
self._socket = None
|
|
|
|
def __enter__(self):
|
|
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
self._socket.bind(self._name)
|
|
self._socket.listen(5)
|
|
return self._socket
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
self._socket.close()
|
|
os.unlink(self._name)
|
|
|
|
def read_input(inp):
|
|
"""Read i3bar input and execute callbacks"""
|
|
|
|
with CommandSocket() as cmdsocket:
|
|
poll = select.poll()
|
|
poll.register(sys.stdin, select.POLLIN)
|
|
poll.register(cmdsocket, select.POLLIN)
|
|
log.debug("starting click event processing")
|
|
while inp.running:
|
|
if is_terminated():
|
|
return
|
|
|
|
try:
|
|
events = poll.poll(1000)
|
|
except Exception:
|
|
continue
|
|
for fileno, event in events:
|
|
|
|
if fileno == cmdsocket.fileno():
|
|
tmp, _ = cmdsocket.accept()
|
|
line = tmp.recv(4096).decode()
|
|
tmp.close()
|
|
else:
|
|
line = "["
|
|
while line.startswith("["):
|
|
line = sys.stdin.readline().strip(",").strip()
|
|
log.debug("new event: {}".format(line))
|
|
inp.has_event = True
|
|
try:
|
|
event = json.loads(line)
|
|
if "instance" in event:
|
|
inp.callback(event)
|
|
inp.redraw()
|
|
else:
|
|
log.debug("field 'instance' missing in input, not processing the event")
|
|
except ValueError as e:
|
|
log.debug("failed to parse event: {}".format(e))
|
|
log.debug("exiting click event processing")
|
|
poll.unregister(sys.stdin.fileno())
|
|
inp.has_event = True
|
|
inp.clean_exit = True
|
|
|
|
class I3BarInput(object):
|
|
"""Process incoming events from the i3bar"""
|
|
def __init__(self):
|
|
self.running = True
|
|
self._callbacks = {}
|
|
self.clean_exit = False
|
|
self.global_id = str(uuid.uuid4())
|
|
self.need_event = False
|
|
self.has_event = False
|
|
self._condition = threading.Condition()
|
|
|
|
def start(self):
|
|
"""Start asynchronous input processing"""
|
|
self.has_event = False
|
|
self.running = True
|
|
self._condition.acquire()
|
|
self._thread = threading.Thread(target=read_input, args=(self,))
|
|
self._thread.start()
|
|
|
|
def redraw(self):
|
|
self._condition.acquire()
|
|
self._condition.notify()
|
|
self._condition.release()
|
|
|
|
def alive(self):
|
|
"""Check whether the input processing is still active"""
|
|
return self._thread.is_alive()
|
|
|
|
def wait(self, timeout):
|
|
self._condition.wait(timeout)
|
|
|
|
def _wait(self):
|
|
while not self.has_event:
|
|
time.sleep(0.1)
|
|
self.has_event = False
|
|
|
|
def stop(self):
|
|
"""Stop asynchronous input processing"""
|
|
self._condition.release()
|
|
if self.need_event:
|
|
self._wait()
|
|
self.running = False
|
|
self._thread.join()
|
|
return self.clean_exit
|
|
|
|
def _uuidstr(self, name, button):
|
|
return "{}::{}".format(name, button)
|
|
|
|
def _uid(self, obj, button):
|
|
uid = self.global_id
|
|
if obj:
|
|
uid = obj.id
|
|
return self._uuidstr(uid, button)
|
|
|
|
def deregister_callbacks(self, obj):
|
|
to_delete = []
|
|
uid = obj.id if obj else self.global_id
|
|
for key in self._callbacks:
|
|
if uid in key:
|
|
to_delete.append(key)
|
|
for key in to_delete:
|
|
del self._callbacks[key]
|
|
|
|
def register_callback(self, obj, button, cmd):
|
|
"""Register a callback function or system call"""
|
|
uid = self._uid(obj, button)
|
|
if uid not in self._callbacks:
|
|
self._callbacks[uid] = {}
|
|
self._callbacks[uid] = cmd
|
|
|
|
def callback(self, event):
|
|
"""Execute callback action for an incoming event"""
|
|
button = event["button"]
|
|
|
|
cmd = self._callbacks.get(self._uuidstr(self.global_id, button), None)
|
|
cmd = self._callbacks.get(self._uuidstr(event["name"], button), cmd)
|
|
cmd = self._callbacks.get(self._uuidstr(event["instance"], button), cmd)
|
|
|
|
if cmd is None:
|
|
return
|
|
try:
|
|
if callable(cmd):
|
|
cmd(event)
|
|
else:
|
|
bumblebee.util.execute(cmd, False)
|
|
except Exception:
|
|
# fall back to global default
|
|
if not "__fallback" in event:
|
|
return self.callback({"name": None, "instance": None, "__fallback": True, "button": event["button"]})
|
|
|
|
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|