bumblebee-status/modules/contrib/shell.py

79 lines
2.8 KiB
Python
Raw Normal View History

2020-03-30 18:56:08 +00:00
# pylint: disable=C0111,R0903,W1401
""" Execute command in shell and print result
Few command examples:
2020-03-30 19:09:09 +00:00
'ping -c 1 1.1.1.1 | grep -Po '(?<=time=)\d+(\.\d+)? ms''
'echo 'BTC=$(curl -s rate.sx/1BTC | grep -Po \'^\d+\')USD''
2020-03-30 18:56:08 +00:00
'curl -s https://wttr.in/London?format=%l+%t+%h+%w'
'pip3 freeze | wc -l'
'any_custom_script.sh | grep arguments'
Parameters:
* shell.command: Command to execute
Use single quotes if evaluating anything inside (sh-style)
For example shell.command='echo $(date +"%H:%M:%S")'
But NOT shell.command="echo $(date +'%H:%M:%S')"
2020-03-30 18:56:08 +00:00
Second one will be evaluated only once at startup
* shell.interval: Update interval in seconds
(defaults to 1s == every bumblebee-status update)
* shell.async: Run update in async mode. Won't start the next thread if the
previous one hasn't finished yet. Useful for long
running scripts to avoid bumblebee-status freezes
(defaults to False)
"""
import os
import subprocess
import threading
2020-03-30 19:09:09 +00:00
import core.module
import core.widget
import core.input
import util.format
import util.cli
2020-03-30 18:56:08 +00:00
2020-03-30 19:09:09 +00:00
class Module(core.module.Module):
    """Widget that displays the output of a user-configured shell command.

    The command comes from the ``command`` parameter and is run through
    ``util.cli.execute`` with ``ignore_errors=True``. When the ``async``
    parameter is truthy, the command runs in a background thread and
    ``update`` returns immediately; otherwise it runs in the calling thread.
    """

    def __init__(self, config, theme):
        """Read the module parameters and register the mouse callbacks."""
        super().__init__(config, theme, core.widget.Widget(self.get_output))

        self.__command = self.parameter('command', 'echo "no command configured"')
        self.__async = util.format.asbool(self.parameter('async'))

        # Always initialise the output so get_output() and state() are safe
        # to call before the first update(), in sync mode as well as async.
        self.__output = 'please wait...' if self.__async else ''
        # A never-started Thread reports is_alive() == False, so the first
        # async update() is allowed to spawn a worker.
        self.__current_thread = threading.Thread()

        # LMB and RMB will update output regardless of timer
        # NOTE(review): update() takes no arguments; confirm that core.input
        # invokes callable commands without passing the click event.
        core.input.register(self, button=core.input.LEFT_MOUSE, cmd=self.update)
        core.input.register(self, button=core.input.RIGHT_MOUSE, cmd=self.update)

    def set_output(self, value):
        """Store ``value`` as the text returned by ``get_output``."""
        self.__output = value

    def get_output(self, _):
        """Return the most recently stored command output."""
        return self.__output

    def __execute(self):
        """Run the configured command and return its output, stripped."""
        return util.cli.execute(self.__command, ignore_errors=True).strip()

    def update(self):
        """Refresh the output, in this thread or in a background thread."""
        # if requested then run not async version and just execute command in this thread
        if not self.__async:
            self.__output = self.__execute()
            return

        # if previous thread didn't end yet then don't do anything
        if self.__current_thread.is_alive():
            return

        # spawn new thread to execute command; the worker hands its result
        # back through set_output(), stripped exactly like the sync path
        self.__current_thread = threading.Thread(
            target=lambda: self.set_output(self.__execute())
        )
        self.__current_thread.start()

    def state(self, _):
        """Return ``'warning'`` when the placeholder command output is shown."""
        if self.__output == 'no command configured':
            return 'warning'
        return None
2020-03-30 18:56:08 +00:00
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4