[input] Add per-PID UNIX socket for additional commands

Allow passing in commands / events via a unix socket. This should allow
for "emulating" input events.

see #547
This commit is contained in:
Tobias Witek 2020-02-14 21:39:55 +01:00
parent 8ae8fbb989
commit ef35c957b2

View file

@ -1,9 +1,11 @@
"""Input classes"""
import os
import sys
import json
import uuid
import time
import socket
import select
import logging
import threading
@ -23,10 +25,28 @@ def is_terminated():
return True
return False
class CommandSocket(object):
def __init__(self):
self._name = "/tmp/.bumblebee-status.{}".format(os.getpid())
self._socket = None
def __enter__(self):
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self._socket.bind(self._name)
self._socket.listen(5)
return self._socket
def __exit__(self, type, value, traceback):
self._socket.close()
os.unlink(self._name)
def read_input(inp):
"""Read i3bar input and execute callbacks"""
with CommandSocket() as cmdsocket:
poll = select.poll()
poll.register(sys.stdin.fileno(), select.POLLIN)
poll.register(sys.stdin, select.POLLIN)
poll.register(cmdsocket, select.POLLIN)
log.debug("starting click event processing")
while inp.running:
if is_terminated():
@ -37,6 +57,12 @@ def read_input(inp):
except Exception:
continue
for fileno, event in events:
if fileno == cmdsocket.fileno():
tmp, _ = cmdsocket.accept()
line = tmp.recv(4096).decode()
tmp.close()
else:
line = "["
while line.startswith("["):
line = sys.stdin.readline().strip(",").strip()