Merge remote-tracking branch 'upstream/master'

This commit is contained in:
mw 2019-08-24 11:40:00 +02:00
commit 928895befe
33 changed files with 1859 additions and 680 deletions

View file

@ -84,7 +84,7 @@ class Config(bumblebee.store.Store):
parameters = [item for sub in self._args.parameters for item in sub]
for param in parameters:
key, value = param.split("=")
key, value = param.split("=", 1)
self.set(key, value)
def modules(self):

View file

@ -41,6 +41,7 @@ class Module(object):
self.error = None
self._next = int(time.time())
self._default_interval = 0
self._engine = engine
self._configFile = None
for cfg in [os.path.expanduser("~/.bumblebee-status.conf"), os.path.expanduser("~/.config/bumblebee-status.conf")]:
@ -56,6 +57,9 @@ class Module(object):
if widgets:
self._widgets = widgets if isinstance(widgets, list) else [widgets]
def theme(self):
return self._engine.theme()
def widgets(self, widgets=None):
"""Return the widgets to draw for this module"""
if widgets:
@ -160,6 +164,9 @@ class Engine(object):
self.input.start()
def theme(self):
return self._theme
def _toggle_minimize(self, event):
for module in self._modules:
widget = module.widget_by_id(event["instance"])

View file

@ -25,15 +25,15 @@ def is_terminated():
def read_input(inp):
"""Read i3bar input and execute callbacks"""
epoll = select.epoll()
epoll.register(sys.stdin.fileno(), select.EPOLLIN)
poll = select.poll()
poll.register(sys.stdin.fileno(), select.POLLIN)
log.debug("starting click event processing")
while inp.running:
if is_terminated():
return
try:
events = epoll.poll(1)
events = poll.poll(1000)
except Exception:
continue
for fileno, event in events:
@ -52,8 +52,7 @@ def read_input(inp):
except ValueError as e:
log.debug("failed to parse event: {}".format(e))
log.debug("exiting click event processing")
epoll.unregister(sys.stdin.fileno())
epoll.close()
poll.unregister(sys.stdin.fileno())
inp.has_event = True
inp.clean_exit = True

View file

@ -29,11 +29,15 @@ class Module(bumblebee.engine.Module):
return len(packages)
return 0
@property
def _format(self):
return self.parameter("format", "Update Arch: {}")
def utilization(self, widget):
return 'Update Arch: {}'.format(self.packages)
return self._format.format(self.packages)
def hidden(self):
return self.check_updates() == 0
return self.check_updates() == 0
def update(self, widgets):
self.packages = self.check_updates()

View file

