[core] Add input processing
Create infrastructure for input event handling and add i3bar event processing. For each event, callbacks can be registered in the input module. Modules and widgets both identify themselves using a unique ID (the module name for modules, a generated UUID for the widgets). This ID is then used for registering the callbacks. This is possible since both widgets and modules are statically allocated & do not change their IDs. Callback actions can be either callable Python objects (in which case the event is passed as parameter), or strings, in which case the string is interpreted as a shell command. see #23
This commit is contained in:
parent
fa30b9505b
commit
e72c25b0bc
10 changed files with 274 additions and 19 deletions
|
@ -5,23 +5,34 @@ import bumblebee.theme
|
|||
import bumblebee.engine
|
||||
import bumblebee.config
|
||||
import bumblebee.output
|
||||
import bumblebee.input
|
||||
|
||||
def main():
|
||||
config = bumblebee.config.Config(sys.argv[1:])
|
||||
theme = bumblebee.theme.Theme(config.theme())
|
||||
output = bumblebee.output.I3BarOutput(theme=theme)
|
||||
inp = bumblebee.input.I3BarInput()
|
||||
engine = bumblebee.engine.Engine(
|
||||
config=config,
|
||||
output=output,
|
||||
inp=inp,
|
||||
)
|
||||
|
||||
engine.run()
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
main()
|
||||
engine.run()
|
||||
except KeyboardInterrupt as error:
|
||||
inp.stop()
|
||||
sys.exit(0)
|
||||
except bumblebee.error.BaseError as error:
|
||||
inp.stop()
|
||||
sys.stderr.write("fatal: {}\n".format(error))
|
||||
sys.exit(1)
|
||||
except Exception as error:
|
||||
inp.stop()
|
||||
sys.stderr.write("fatal: {}\n".format(error))
|
||||
sys.exit(2)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
||||
|
|
|
@ -30,6 +30,7 @@ class Module(object):
|
|||
self._config = config
|
||||
if "name" not in self._config:
|
||||
self._config["name"] = self.name
|
||||
self.id = self._config["name"]
|
||||
self._widgets = []
|
||||
if widgets:
|
||||
self._widgets = widgets if isinstance(widgets, list) else [widgets]
|
||||
|
@ -53,12 +54,14 @@ class Engine(object):
|
|||
This class connects input/output, instantiates all
|
||||
required modules and drives the "event loop"
|
||||
"""
|
||||
def __init__(self, config, output=None):
|
||||
def __init__(self, config, output=None, inp=None):
|
||||
self._output = output
|
||||
self._config = config
|
||||
self._running = True
|
||||
self._modules = []
|
||||
self.input = inp
|
||||
self.load_modules(config.modules())
|
||||
self.input.start()
|
||||
|
||||
def load_modules(self, modules):
|
||||
"""Load specified modules and return them as list"""
|
||||
|
@ -96,12 +99,13 @@ class Engine(object):
|
|||
module.update(module.widgets())
|
||||
for widget in module.widgets():
|
||||
widget.link_module(module)
|
||||
self._output.draw(widget=widget, engine=self)
|
||||
self._output.draw(widget=widget, module=module, engine=self)
|
||||
self._output.flush()
|
||||
self._output.end()
|
||||
if self.running():
|
||||
time.sleep(1)
|
||||
|
||||
self._output.stop()
|
||||
self.input.stop()
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
||||
|
|
77
bumblebee/input.py
Normal file
77
bumblebee/input.py
Normal file
|
@ -0,0 +1,77 @@
|
|||
"""Input classes"""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import uuid
|
||||
import time
|
||||
import threading
|
||||
import bumblebee.util
|
||||
|
||||
LEFT_MOUSE = 1
|
||||
RIGHT_MOUSE = 3
|
||||
|
||||
def read_input(inp):
|
||||
"""Read i3bar input and execute callbacks"""
|
||||
while inp.running:
|
||||
line = sys.stdin.readline().strip(",").strip()
|
||||
inp.has_event = True
|
||||
try:
|
||||
event = json.loads(line)
|
||||
inp.callback(event)
|
||||
except ValueError:
|
||||
pass
|
||||
inp.has_event = True
|
||||
inp.clean_exit = True
|
||||
|
||||
class I3BarInput(object):
|
||||
"""Process incoming events from the i3bar"""
|
||||
def __init__(self):
|
||||
self.running = True
|
||||
self._thread = threading.Thread(target=read_input, args=(self,))
|
||||
self._callbacks = {}
|
||||
self.clean_exit = False
|
||||
self.global_id = str(uuid.uuid4())
|
||||
self.need_event = False
|
||||
self.has_event = False
|
||||
|
||||
def start(self):
|
||||
"""Start asynchronous input processing"""
|
||||
self._thread.start()
|
||||
|
||||
def alive(self):
|
||||
"""Check whether the input processing is still active"""
|
||||
return self._thread.is_alive()
|
||||
|
||||
def stop(self):
|
||||
"""Stop asynchronous input processing"""
|
||||
if self.need_event:
|
||||
while not self.has_event:
|
||||
time.sleep(0.1)
|
||||
self.running = False
|
||||
self._thread.join()
|
||||
return self.clean_exit
|
||||
|
||||
def register_callback(self, obj, button, cmd):
|
||||
"""Register a callback function or system call"""
|
||||
uid = self.global_id
|
||||
if obj:
|
||||
uid = obj.id
|
||||
|
||||
if uid not in self._callbacks:
|
||||
self._callbacks[uid] = {}
|
||||
self._callbacks[uid][button] = cmd
|
||||
|
||||
def callback(self, event):
|
||||
"""Execute callback action for an incoming event"""
|
||||
cmd = self._callbacks.get(self.global_id, {})
|
||||
cmd = self._callbacks.get(event["name"], cmd)
|
||||
cmd = self._callbacks.get(event["instance"], cmd)
|
||||
cmd = cmd.get(event["button"], None)
|
||||
if cmd is None:
|
||||
return
|
||||
if callable(cmd):
|
||||
cmd(event)
|
||||
else:
|
||||
bumblebee.util.execute(cmd)
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -11,6 +11,8 @@ class Module(bumblebee.engine.Module):
|
|||
bumblebee.output.Widget(full_text=self.utilization)
|
||||
)
|
||||
self._utilization = psutil.cpu_percent(percpu=False)
|
||||
engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE,
|
||||
cmd="gnome-system-monitor")
|
||||
|
||||
def utilization(self):
|
||||
return "{:05.02f}%".format(self._utilization)
|
||||
|
|
|
@ -4,12 +4,14 @@
|
|||
|
||||
import sys
|
||||
import json
|
||||
import uuid
|
||||
|
||||
class Widget(object):
|
||||
"""Represents a single visible block in the status bar"""
|
||||
def __init__(self, full_text):
|
||||
self._full_text = full_text
|
||||
self.module = None
|
||||
self.id = str(uuid.uuid4())
|
||||
|
||||
def link_module(self, module):
|
||||
"""Set the module that spawned this widget
|
||||
|
@ -43,7 +45,7 @@ class I3BarOutput(object):
|
|||
"""Finish i3bar protocol"""
|
||||
sys.stdout.write("]\n")
|
||||
|
||||
def draw(self, widget, engine=None):
|
||||
def draw(self, widget, module=None, engine=None):
|
||||
"""Draw a single widget"""
|
||||
full_text = widget.full_text()
|
||||
padding = self._theme.padding(widget)
|
||||
|
@ -68,6 +70,8 @@ class I3BarOutput(object):
|
|||
"background": self._theme.bg(widget),
|
||||
"separator_block_width": self._theme.separator_block_width(widget),
|
||||
"separator": True if separator is None else False,
|
||||
"instance": widget.id,
|
||||
"name": module.id,
|
||||
})
|
||||
|
||||
def begin(self):
|
||||
|
|
21
bumblebee/util.py
Normal file
21
bumblebee/util.py
Normal file
|
@ -0,0 +1,21 @@
|
|||
import shlex
|
||||
import subprocess
|
||||
|
||||
try:
|
||||
from exceptions import RuntimeError
|
||||
except ImportError:
|
||||
# Python3 doesn't require this anymore
|
||||
pass
|
||||
|
||||
def execute(cmd, wait=True):
|
||||
args = shlex.split(cmd)
|
||||
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
|
||||
if wait:
|
||||
out, _ = proc.communicate()
|
||||
if proc.returncode != 0:
|
||||
raise RuntimeError("{} exited with {}".format(cmd, proc.returncode))
|
||||
return out.decode("utf-8")
|
||||
return None
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -6,11 +6,11 @@ from bumblebee.error import ModuleLoadError
|
|||
from bumblebee.engine import Engine
|
||||
from bumblebee.config import Config
|
||||
|
||||
from tests.util import MockOutput
|
||||
from tests.util import MockOutput, MockInput
|
||||
|
||||
class TestEngine(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.engine = Engine(config=Config(), output=MockOutput())
|
||||
self.engine = Engine(config=Config(), output=MockOutput(), inp=MockInput())
|
||||
self.singleWidgetModule = [{"module": "test", "name": "a"}]
|
||||
self.testModule = "test"
|
||||
self.invalidModule = "no-such-module"
|
||||
|
|
120
tests/test_i3barinput.py
Normal file
120
tests/test_i3barinput.py
Normal file
|
@ -0,0 +1,120 @@
|
|||
# pylint: disable=C0103,C0111
|
||||
|
||||
import unittest
|
||||
import json
|
||||
import subprocess
|
||||
import mock
|
||||
|
||||
import bumblebee.input
|
||||
from bumblebee.input import I3BarInput
|
||||
from tests.util import MockWidget, MockModule
|
||||
|
||||
class TestI3BarInput(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.inp = I3BarInput()
|
||||
self.inp.need_event = True
|
||||
self.anyModule = MockModule()
|
||||
self.anyWidget = MockWidget("test")
|
||||
self.anyModule.id = "test-module"
|
||||
self._called = 0
|
||||
|
||||
def callback(self, event):
|
||||
self._called += 1
|
||||
|
||||
@mock.patch("sys.stdin")
|
||||
def test_basic_read_event(self, mock_input):
|
||||
mock_input.readline.return_value = ""
|
||||
self.inp.start()
|
||||
self.inp.stop()
|
||||
mock_input.readline.assert_any_call()
|
||||
|
||||
@mock.patch("sys.stdin")
|
||||
def test_ignore_invalid_data(self, mock_input):
|
||||
mock_input.readline.return_value = "garbage"
|
||||
self.inp.start()
|
||||
self.assertEquals(self.inp.alive(), True)
|
||||
self.assertEquals(self.inp.stop(), True)
|
||||
mock_input.readline.assert_any_call()
|
||||
|
||||
@mock.patch("sys.stdin")
|
||||
def test_ignore_invalid_event(self, mock_input):
|
||||
mock_input.readline.return_value = json.dumps({
|
||||
"name": None,
|
||||
"instance": None,
|
||||
"button": None,
|
||||
})
|
||||
self.inp.start()
|
||||
self.assertEquals(self.inp.alive(), True)
|
||||
self.assertEquals(self.inp.stop(), True)
|
||||
mock_input.readline.assert_any_call()
|
||||
|
||||
@mock.patch("sys.stdin")
|
||||
def test_global_callback(self, mock_input):
|
||||
mock_input.readline.return_value = json.dumps({
|
||||
"name": "somename",
|
||||
"instance": "someinstance",
|
||||
"button": bumblebee.input.LEFT_MOUSE,
|
||||
})
|
||||
self.inp.register_callback(None, button=1, cmd=self.callback)
|
||||
self.inp.start()
|
||||
self.assertEquals(self.inp.stop(), True)
|
||||
mock_input.readline.assert_any_call()
|
||||
self.assertTrue(self._called > 0)
|
||||
|
||||
@mock.patch("sys.stdin")
|
||||
def test_global_callback_button_missmatch(self, mock_input):
|
||||
mock_input.readline.return_value = json.dumps({
|
||||
"name": "somename",
|
||||
"instance": "someinstance",
|
||||
"button": bumblebee.input.RIGHT_MOUSE,
|
||||
})
|
||||
self.inp.register_callback(None, button=1, cmd=self.callback)
|
||||
self.inp.start()
|
||||
self.assertEquals(self.inp.stop(), True)
|
||||
mock_input.readline.assert_any_call()
|
||||
self.assertTrue(self._called == 0)
|
||||
|
||||
@mock.patch("sys.stdin")
|
||||
def test_module_callback(self, mock_input):
|
||||
mock_input.readline.return_value = json.dumps({
|
||||
"name": self.anyModule.id,
|
||||
"instance": None,
|
||||
"button": bumblebee.input.LEFT_MOUSE,
|
||||
})
|
||||
self.inp.register_callback(self.anyModule, button=1, cmd=self.callback)
|
||||
self.inp.start()
|
||||
self.assertEquals(self.inp.stop(), True)
|
||||
mock_input.readline.assert_any_call()
|
||||
self.assertTrue(self._called > 0)
|
||||
|
||||
@mock.patch("sys.stdin")
|
||||
def test_widget_callback(self, mock_input):
|
||||
mock_input.readline.return_value = json.dumps({
|
||||
"name": "test",
|
||||
"instance": self.anyWidget.id,
|
||||
"button": bumblebee.input.LEFT_MOUSE,
|
||||
})
|
||||
self.inp.register_callback(self.anyWidget, button=1, cmd=self.callback)
|
||||
self.inp.start()
|
||||
self.assertEquals(self.inp.stop(), True)
|
||||
mock_input.readline.assert_any_call()
|
||||
self.assertTrue(self._called > 0)
|
||||
|
||||
@mock.patch("subprocess.Popen")
|
||||
@mock.patch("sys.stdin")
|
||||
def test_widget_cmd_callback(self, mock_input, mock_output):
|
||||
mock_input.readline.return_value = json.dumps({
|
||||
"name": "test",
|
||||
"instance": self.anyWidget.id,
|
||||
"button": bumblebee.input.LEFT_MOUSE,
|
||||
})
|
||||
self.inp.register_callback(self.anyWidget, button=1, cmd="echo")
|
||||
self.inp.start()
|
||||
self.assertEquals(self.inp.stop(), True)
|
||||
mock_input.readline.assert_any_call()
|
||||
mock_output.assert_called_with(["echo"],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT
|
||||
)
|
||||
|
||||
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
@ -9,8 +9,7 @@ except ImportError:
|
|||
from io import StringIO
|
||||
|
||||
from bumblebee.output import I3BarOutput
|
||||
from tests.util import MockWidget
|
||||
from tests.util import MockTheme
|
||||
from tests.util import MockWidget, MockTheme, MockModule
|
||||
|
||||
class TestI3BarOutput(unittest.TestCase):
|
||||
def setUp(self):
|
||||
|
@ -19,6 +18,7 @@ class TestI3BarOutput(unittest.TestCase):
|
|||
self.expectedStart = json.dumps({"version": 1, "click_events": True}) + "[\n"
|
||||
self.expectedStop = "]\n"
|
||||
self.someWidget = MockWidget("foo bar baz")
|
||||
self.anyModule = MockModule(None, None)
|
||||
self.anyColor = "#ababab"
|
||||
self.anotherColor = "#cccccc"
|
||||
|
||||
|
@ -34,7 +34,7 @@ class TestI3BarOutput(unittest.TestCase):
|
|||
|
||||
@mock.patch("sys.stdout", new_callable=StringIO)
|
||||
def test_draw_single_widget(self, stdout):
|
||||
self.output.draw(self.someWidget)
|
||||
self.output.draw(self.someWidget, self.anyModule)
|
||||
self.output.flush()
|
||||
result = json.loads(stdout.getvalue())[0]
|
||||
self.assertEquals(result["full_text"], self.someWidget.full_text())
|
||||
|
@ -42,7 +42,7 @@ class TestI3BarOutput(unittest.TestCase):
|
|||
@mock.patch("sys.stdout", new_callable=StringIO)
|
||||
def test_draw_multiple_widgets(self, stdout):
|
||||
for widget in [self.someWidget, self.someWidget]:
|
||||
self.output.draw(widget)
|
||||
self.output.draw(widget, self.anyModule)
|
||||
self.output.flush()
|
||||
result = json.loads(stdout.getvalue())
|
||||
for res in result:
|
||||
|
@ -61,7 +61,7 @@ class TestI3BarOutput(unittest.TestCase):
|
|||
@mock.patch("sys.stdout", new_callable=StringIO)
|
||||
def test_prefix(self, stdout):
|
||||
self.theme.attr_prefix = " - "
|
||||
self.output.draw(self.someWidget)
|
||||
self.output.draw(self.someWidget, self.anyModule)
|
||||
self.output.flush()
|
||||
result = json.loads(stdout.getvalue())[0]
|
||||
self.assertEquals(result["full_text"], "{}{}".format(
|
||||
|
@ -71,7 +71,7 @@ class TestI3BarOutput(unittest.TestCase):
|
|||
@mock.patch("sys.stdout", new_callable=StringIO)
|
||||
def test_suffix(self, stdout):
|
||||
self.theme.attr_suffix = " - "
|
||||
self.output.draw(self.someWidget)
|
||||
self.output.draw(self.someWidget, self.anyModule)
|
||||
self.output.flush()
|
||||
result = json.loads(stdout.getvalue())[0]
|
||||
self.assertEquals(result["full_text"], "{}{}".format(
|
||||
|
@ -82,7 +82,7 @@ class TestI3BarOutput(unittest.TestCase):
|
|||
def test_bothfix(self, stdout):
|
||||
self.theme.attr_suffix = " - "
|
||||
self.theme.attr_prefix = " * "
|
||||
self.output.draw(self.someWidget)
|
||||
self.output.draw(self.someWidget, self.anyModule)
|
||||
self.output.flush()
|
||||
result = json.loads(stdout.getvalue())[0]
|
||||
self.assertEquals(result["full_text"], "{}{}{}".format(
|
||||
|
@ -95,7 +95,7 @@ class TestI3BarOutput(unittest.TestCase):
|
|||
def test_colors(self, stdout):
|
||||
self.theme.attr_fg = self.anyColor
|
||||
self.theme.attr_bg = self.anotherColor
|
||||
self.output.draw(self.someWidget)
|
||||
self.output.draw(self.someWidget, self.anyModule)
|
||||
self.output.flush()
|
||||
result = json.loads(stdout.getvalue())[0]
|
||||
self.assertEquals(result["color"], self.anyColor)
|
||||
|
|
|
@ -6,8 +6,19 @@ def assertWidgetAttributes(test, widget):
|
|||
test.assertTrue(isinstance(widget, Widget))
|
||||
test.assertTrue(hasattr(widget, "full_text"))
|
||||
|
||||
class MockInput(object):
|
||||
def start(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
def register_callback(self, obj, button, cmd):
|
||||
pass
|
||||
|
||||
class MockEngine(object):
|
||||
pass
|
||||
def __init__(self):
|
||||
self.input = MockInput()
|
||||
|
||||
class MockOutput(object):
|
||||
def start(self):
|
||||
|
@ -16,7 +27,7 @@ class MockOutput(object):
|
|||
def stop(self):
|
||||
pass
|
||||
|
||||
def draw(self, widget, engine):
|
||||
def draw(self, widget, engine, module):
|
||||
engine.stop()
|
||||
|
||||
def begin(self):
|
||||
|
@ -28,12 +39,17 @@ class MockOutput(object):
|
|||
def end(self):
|
||||
pass
|
||||
|
||||
class MockModule(object):
|
||||
def __init__(self, engine=None, config=None):
|
||||
self.id = None
|
||||
|
||||
class MockWidget(Widget):
|
||||
def __init__(self, text):
|
||||
super(MockWidget, self).__init__(text)
|
||||
self._text = text
|
||||
self.module = None
|
||||
self.attr_state = "state-default"
|
||||
self.id = "none"
|
||||
|
||||
def state(self):
|
||||
return self.attr_state
|
||||
|
|
Loading…
Reference in a new issue