bumblebee-status/bumblebee/modules/spaceapi.py
Tobias Manske 8eaded9405
[module/spaceapi] fix freeze on unreachable API
The statusbar was freezing for several minutes if it could not reach
the API endpoint. This was caused by a missing timeout argument in
the call to the requests library's get function.

- Added spaceapi.timeout parameter
- Added timeout to requests

Signed-off-by: Tobias Manske <tobias.manske@mailbox.org>
2018-10-26 00:17:13 +02:00

81 lines
2.5 KiB
Python

# pylint: disable=C0111,R0903
"""Displays the state of a spaceapi endpoint
Requires the following libraries:
* requests
Parameters:
* spaceapi.url: String representation of the api endpoint
* spaceapi.name: String overwriting the space name
* spaceapi.prefix: Prefix for the space string
* spaceapi.interval: time between updates in minutes
* spaceapi.timeout: Maximum time in seconds to wait for a response from API
endpoint
"""
import bumblebee.input
import bumblebee.output
import bumblebee.engine
import requests
class Module(bumblebee.engine.Module):
    """Widget showing whether a hackerspace is open, based on its SpaceAPI.

    The endpoint is queried in update(); the result is cached on the
    instance and rendered by getState() / state().
    """

    def __init__(self, engine, config):
        """Read the module parameters and set up the single text widget."""
        super(Module, self).__init__(
            engine, config, bumblebee.output.Widget(full_text=self.getState)
        )
        # Represents the state of the hackerspace
        self._open = False
        # Set to true if there was an error calling the spaceapi
        self._error = False
        # The URL representing the api endpoint
        self._url = self.parameter(
            "url", default="http://club.entropia.de/spaceapi"
        )
        # Space Name, can be set manually in case of multiple widgets,
        # so you're able to distinguish
        self._name = self.parameter("name", default="")
        # The timeout prevents the statusbar from blocking when the
        # destination can't be reached. The configured value may arrive as
        # a string (assumption: parameters come from user configuration),
        # so coerce it to a number and fall back to the default when it
        # cannot be parsed.
        try:
            self._timeout = float(self.parameter("timeout", default=2))
        except (TypeError, ValueError):
            self._timeout = 2.0
        # Only execute every 5 minutes by default
        self.interval(self.parameter("interval", default=5))

    def getState(self, widget):
        """Return the widget text: "<prefix><name>: <ERROR|Open|Closed>"."""
        text = self.parameter("prefix", default="")
        text += self._name + ": "
        if self._error:
            text += "ERROR"
        elif self._open:
            text += "Open"
        else:
            text += "Closed"
        return text

    def state(self, widget):
        """Return the list of state names used for theming the widget.

        An API error maps to "critical", an open space to "warning" and a
        closed space to no special state.
        """
        if self._error:
            return ["critical"]
        if self._open:
            return ["warning"]
        return []

    def update(self, widgets):
        """Query the endpoint and refresh the cached open/name/error state.

        Any failure (network error, timeout, invalid JSON, missing keys)
        is deliberately swallowed and reported through the ERROR state so
        that the status bar itself keeps running.
        """
        try:
            # Use the timeout stored in __init__ (self._timeout).
            with requests.get(self._url, timeout=self._timeout) as response:
                data = response.json()
                self._open = data["state"]["open"]
                # A manually configured name takes precedence over the
                # name reported by the endpoint.
                self._name = self.parameter("name", default=data["space"])
                self._error = False
        except Exception:
            # Displays ERROR status
            self._error = True
# Author: Tobias Manske <tobias.manske@mailbox.org>
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4