@ -7,6 +7,7 @@ Parameters:
* battery.warning : Warning threshold in % of remaining charge (defaults to 20)
* battery.critical : Critical threshold in % of remaining charge (defaults to 10)
* battery.showdevice : If set to "true", add the device name to the widget (defaults to False)
* battery.decorate : If set to "false", hides additional icons (charging, etc.) (defaults to True)
"""
import os
@ -47,6 +48,8 @@ class Module(bumblebee.engine.Module):
self.capacity(widget)
while len(widgets) > 0: del widgets[0]
for widget in new_widgets:
if bumblebee.util.asbool(self.parameter("decorate", True)) == False:
widget.set("theme.exclude", "suffix")
widgets.append(widget)
self._widgets = widgets

View file

@ -17,8 +17,11 @@ Parameters:
from __future__ import absolute_import
import datetime
import locale
import pytz
import tzlocal
try:
import pytz
import tzlocal
except:
pass
import bumblebee.input
import bumblebee.output
import bumblebee.engine
@ -40,7 +43,10 @@ class Module(bumblebee.engine.Module):
engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE, cmd=self.next_tz)
engine.input.register_callback(self, button=bumblebee.input.RIGHT_MOUSE, cmd=self.prev_tz)
self._fmt = self.parameter("format", default_format(self.name))
self._timezones = self.parameter("timezone", tzlocal.get_localzone().zone).split(",")
try:
self._timezones = self.parameter("timezone", tzlocal.get_localzone().zone).split(",")
except:
self._timezones = ""
self._current_tz = 0
l = locale.getdefaultlocale()
@ -54,10 +60,13 @@ class Module(bumblebee.engine.Module):
def get_time(self, widget):
try:
tz = pytz.timezone(self._timezones[self._current_tz].strip())
retval = datetime.datetime.now(tz=tzlocal.get_localzone()).astimezone(tz).strftime(self._fmt)
except pytz.exceptions.UnknownTimeZoneError:
retval = "[Unknown timezone: {}]".format(self._timezones[self._current_tz].strip())
try:
tz = pytz.timezone(self._timezones[self._current_tz].strip())
retval = datetime.datetime.now(tz=tzlocal.get_localzone()).astimezone(tz).strftime(self._fmt)
except pytz.exceptions.UnknownTimeZoneError:
retval = "[Unknown timezone: {}]".format(self._timezones[self._current_tz].strip())
except:
retval = "[n/a]"
enc = locale.getpreferredencoding()
if hasattr(retval, "decode"):

View file

@ -0,0 +1,68 @@
# pylint: disable=C0111,R0903
"""Display HTTP status code
Parameters:
* http_status.label: Prefix label (optional)
* http_status.target: Target to retrieve the HTTP status from
* http_status.expect: Expected HTTP status
"""
from requests import head
import psutil
import bumblebee.input
import bumblebee.output
import bumblebee.engine
class Module(bumblebee.engine.Module):
UNK = "UNK"
def __init__(self, engine, config):
widget = bumblebee.output.Widget(full_text=self.output)
super(Module, self).__init__(engine, config, widget)
self._label = self.parameter("label")
self._target = self.parameter("target")
self._expect = self.parameter("expect", "200")
self._status = self.getStatus()
self._output = self.getOutput()
def labelize(self, s):
if self._label is None:
return s
return "{}: {}".format(self._label, s)
def getStatus(self):
try:
res = head(self._target)
except Exception:
return self.UNK
else:
status = str(res.status_code)
self._status = status
return status
def getOutput(self):
if self._status == self._expect:
return self.labelize(self._status)
else:
reason = " != {}".format(self._expect)
return self.labelize("{}{}".format(self._status, reason))
def output(self, widget):
return self._output
def update(self, widgets):
self.getStatus()
self._output = self.getOutput()
def state(self, widget):
if self._status == self.UNK:
return "warning"
if self._status != self._expect:
return "critical"
return self._output
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,114 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Displays network traffic
* No extra configuration needed
"""
import psutil
import netifaces
import bumblebee.input
import bumblebee.output
import bumblebee.engine
import bumblebee.util
WIDGET_NAME = 'network_traffic'
class Module(bumblebee.engine.Module):
"""Bumblebee main module """
def __init__(self, engine, config):
super(Module, self).__init__(engine, config)
try:
self._bandwidth = BandwidthInfo()
self._bytes_recv = self._bandwidth.bytes_recv()
self._bytes_sent = self._bandwidth.bytes_sent()
except Exception:
""" We do not want do explode anything """
pass
@classmethod
def state(cls, widget):
"""Return the widget state"""
if widget.name == '{}.rx'.format(WIDGET_NAME):
return 'rx'
elif widget.name == '{}.tx'.format(WIDGET_NAME):
return 'tx'
return None
def update(self, widgets):
try:
bytes_recv = self._bandwidth.bytes_recv()
bytes_sent = self._bandwidth.bytes_sent()
download_rate = (bytes_recv - self._bytes_recv)
upload_rate = (bytes_sent - self._bytes_sent)
self.update_widgets(widgets, download_rate, upload_rate)
self._bytes_recv, self._bytes_sent = bytes_recv, bytes_sent
except Exception:
""" We do not want do explode anything """
pass
@classmethod
def update_widgets(cls, widgets, download_rate, upload_rate):
"""Update tx/rx widgets with new rates"""
del widgets[:]
widgets.extend((
TrafficWidget(text=download_rate, direction='rx'),
TrafficWidget(text=upload_rate, direction='tx')
))
class BandwidthInfo(object):
"""Get received/sent bytes from network adapter"""
def bytes_recv(self):
"""Return received bytes"""
return self.bandwidth().bytes_recv
def bytes_sent(self):
"""Return sent bytes"""
return self.bandwidth().bytes_sent
def bandwidth(self):
"""Return bandwidth information"""
io_counters = self.io_counters()
return io_counters[self.default_network_adapter()]
@classmethod
def default_network_adapter(cls):
"""Return default active network adapter"""
gateway = netifaces.gateways()['default']
if not gateway:
raise 'No default gateway found'
return gateway[netifaces.AF_INET][1]
@classmethod
def io_counters(cls):
"""Return IO counters"""
return psutil.net_io_counters(pernic=True)
class TrafficWidget(object):
"""Create a traffic widget with humanized bytes string with proper icon (up/down)"""
def __new__(cls, text, direction):
widget = bumblebee.output.Widget(name='{0}.{1}'.format(WIDGET_NAME, direction))
widget.set('theme.minwidth', '0000000KiB/s')
widget.full_text(cls.humanize(text))
return widget
@staticmethod
def humanize(text):
"""Return humanized bytes"""
humanized_byte_format = bumblebee.util.bytefmt(text)
return '{0}/s'.format(humanized_byte_format)

View file

