[modules] re-add shell module

This commit is contained in:
tobi-wan-kenobi 2020-03-30 20:56:08 +02:00
parent 95410e4adf
commit 006a729be3

93
modules/contrib/shell.py Normal file
View file

@ -0,0 +1,93 @@
# pylint: disable=C0111,R0903,W1401
""" Execute command in shell and print result
Few command examples:
'ping 1.1.1.1 -c 1 | grep -Po "(?<=time=)\d+(\.\d+)? ms"'
'echo "BTC=$(curl -s rate.sx/1BTC | grep -Po \"^\d+\")USD"'
'curl -s https://wttr.in/London?format=%l+%t+%h+%w'
'pip3 freeze | wc -l'
'any_custom_script.sh | grep arguments'
Parameters:
* shell.command: Command to execute
Use single parentheses if evaluating anything inside (sh-style)
For example shell.command='echo $(date +"%H:%M:%S")'
But NOT shell.command="echo $(date +'%H:%M:%S')"
Second one will be evaluated only once at startup
* shell.interval: Update interval in seconds
(defaults to 1s == every bumblebee-status update)
* shell.async: Run update in async mode. Won't run next thread if
previous one didn't finished yet. Useful for long
running scripts to avoid bumblebee-status freezes
(defaults to False)
"""
import os
import subprocess
import threading
import bumblebee.engine
import bumblebee.input
import bumblebee.output
class Module(bumblebee.engine.Module):
def __init__(self, engine, config):
widget = bumblebee.output.Widget(full_text=self.get_output)
super(Module, self).__init__(engine, config, widget)
if self.parameter('interval'):
self.interval(self.parameter('interval'))
self._command = self.parameter('command')
self._async = bumblebee.util.asbool(self.parameter('async'))
if self._async:
self._output = 'Computing...'
self._current_thread = None
else:
self._output = ''
# LMB and RMB will update output regardless of timer
engine.input.register_callback(self, button=bumblebee.input.RIGHT_MOUSE, cmd=self.update)
engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE, cmd=self.update)
def get_output(self, _):
return self._output
def update(self, _):
# if requested then run not async version and just execute command in this thread
if not self._async:
self._output = self._get_command_output_or_error(self._command)
return
# if previous thread didn't end yet then don't do anything
if self._current_thread:
return
# spawn new thread to execute command and pass callback method to get output from it
self._current_thread = threading.Thread(target=self._run_command_in_thread,
args=(self._command, self._output_function))
self._current_thread.start()
@staticmethod
def _get_command_output_or_error(command):
try:
command_output = subprocess.check_output(command,
executable=os.environ.get('SHELL'),
shell=True,
stderr=subprocess.STDOUT)
return command_output.decode('utf-8').strip()
except subprocess.CalledProcessError as exception:
exception_output = exception.output.decode('utf-8').replace('\n', '')
return 'Status:{} output:{}'.format(exception.returncode, exception_output)
def _run_command_in_thread(self, command, output_callback):
output_callback(self._get_command_output_or_error(command))
def _output_function(self, text):
self._output = text
# clear this thread data, so next update will spawn a new one
self._current_thread = None
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4