@ -1,68 +1,68 @@
# pylint: disable=C0111,R0903
"""Displays the pi-hole status (up/down) together with the number of ads that were blocked today
Parameters:
* pihole.address : pi-hole address (e.q: http://192.168.1.3)
* pihole.pwhash : pi-hole webinterface password hash (can be obtained from the /etc/pihole/SetupVars.conf file)
"""
import bumblebee.input
import bumblebee.output
import bumblebee.engine
import requests
class Module(bumblebee.engine.Module):
def __init__(self, engine, config):
super(Module, self).__init__(engine, config,
bumblebee.output.Widget(full_text=self.pihole_status)
)
buttons = {"LEFT_CLICK":bumblebee.input.LEFT_MOUSE}
self._pihole_address = self.parameter("address", "")
self._pihole_pw_hash = self.parameter("pwhash", "")
self._pihole_status = None
self._ads_blocked_today = "-"
self.update_pihole_status()
engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE,
cmd=self.toggle_pihole_status)
def pihole_status(self, widget):
if self._pihole_status is None:
return "pi-hole unknown"
return "pi-hole " + ("up/" + self._ads_blocked_today if self._pihole_status else "down")
def update_pihole_status(self):
try:
data = requests.get(self._pihole_address + "/admin/api.php?summary").json()
self._pihole_status = True if data["status"] == "enabled" else False
self._ads_blocked_today = data["ads_blocked_today"]
except:
self._pihole_status = None
def toggle_pihole_status(self, widget):
if self._pihole_status is not None:
try:
req = None
if self._pihole_status:
req = requests.get(self._pihole_address + "/admin/api.php?disable&auth=" + self._pihole_pw_hash)
else:
req = requests.get(self._pihole_address + "/admin/api.php?enable&auth=" + self._pihole_pw_hash)
if req is not None:
if req.status_code == 200:
status = req.json()["status"]
self._pihole_status = False if status == "disabled" else True
except:
pass
def update(self, widgets):
self.update_pihole_status()
def state(self, widget):
if self._pihole_status is None:
return []
elif self._pihole_status:
return ["enabled"]
return ["disabled", "warning"]
# pylint: disable=C0111,R0903
"""Displays the pi-hole status (up/down) together with the number of ads that were blocked today
Parameters:
* pihole.address : pi-hole address (e.q: http://192.168.1.3)
* pihole.pwhash : pi-hole webinterface password hash (can be obtained from the /etc/pihole/SetupVars.conf file)
"""
import bumblebee.input
import bumblebee.output
import bumblebee.engine
import requests
class Module(bumblebee.engine.Module):
def __init__(self, engine, config):
super(Module, self).__init__(engine, config,
bumblebee.output.Widget(full_text=self.pihole_status)
)
buttons = {"LEFT_CLICK":bumblebee.input.LEFT_MOUSE}
self._pihole_address = self.parameter("address", "")
self._pihole_pw_hash = self.parameter("pwhash", "")
self._pihole_status = None
self._ads_blocked_today = "-"
self.update_pihole_status()
engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE,
cmd=self.toggle_pihole_status)
def pihole_status(self, widget):
if self._pihole_status is None:
return "pi-hole unknown"
return "pi-hole " + ("up/" + self._ads_blocked_today if self._pihole_status else "down")
def update_pihole_status(self):
try:
data = requests.get(self._pihole_address + "/admin/api.php?summary").json()
self._pihole_status = True if data["status"] == "enabled" else False
self._ads_blocked_today = data["ads_blocked_today"]
except:
self._pihole_status = None
def toggle_pihole_status(self, widget):
if self._pihole_status is not None:
try:
req = None
if self._pihole_status:
req = requests.get(self._pihole_address + "/admin/api.php?disable&auth=" + self._pihole_pw_hash)
else:
req = requests.get(self._pihole_address + "/admin/api.php?enable&auth=" + self._pihole_pw_hash)
if req is not None:
if req.status_code == 200:
status = req.json()["status"]
self._pihole_status = False if status == "disabled" else True
except:
pass
def update(self, widgets):
self.update_pihole_status()
def state(self, widget):
if self._pihole_status is None:
return []
elif self._pihole_status:
return ["enabled"]
return ["disabled", "warning"]

View file

@ -4,21 +4,33 @@
Requires the following executable:
* redshift
Parameters:
* redshift.location : location provider, either of "geoclue2" (default), \
"ipinfo" (requires the requests package), or "manual"
* redshift.lat : latitude if location is set to "manual"
* redshift.lon : longitude if location is set to "manual"
"""
import threading
try:
import requests
except ImportError:
pass
import bumblebee.input
import bumblebee.output
import bumblebee.engine
def is_terminated():
for thread in threading.enumerate():
if thread.name == "MainThread" and not thread.is_alive():
return True
return False
def get_redshift_value(widget):
def get_redshift_value(widget, location, lat, lon):
while True:
if is_terminated():
return
@ -31,8 +43,14 @@ def get_redshift_value(widget):
break
widget.get("condition").release()
command = ["redshift", "-p", "-l"]
if location == "manual":
command.append(lat + ":" + lon)
else:
command.append("geoclue2")
try:
res = bumblebee.util.execute("redshift -p")
res = bumblebee.util.execute(" ".join(command))
except Exception:
res = ""
widget.set("temp", "n/a")
@ -52,14 +70,40 @@ def get_redshift_value(widget):
widget.set("state", "transition")
widget.set("transition", " ".join(line.split(" ")[2:]))
class Module(bumblebee.engine.Module):
def __init__(self, engine, config):
widget = bumblebee.output.Widget(full_text=self.text)
super(Module, self).__init__(engine, config, widget)
self._location = self.parameter("location", "geoclue2")
self._lat = self.parameter("lat", None)
self._lon = self.parameter("lon", None)
# Even if location method is set to manual, if we have no lat or lon,
# fall back to the geoclue2 method.
if self._location == "manual" and (self._lat is None
or self._lon is None):
self._location == "geoclue2"
if self._location == "ipinfo":
try:
location_url = "http://ipinfo.io/json"
location = requests.get(location_url).json()
self._lat, self._lon = location["loc"].split(",")
self._lat = str(float(self._lat))
self._lon = str(float(self._lon))
self._location = "manual"
except Exception:
# Fall back to geoclue2.
self._location = "geoclue2"
self._text = ""
self._condition = threading.Condition()
widget.set("condition", self._condition)
self._thread = threading.Thread(target=get_redshift_value, args=(widget,))
self._thread = threading.Thread(target=get_redshift_value,
args=(widget, self._location,
self._lat, self._lon))
self._thread.start()
self._condition.acquire()
self._condition.notify()

311
bumblebee/modules/rss.py Normal file
View file

@ -0,0 +1,311 @@
# pylint: disable=C0111,R0903
"""RSS news ticker
Fetches rss news items and shows these as a news ticker.
Left-clicking will open the full story in a browser.
New stories are highlighted.
Parameters:
* rss.feeds : Space-separated list of RSS URLs
* rss.length : Maximum length of the module, default is 60
"""
try:
import feedparser
DEPENDENCIES_OK = True
except ImportError:
DEPENDENCIES_OK = False
import webbrowser
import time
import os
import tempfile
import logging
import random
import re
import json
import bumblebee.input
import bumblebee.output
import bumblebee.engine
# pylint: disable=too-many-instance-attributes
class Module(bumblebee.engine.Module):
REFRESH_DELAY = 600
SCROLL_SPEED = 3
LAYOUT_STYLES_ITEMS = [[1,1,1],[3,3,2],[2,3,3],[3,2,3]]
HISTORY_FILENAME = ".config/i3/rss.hist"
def __init__(self, engine, config):
super(Module, self).__init__(engine, config,
bumblebee.output.Widget(full_text=self.ticker_update if DEPENDENCIES_OK else self._show_error)
)
# Use BBC newsfeed as demo:
self._feeds = self.parameter('feeds', 'https://www.espn.com/espn/rss/news').split(" ")
self._feeds_to_update = []
self._max_title_length = int(self.parameter("length", 60))
self._items = []
self._current_item = None
self._ticker_offset = 0
self._pre_delay = 0
self._post_delay = 0
self._state = []
self._newspaper_filename = tempfile.mktemp('.html')
self._last_refresh = 0
self._last_update = 0
engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE, cmd=self._open)
engine.input.register_callback(self, button=bumblebee.input.RIGHT_MOUSE, cmd=self._create_newspaper)
self._history = {'ticker': {}, 'newspaper': {}}
self._load_history()
def _load_history(self):
if os.path.isfile(self.HISTORY_FILENAME):
self._history = json.loads(open(self.HISTORY_FILENAME, "r").read())
def _update_history(self, group):
sources = set([i['source'] for i in self._items])
self._history[group] = dict([[s, [i['title'] for i in self._items if i['source'] == s]] for s in sources])
def _save_history(self):
if not os.path.exists(os.path.dirname(self.HISTORY_FILENAME)):
os.makedirs(os.path.dirname(self.HISTORY_FILENAME))
open(self.HISTORY_FILENAME, "w").write(json.dumps(self._history))
def _check_history(self, items, group):
for i in items:
i['new'] = not (i['source'] in self._history[group] and i['title'] in self._history[group][i['source']])
def _open(self, _):
if self._current_item:
webbrowser.open(self._current_item['link'])
def _check_for_image(self, entry):
image = next(iter([l['href'] for l in entry['links'] if l['rel'] == 'enclosure']), None)
if not image and 'media_content' in entry:
try:
media = sorted(entry['media_content'], key=lambda i: i['height'] if 'height' in i else 0, reverse=True)
image = next(iter([i['url'] for i in media if i['medium'] == 'image']), None)
except Exception:
pass
if not image:
match = re.search(r'<img[^>]*src\s*=["\']*([^\s^>^"^\']*)["\']*', entry['summary'])
if match:
image = match.group(1)
return image if image else ''
def _remove_tags(self, txt):
return re.sub('<[^>]*>', '', txt)
def _create_item(self, entry, url, feed):
return {'title': self._remove_tags(entry['title'].replace('\n', ' ')),
'link': entry['link'],
'new': True,
'source': url,
'summary': self._remove_tags(entry['summary']),
'feed': feed,
'image': self._check_for_image(entry),
'published': time.mktime(entry.published_parsed) if hasattr(entry, 'published_parsed') else 0}
def _update_items_from_feed(self, url):
parser = feedparser.parse(url)
new_items = [self._create_item(entry, url, parser['feed']['title']) for entry in parser['entries']]
# Check history
self._check_history(new_items, 'ticker')
# Remove the previous items
self._items = [i for i in self._items if i['source'] != url]
# Add the new items
self._items.extend(new_items)
# Sort the items on publish date
self._items.sort(key=lambda i: i['published'], reverse=True)
def _check_for_refresh(self):
if self._feeds_to_update:
# Update one feed at a time to not overload this update cycle
url = self._feeds_to_update.pop()
self._update_items_from_feed(url)
if not self._feeds_to_update:
self._update_history('ticker')
self._save_history()
if not self._current_item:
self._next_item()
elif time.time()-self._last_refresh >= self.REFRESH_DELAY:
# Populate the list with feeds to update
self._feeds_to_update = self._feeds[:]
# Update the refresh time
self._last_refresh = time.time()
def _next_item(self):
self._ticker_offset = 0
self._pre_delay = 2
self._post_delay = 4
if not self._items:
return
# Index of the current element
idx = self._items.index(self._current_item) if self._current_item in self._items else - 1
# First show new items, else show next
new_items = [i for i in self._items if i['new']]
self._current_item = next(iter(new_items), self._items[(idx+1) % len(self._items)])
def _check_scroll_done(self):
# Check if the complete title has been shown
if self._ticker_offset + self._max_title_length > len(self._current_item['title']):
# Do not immediately show next item after scroll
self._post_delay -= 1
if self._post_delay == 0:
self._current_item['new'] = False
# Mark the previous item as 'old'
self._next_item()
else:
# Increase scroll position
self._ticker_offset += self.SCROLL_SPEED
def _show_error(self, _):
return "Please install feedparser first"
def ticker_update(self, _):
# Only update the ticker once a second
now = time.time()
if now-self._last_update < 1:
return self._response
self._last_update = now
self._check_for_refresh()
# If no items were retrieved, return an empty string
if not self._current_item:
return " "*self._max_title_length
# Prepare a substring of the item title
self._response = self._current_item['title'][self._ticker_offset:self._ticker_offset+self._max_title_length]
# Add spaces if too short
self._response = self._response.ljust(self._max_title_length)
# Do not immediately scroll
if self._pre_delay > 0:
# Change state during pre_delay for new items
if self._current_item['new']:
self._state = ['warning']
self._pre_delay -= 1
return self._response
self._state = []
self._check_scroll_done()
return self._response
def update(self, widgets):
pass
def state(self, _):
return self._state
def _create_news_element(self, item, overlay_title):
try:
timestr = "" if item['published'] == 0 else str(time.ctime(item['published']))
except Exception as exc:
logging.error(str(exc))
raise e
element = "<div class='item' onclick=window.open('"+item['link']+"')>"
element += "<div class='titlecontainer'>"
element += " <img "+("" if item['image'] else "class='noimg' ")+"src='"+item['image']+"'>"
element += " <div class='title"+(" overlay" if overlay_title else "")+"'>"+("<span class='star'>&#x2605;</span>" if item['new'] else "")+item['title']+"</div>"
element += "</div>"
element += "<div class='summary'>"+item['summary']+"</div>"
element += "<div class='info'><span class='author'>"+item['feed']+"</span><span class='published'>"+timestr+"</span></div>"
element += "</div>"
return element
def _create_news_section(self, newspaper_items):
style = random.randint(0, 3)
section = "<table><tr class='style"+str(style)+"'>"
for i in range(0, 3):
section += "<td><div class='itemcontainer'>"
for _ in range(0, self.LAYOUT_STYLES_ITEMS[style][i]):
if newspaper_items:
section += self._create_news_element(newspaper_items[0], self.LAYOUT_STYLES_ITEMS[style][i] != 3)
del newspaper_items[0]
section += "</div></td>"
section += "</tr></table>"
return section
def _create_newspaper(self, _):
content = ""
newspaper_items = self._items[:]
self._check_history(newspaper_items, 'newspaper')
# Make sure new items are always listed first, independent of publish date
newspaper_items.sort(key=lambda i: i['published']+(10000000 if i['new'] else 0), reverse=True)
while newspaper_items:
content += self._create_news_section(newspaper_items)
open(self._newspaper_filename, "w").write(HTML_TEMPLATE.replace("[[CONTENT]]", content))
webbrowser.open("file://"+self._newspaper_filename)
self._update_history('newspaper')
self._save_history()
HTML_TEMPLATE = """<!DOCTYPE html>
<html>
<head>
<script>
window.onload = function() {
var images = document.getElementsByTagName('img');
// Remove very small images
for(var i = 0; i < images.length; i++) {
if (images[i].naturalWidth<50 || images[i].naturalHeight<50) {
images[i].src = ''
images[i].className+=' noimg'
}
}
}
</script>
</head>
<style>
body {background: #eee; font-family: Helvetica neue;}
td {background: #fff; height: 100%;}
tr.style0 td {width: 33%;}
tr.style1 td {width: 20%;}
tr.style1 td:last-child {width: 60%;}
tr.style2 td {width: 20%;}
tr.style2 td:first-child {width: 60%;}
tr.style3 td {width: 20%;}
tr.style3 td:nth-child(2) {width: 60%;}
img {width: 100%; display: block; }
img.noimg {min-height:250px; background: #1299c8;}
#content {width: 1500px; margin: auto; background: #eee; padding: 1px;}
#newspapertitle {text-align: center; font-size: 60px; font-family: Arial Black; background: #1299c8; font-style: Italic; padding: 10px; color: #fff; }
.star {color: #ffa515; font-size: 24px;}
.section {display: flex;}
.column {display: flex;}
.itemcontainer {width: 100%; height: 100%; position: relative; display: inline-table;}
.item {cursor: pointer; }
.titlecontainer {position: relative;}
.title.overlay {font-family: Arial; position: absolute; bottom: 10px; color: #fff; font-weight: bold; text-align: right; max-width: 75%; right: 10px; font-size: 23px; text-shadow: 1px 0 0 #000, 0 -1px 0 #000, 0 1px 0 #000, -1px 0 0 #000;}
.title:not(.overlay) {font-weight: bold; padding: 0px 10px;}
.summary {color: #444; padding: 10px 10px 0px 10px; font-family: Times new roman; font-size: 18px; flex: 1;max-height: 105px; overflow: hidden;}
.info {color: #aaa; font-family: arial; font-size: 13px; padding: 10px;}
.published {float: right;}
</style>
<body>
<div id='content'>
<div id='newspapertitle'>Bumblebee Daily</div>
[[CONTENT]]
</div>
</body>
</html>"""
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

111
bumblebee/modules/sun.py Normal file
View file

@ -0,0 +1,111 @@
# pylint: disable=C0111,R0903
"""Displays sunrise and sunset times
Parameters:
* cpu.lat : Latitude of your location
* cpu.lon : Longitude of your location
"""
try:
from suntime import Sun, SunTimeException
except ImportError:
pass
try:
import requests
except ImportError:
pass
try:
from dateutil.tz import tzlocal
except ImportError:
pass
import datetime
import bumblebee.input
import bumblebee.output
import bumblebee.engine
class Module(bumblebee.engine.Module):
def __init__(self, engine, config):
super(Module, self).__init__(
engine, config,
bumblebee.output.Widget(full_text=self.suntimes)
)
self.interval(3600)
self._lat = self.parameter("lat", None)
self._lon = self.parameter("lon", None)
try:
if not self._lat or not self._lon:
location_url = "http://ipinfo.io/json"
location = requests.get(location_url).json()
self._lat, self._lon = location["loc"].split(",")
self._lat = float(self._lat)
self._lon = float(self._lon)
except Exception:
pass
self.update(None)
def suntimes(self, _):
if self._sunset and self._sunrise:
if self._isup:
return u"\u21A7{} \u21A5{}".format(
self._sunset.strftime('%H:%M'),
self._sunrise.strftime('%H:%M'))
return u"\u21A5{} \u21A7{}".format(self._sunrise.strftime('%H:%M'),
self._sunset.strftime('%H:%M'))
return "?"
def _calculate_times(self):
self._isup = False
try:
sun = Sun(self._lat, self._lon)
except Exception:
self._sunrise = None
self._sunset = None
return
order_matters = True
try:
self._sunrise = sun.get_local_sunrise_time()
except SunTimeException:
self._sunrise = "no sunrise"
order_matters = False
try:
self._sunset = sun.get_local_sunset_time()
except SunTimeException:
self._sunset = "no sunset"
order_matters = False
if not order_matters:
return
now = datetime.datetime.now(tz=tzlocal())
if now > self._sunset:
tomorrow = (now + datetime.timedelta(days=1)).date()
try:
self._sunrise = sun.get_local_sunrise_time(tomorrow)
self._sunset = sun.get_local_sunset_time(tomorrow)
except SunTimeException:
self._sunrise = "no sunrise"
self._sunset = "no sunset"
elif now > self._sunrise:
tomorrow = (now + datetime.timedelta(days=1)).date()
try:
self._sunrise = sun.get_local_sunrise_time(tomorrow)
except SunTimeException:
self._sunrise = "no sunrise"
return
self._isup = True
def update(self, widgets):
if not self._lat or not self._lon:
self._sunrise = None
self._sunset = None
self._calculate_times()
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0111,R0903
""" system module
adds the possibility to
* shutdown
* reboot
the system.
Per default a confirmation dialog is shown before the actual action is performed.
Paramters:
* system.confirm: show confirmation dialog before performing any action (default: true)
"""
import logging
import bumblebee.input
import bumblebee.output
import bumblebee.engine
import bumblebee.popup_v2
import functools
try:
import Tkinter as tk
import tkMessageBox as tkmessagebox
except ImportError:
# python 3
try:
import tkinter as tk
from tkinter import messagebox as tkmessagebox
except ImportError:
logging.warning("failed to import tkinter - bumblebee popups won't work!")
class Module(bumblebee.engine.Module):
def __init__(self, engine, config):
super(Module, self).__init__(engine, config,
bumblebee.output.Widget(full_text=self.text)
)
self._confirm = True
if self.parameter("confirm", "true") == "false":
self._confirm = False
engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE,
cmd=self.popup)
def update(self, widgets):
pass
def text(self, widget):
return ""
def _on_command(self, header, text, command):
do_it = True
if self._confirm:
root = tk.Tk()
root.withdraw()
root.focus_set()
do_it = tkmessagebox.askyesno(header, text)
root.destroy()
if do_it:
bumblebee.util.execute(command)
def popup(self, widget):
menu = bumblebee.popup_v2.PopupMenu()
menu.add_menuitem("shutdown", callback=functools.partial(self._on_command, "Shutdown", "Shutdown?", "shutdown -h now"))
menu.add_menuitem("reboot", callback=functools.partial(self._on_command, "Reboot", "Reboot?", "reboot"))
menu.add_menuitem("log out", callback=functools.partial(self._on_command, "Log out", "Log out?", "i3exit logout"))
# don't ask for these
menu.add_menuitem("switch user", callback=functools.partial(bumblebee.util.execute, "i3exit switch_user"))
menu.add_menuitem("lock", callback=functools.partial(bumblebee.util.execute, "i3exit lock"))
menu.add_menuitem("suspend", callback=functools.partial(bumblebee.util.execute, "i3exit suspend"))
menu.add_menuitem("hibernate", callback=functools.partial(bumblebee.util.execute, "i3exit hibernate"))
menu.show(widget)
def state(self, widget):
return []

View file

@ -103,7 +103,8 @@ class Module(bumblebee.engine.Module):
widget = self.create_widget(widgets, name, attributes={"theme.minwidth": "1000.00MB"})
prev = self._prev.get(name, 0)
speed = bumblebee.util.bytefmt((int(data[direction]) - int(prev))/timediff)
widget.full_text(speed)
txtspeed ='{0}/s'.format(speed)
widget.full_text(txtspeed)
self._prev[name] = data[direction]
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

39
bumblebee/modules/twmn.py Normal file
View file

@ -0,0 +1,39 @@
#pylint: disable=C0111,R0903
"""Toggle twmn notifications."""
import bumblebee.input
import bumblebee.output
import bumblebee.engine
class Module(bumblebee.engine.Module):
def __init__(self, engine, config):
super(Module, self).__init__(engine, config,
bumblebee.output.Widget(full_text="")
)
self._paused = False
# Make sure that twmn is currently not paused
try:
bumblebee.util.execute("killall -SIGUSR2 twmnd")
except:
pass
engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE,
cmd=self.toggle_status
)
def toggle_status(self, event):
self._paused = not self._paused
try:
if self._paused:
bumblebee.util.execute("systemctl --user start twmnd")
else:
bumblebee.util.execute("systemctl --user stop twmnd")
except:
self._paused = not self._paused # toggling failed
def state(self, widget):
if self._paused:
return ["muted"]
return ["unmuted"]

View file

@ -0,0 +1,81 @@
# pylint: disable=C0111,R0903
"""Copy passwords from a password store into the clipboard (currently supports only "pass")
Many thanks to [@bbernhard](https://github.com/bbernhard) for the idea!
Parameters:
* vault.duration: Duration until password is cleared from clipboard (defaults to 30)
* vault.location: Location of the password store (defaults to ~/.password-store)
* vault.offx: x-axis offset of popup menu (defaults to 0)
* vault.offy: y-axis offset of popup menu (defaults to 0)
"""
# TODO:
# - support multiple backends by abstracting the menu structure into a tree
# - build the menu and the actions based on that abstracted tree
#
import os
import time
import threading
import bumblebee.util
import bumblebee.popup_v2
import bumblebee.input
import bumblebee.output
import bumblebee.engine
def build_menu(parent, current_directory, callback):
with os.scandir(current_directory) as it:
for entry in it:
if entry.name.startswith("."): continue
if entry.is_file():
name = entry.name[:entry.name.rfind(".")]
parent.add_menuitem(name, callback=lambda : callback(os.path.join(current_directory, name)))
else:
submenu = bumblebee.popup_v2.PopupMenu(parent, leave=False)
build_menu(submenu, os.path.join(current_directory, entry.name), callback)
parent.add_cascade(entry.name, submenu)
class Module(bumblebee.engine.Module):
def __init__(self, engine, config):
super(Module, self).__init__(engine, config,
bumblebee.output.Widget(full_text=self.text)
)
self._duration = int(self.parameter("duration", 30))
self._offx = int(self.parameter("offx", 0))
self._offy = int(self.parameter("offy", 0))
self._path = os.path.expanduser(self.parameter("location", "~/.password-store/"))
self._reset()
engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE,
cmd=self.popup)
def popup(self, widget):
menu = bumblebee.popup_v2.PopupMenu(leave=False)
build_menu(menu, self._path, self._callback)
menu.show(widget, offset_x=self._offx, offset_y=self._offy)
def _reset(self):
self._timer = None
self._text = "<click-for-password>"
def _callback(self, secret_name):
secret_name = secret_name.replace(self._path, "") # remove common path
if self._timer:
self._timer.cancel()
# bumblebee.util.execute hangs for some reason
os.system("PASSWORD_STORE_CLIP_TIME={} pass -c {} > /dev/null 2>&1".format(self._duration, secret_name))
self._timer = threading.Timer(self._duration, self._reset)
self._timer.start()
self._start = int(time.time())
self._text = secret_name
def text(self, widget):
if self._timer:
return "{} ({}s)".format(self._text, self._duration - (int(time.time()) - self._start))
return self._text
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

View file

@ -1,95 +1,95 @@
# pylint: disable=C0111,R0903
""" Displays the VPN profile that is currently in use.
Left click opens a popup menu that lists all available VPN profiles and allows to establish
a VPN connection using that profile.
Prerequisites:
* nmcli needs to be installed and configured properly.
To quickly test, whether nmcli is working correctly, type "nmcli -g NAME,TYPE,DEVICE con" which
lists all the connection profiles that are configured. Make sure that your VPN profile is in that list!
e.g: to import a openvpn profile via nmcli:
sudo nmcli connection import type openvpn file </path/to/your/openvpn/profile.ovpn>
"""
import logging
import bumblebee.input
import bumblebee.output
import bumblebee.engine
import functools
class Module(bumblebee.engine.Module):
def __init__(self, engine, config):
super(Module, self).__init__(engine, config,
bumblebee.output.Widget(full_text=self.vpn_status)
)
self._connected_vpn_profile = None
self._selected_vpn_profile = None
res = bumblebee.util.execute("nmcli -g NAME,TYPE c")
lines = res.splitlines()
self._vpn_profiles = []
for line in lines:
info = line.split(':')
try:
if info[1] == "vpn":
self._vpn_profiles.append(info[0])
except:
pass
engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE,
cmd=self.popup)
def update(self, widgets):
try:
res = bumblebee.util.execute("nmcli -g NAME,TYPE,DEVICE con")
lines = res.splitlines()
self._connected_vpn_profile = None
for line in lines:
info = line.split(':')
if info[1] == "vpn" and info[2] != "":
self._connected_vpn_profile = info[0]
except Exception as e:
logging.exception("Couldn't get VPN status")
self._connected_vpn_profile = None
def vpn_status(self, widget):
if self._connected_vpn_profile is None:
return "off"
return self._connected_vpn_profile
def _on_vpn_disconnect(self):
try:
bumblebee.util.execute("nmcli c down " + self._connected_vpn_profile)
self._connected_vpn_profile = None
except Exception as e:
logging.exception("Couldn't disconnect VPN connection")
def _on_vpn_connect(self, name):
self._selected_vpn_profile = name
try:
bumblebee.util.execute("nmcli c up " + self._selected_vpn_profile)
self._connected_vpn_profile = name
except Exception as e:
logging.exception("Couldn't establish VPN connection")
self._connected_vpn_profile = None
def popup(self, widget):
menu = bumblebee.popup_v2.PopupMenu()
if self._connected_vpn_profile is not None:
menu.add_menuitem("Disconnect", callback=self._on_vpn_disconnect)
for vpn_profile in self._vpn_profiles:
if self._connected_vpn_profile is not None and self._connected_vpn_profile == vpn_profile:
continue
menu.add_menuitem(vpn_profile, callback=functools.partial(self._on_vpn_connect, vpn_profile))
menu.show(widget)
def state(self, widget):
return []
# pylint: disable=C0111,R0903
""" Displays the VPN profile that is currently in use.
Left click opens a popup menu that lists all available VPN profiles and allows to establish
a VPN connection using that profile.
Prerequisites:
* nmcli needs to be installed and configured properly.
To quickly test, whether nmcli is working correctly, type "nmcli -g NAME,TYPE,DEVICE con" which
lists all the connection profiles that are configured. Make sure that your VPN profile is in that list!
e.g: to import a openvpn profile via nmcli:
sudo nmcli connection import type openvpn file </path/to/your/openvpn/profile.ovpn>
"""
import logging
import bumblebee.input
import bumblebee.output
import bumblebee.engine
import functools
class Module(bumblebee.engine.Module):
def __init__(self, engine, config):
super(Module, self).__init__(engine, config,
bumblebee.output.Widget(full_text=self.vpn_status)
)
self._connected_vpn_profile = None
self._selected_vpn_profile = None
res = bumblebee.util.execute("nmcli -g NAME,TYPE c")
lines = res.splitlines()
self._vpn_profiles = []
for line in lines:
info = line.split(':')
try:
if info[1] == "vpn":
self._vpn_profiles.append(info[0])
except:
pass
engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE,
cmd=self.popup)
def update(self, widgets):
try:
res = bumblebee.util.execute("nmcli -g NAME,TYPE,DEVICE con")
lines = res.splitlines()
self._connected_vpn_profile = None
for line in lines:
info = line.split(':')
if info[1] == "vpn" and info[2] != "":
self._connected_vpn_profile = info[0]
except Exception as e:
logging.exception("Couldn't get VPN status")
self._connected_vpn_profile = None
def vpn_status(self, widget):
if self._connected_vpn_profile is None:
return "off"
return self._connected_vpn_profile
def _on_vpn_disconnect(self):
try:
bumblebee.util.execute("nmcli c down " + self._connected_vpn_profile)
self._connected_vpn_profile = None
except Exception as e:
logging.exception("Couldn't disconnect VPN connection")
def _on_vpn_connect(self, name):
self._selected_vpn_profile = name
try:
bumblebee.util.execute("nmcli c up " + self._selected_vpn_profile)
self._connected_vpn_profile = name
except Exception as e:
logging.exception("Couldn't establish VPN connection")
self._connected_vpn_profile = None
def popup(self, widget):
menu = bumblebee.popup_v2.PopupMenu()
if self._connected_vpn_profile is not None:
menu.add_menuitem("Disconnect", callback=self._on_vpn_disconnect)
for vpn_profile in self._vpn_profiles:
if self._connected_vpn_profile is not None and self._connected_vpn_profile == vpn_profile:
continue
menu.add_menuitem(vpn_profile, callback=functools.partial(self._on_vpn_connect, vpn_profile))
menu.show(widget)
def state(self, widget):
return []

View file

@ -118,8 +118,6 @@ class Module(bumblebee.engine.Module):
self._temperature = int(weather['main']['temp'])
self._weather = weather['weather'][0]['main'].lower()
self._valid = True
except RequestException:
self._valid = False
except Exception:
self._valid = False

View file

@ -13,13 +13,27 @@ except ImportError:
import functools
class PopupMenu(object):
def __init__(self):
self._root = tk.Tk()
self._root.withdraw()
self._menu = tk.Menu(self._root)
self._menu.bind("<FocusOut>", self._on_focus_out)
def __init__(self, parent=None, leave=True):
if not parent:
self._root = tk.Tk()
self._root.withdraw()
self._menu = tk.Menu(self._root, tearoff=0)
self._menu.bind("<FocusOut>", self._on_focus_out)
else:
self._root = parent.root()
self._root.withdraw()
self._menu = tk.Menu(self._root, tearoff=0)
self._menu.bind("<FocusOut>", self._on_focus_out)
if leave:
self._menu.bind("<Leave>", self._on_focus_out)
def root(self):
return self._root
def menu(self):
return self._menu
def _on_focus_out(self, event=None):
self._root.destroy()
@ -28,13 +42,15 @@ class PopupMenu(object):
self._root.destroy()
callback()
def add_cascade(self, menuitem, submenu):
self._menu.add_cascade(label=menuitem, menu=submenu.menu())
def add_menuitem(self, menuitem, callback):
self._menu.add_command(label=menuitem, command=functools.partial(self._on_click, callback))
def show(self, event):
def show(self, event, offset_x=0, offset_y=0):
try:
self._menu.tk_popup(event['x'], event['y'])
self._menu.tk_popup(event['x'] + offset_x, event['y'] + offset_y)
finally:
self._menu.grab_release()
self._root.mainloop()

View file

@ -105,6 +105,9 @@ class Theme(object):
if icon is None:
return self._get(widget, "prefix", None)
def get(self, widget, attribute, default_value=""):
return self._get(widget, attribute, default_value)
def padding(self, widget):
"""Return padding for widget"""
return self._get(widget, "padding", "")
@ -223,6 +226,9 @@ class Theme(object):
if not self._widget:
self._widget = widget
if self._widget.get("theme.exclude", "") == name:
return None
if self._widget != widget:
self._prevbg = self.bg(self._widget)
self._widget = widget
@ -238,7 +244,8 @@ class Theme(object):
states = widget.state()
if name not in states:
for state in states:
state_themes.append(self._get(widget, state, {}))
if state:
state_themes.append(self._get(widget, state, {}))
value = self._defaults.get(name, default)
value = widget.get("theme.{}".format(name), value)