doorstatus/source/server/apistatusd.py

475 lines
17 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
# file: apistatusd.py
# date: 26.07.2019
# mail: berhsi@web.de
# Status server, listening for door status updates. The IPv4 address and port
# to listen on are configurable, by default localhost:10001 is used. The
# connection is secured by TLS and client side authentication.
try:
import json
import logging
import os
import socket
import ssl
import sys
import requests
import threading
from time import time, localtime, strftime, sleep
import configparser
except Exception as e:
print('Import error: {}'.format(e))
def certs_readable(config):
'''
checks at start, if the needed certificates defined (no nullstring) and
readable.
param 1: dictionary
return: boolean
'''
for i in (config['server']['key'], config['server']['cert'],
config['client']['cert']):
if i == '' or os.access(i, os.R_OK) is False:
logging.error('Cannot read {}'.format(i))
return False
return True
def print_config(config):
'''
Logs the config with level debug.
'''
logging.debug('Using config:')
for section in config.sections():
logging.debug('Section {}'.format(section))
for i in config[section]:
if i == 'token':
logging.debug(' {}: {}'.format(i, 'aaaaa-bbbbb-ccccc-ddddd-eeeee'))
else:
logging.debug(' {}: {}'.format(i, config[section][i]))
def create_ssl_context(config):
'''
Creates the ssl context.
return: context object or None
'''
requirement = ssl.CERT_REQUIRED
match config['client']['required'].lower():
case 'false':
requirement = ssl.CERT_NONE
case 'may':
requirement = ssl.CERT_OPTIONAL
try:
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.verify_mode = requirement
context.load_cert_chain(certfile=config['server']['cert'],
keyfile=config['server']['key'])
context.load_verify_locations(cafile=config['client']['cert'])
context.set_ciphers("ECDHE-RSA-AES256-GCM-SHA384 ECDHE-RSA-AES128-GCM-SHA256")
context.set_ecdh_curve("secp384r1")
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.maximum_version = ssl.TLSVersion.TLSv1_2
# ensure, compression is disabled (disabled by default anyway at the moment)
context.options |= ssl.OP_NO_COMPRESSION
context.options |= ssl.OP_CIPHER_SERVER_PREFERENCE
context.options |= ssl.OP_SINGLE_ECDH_USE
logging.debug('SSL context created')
return context
except Exception as e:
logging.error('Failed to create SSL context')
logging.error('Error: {}'.format(e))
return None
def print_ciphers(cipherlist):
'''
Prints the list of allowed ciphers.
param1: dictionary
return: boolean
'''
logging.debug('Available ciphers')
for i in cipherlist:
for j in i.keys():
if j in ('name', 'protocol'):
logging.debug('{}: {}'.format(j, i[j]))
def print_context(ctx):
'''
Prints the ssl settings for the given ssl context.
param1: context object
'''
logging.debug('----------- context ----------------')
logging.debug('Minimum version ssl: {}'.format(ctx.minimum_version))
logging.debug('Maximum version ssl: {}'.format(ctx.maximum_version))
logging.debug('SSL options enabled: {}'.format(ctx.options))
logging.debug('Protocol: {}'.format(ssl.get_protocol_name(ctx.protocol)))
logging.debug('Verify flags certificates: {}'.format(ctx.verify_flags))
logging.debug('Verify mode: {}'.format(ctx.verify_mode))
print_ciphers(ctx.get_ciphers())
logging.debug('------------------------------------')
def display_peercert(cert):
'''
Displays the values of a given certificate.
param1: dictionary or none
'''
if cert == None:
logging.debug('Peer does not offer a certificate')
elif len(cert) == 0:
logging.debug('Peer certificate was not valid')
else:
logging.debug('--- Peer cert ---')
for key in cert.keys():
if key in ('subject', 'serialNumber', 'notBefore', 'notAfter'):
if key == 'subject':
logging.debug(f'Subject: {cert[key]}')
for i in cert[key]:
logging.debug(f'{i[0][0]}: {i[0][1]}')
else:
logging.debug(f'{key}: {cert[key]}')
logging.debug('-----------------')
def receive_buffer_is_valid(raw_data):
'''
Checks validity of the received buffer contents.
param 1: byte object
return: boolean
'''
try:
if raw_data.decode('utf-8', 'strict') in ('0', '1'):
logging.debug('Argument is valid: {}'.format(raw_data))
return True
except UnicodeDecodeError as err:
logging.error('Argument is not valid unicode')
logging.error(err)
return False
except Exception as err:
logging.error('Exception occurred')
logging.error(err)
return False
logging.debug('Argument is not valid: {}'.format(raw_data))
return False
def read_api(api):
'''
Reads the Space API JSON into a dict. Returns the dict on success and
False on failure.
param 1: string
return: dict or boolean
'''
logging.debug('Open API file: {}'.format(api))
# return early if the API JSON cannot be read
if not os.access(api, os.R_OK):
logging.error('Failed to read API file')
return False
logging.debug('API is readable')
with open(api, 'r') as api_file:
logging.debug('API file successfully readable opened')
try:
api_json_data = json.load(api_file)
logging.debug('API file successfully read')
except Exception as e:
logging.error('Failed to read API file: {}'.format(e))
return False
return api_json_data
def change_status(status, timestamp, api_template, filename):
'''
Write the new status together with a timestamp into the Space API JSON.
param 1: byte object
param 2: string
return: boolean
'''
logging.debug('Change status API')
if os.access(filename, os.W_OK):
logging.debug('API file is writable')
with open(filename, 'w') as api_file:
logging.debug('API file successfull writable opened')
api_template["state"]["open"] = status
api_template["state"]["lastchange"] = timestamp
try:
# json.dump(data, api_file, indent=4)
json.dump(api_template, api_file, indent=4)
except Exception as e:
logging.error('Failed to change API file')
logging.error('{}'.format(e))
return False
logging.debug('API file changed')
else:
logging.error('API file is not writable. Wrong permissions?')
return False
logging.info('API file successfull changed to {}'.format(status))
return True
def get_status_and_time(raw_data):
'''
Create a timestamp, changes the value of the given byte into a string
and returns both.
param 1: byte object
return: tuple (boolean, integer)
'''
status = raw_data.decode('utf-8', 'strict') == '1'
timestamp = int(str(time()).split('.')[0])
logging.debug('Set values for timestamp: {} and status: {}'.format(
str(timestamp), str(status)))
return (status, timestamp)
def join_path(host, api):
'''
Becomes two parts (host and api) of the mastodon url and concanate them
param1: string
param2: string
return: string
'''
url = ''
try:
if host[-1] == '/' and api[0] == '/':
url = ''.join((host, api[1:]))
elif host[-1] != '/' and api[0] != '/':
url = '/'.join((host, api))
else:
url = ''.join((host, api))
except TypeError as e:
logging.error("Can't join path: {}".format(e))
return url
class Toot(threading.Thread):
'''
The thread to toot the status to mastodon.
'''
def __init__(self, status, timestamp, config):
'''
param1: boolean
param2: integer
param3: dictionary
'''
threading.Thread.__init__(self)
self.status = status
self.config = config
self.timestamp = timestamp
self.api = '/api/v1/statuses'
self.auth = {'Authorization': ''}
self.data = {'status': ''}
self.url = ''
def run(self):
'''
return: boolean
'''
timeformat = '%d.%m.%Y %H:%M Uhr'
# convert seconds into timestring
try:
timestring = strftime(timeformat, localtime(self.timestamp))
except Exception as e:
logging.error('Can not convert timestamp into timestring')
return False
# set status message
if self.status:
self.data['status'] = 'Krautspace is open since: {}'.format(timestring)
else:
self.data['status'] = 'Krautspace is closed since: {}'.format(timestring)
logging.debug('Message: {}'.format(self.data['status']))
# build mastodon api url
self.url = join_path(self.config['mastodon']['host'], self.api)
# build authentcation header
self.auth['Authorization'] = 'Bearer {}'.format(
self.config['mastodon']['token'])
# and finaly send request to mastodon
try:
logging.debug('Try to toot status')
response = requests.post(self.url, data = self.data,
headers = self.auth)
if response.status_code == 200:
logging.info('Toot successful send')
return True
else:
logging.error('Failed to toot. Response: {}'.format(response.status_code))
except Exception as e:
logging.error('Exception occurred: {}'.format(e))
return False
def main():
'''
The main function - open a socket, create a ssl context, load certs and
listen for connections. At SSL context we set only one available cipher
suite and disable compression.
OP_NO_COMPRESSION: prevention against CRIME attack
OP_DONT_ISERT_EMPTY_FRAGMENTS: prevention agains CBC 4 attack
(cve-2011-3389)
'''
answer = '3'.encode(encoding='utf-8', errors='strict')
loglevel = logging.INFO
formatstring = '%(asctime)s: %(levelname)s: %(message)s'
logging.basicConfig(format=formatstring, level=loglevel)
default_config = {
'general': {
'timeout': 3.0,
'loglevel': 'info'
},
'server': {
'host': 'localhost',
'port': 10001,
'cert': './certs/server.crt',
'key': './certs/server.key'
},
'client': {
'cert': './certs/client.crt',
'required': 'true'
},
'api': {
'api': './api',
'template': './api_template'
},
'mastodon': {
'send': 'false',
'host': 'localhost',
'token': 'aaaaa-bbbbb-ccccc-ddddd-eeeee'
}
}
api_template = {
"api": "0.13",
"space": "Krautspace Hackspace Jena e.V.",
"url": "https://kraut.space",
"logo": "https://status.krautspace.de/images/krautspace_pixelbanner.png",
"location": {
"address": "Hackspace Jena e. V., Krautgasse 26, 07743 Jena, Germany",
"lat": 50.9292,
"lon": 11.5826
},
"state": {
"open": False,
"lastchange": 1563499131,
"icon": {
"open":"https://status.krautspace.de/images/status-open.png",
"closed":"https://status.krautspace.de/images/status-closed.png"
}
},
"feeds": {
"calendar": {
"type": "ical",
"url": "https://cloud.kraut.space/remote.php/dav/public-calendars/2EkPGt3PF6WwYsA3?export"
},
"blog": {
"type": "rss",
"url": "https://wiki.kraut.space/feed.php?mode=list&ns=blog:content"
}
},
"contact": {
"email": "office@kraut.space",
"mastodon": "@krautspace@chaos.social",
"matrix": "#krautchan:matrix.kraut.space"
},
"issue_report_channels": [
"email"
],
"projects": [
"https://git.kraut.space/Krautspace"
]
}
logging.info('Try to read config file')
configfile = './apistatusd.conf'
config = configparser.ConfigParser()
config.read_dict(default_config)
if not config.read(configfile):
logging.warning('Configuration file %s not found or not readable. Using default values.',
configfile)
logger = logging.getLogger()
if not config['general']['loglevel'] in ('critical', 'error', 'warning', 'info', 'debug'):
logging.warning('Invalid loglevel %s given. Using default level %s.',
config['general']['loglevel'],
default_config['general']['loglevel'])
config.set('general', 'loglevel', default_config['general']['loglevel'])
logging.info('Set loglevel to {}'.format(config['general']['loglevel'].upper()))
logger.setLevel(config['general']['loglevel'].upper())
print_config(config)
# todo: zertifikate sollten nur lesbar sein!
if not certs_readable(config):
logging.error('Cert check failed\nExit')
sys.exit(1)
# ssl context erstellen
context = create_ssl_context(config)
if context is None:
sys.exit(2)
print_context(context)
try:
# tcp socket öffnen => MySocket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as MySocket:
logging.debug('TCP Socket created')
MySocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# MySocket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# keep = MySocket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)
# logging.debug('Socket keepalive: {}'.format(keep))
try:
MySocket.bind((config['server']['host'], int(config['server']['port'])))
MySocket.listen(5)
logging.info('Listening on {} at Port {}'.format(config['server']['host'],
config['server']['port']))
except Exception as e:
logging.error('Unable to bind and listen')
logging.error('{}'.format(e))
sys.exit(3)
# endlos auf verbindungen warten => ClientSocket
while True:
ClientSocket, ClientAddress = MySocket.accept()
logging.info('Client connected: {}:{}'.format(ClientAddress[0], ClientAddress[1]))
# die verbindung in den ssl-context verpacken => Connection
try:
ClientSocket.settimeout(float(config['general']['timeout']))
logging.debug('set ssl handshake timeout to {}s'.format(ClientSocket.gettimeout()))
Connection = context.wrap_socket(ClientSocket, server_side=True)
logging.debug('SSL Connection established')
display_peercert(Connection.getpeercert(binary_form=False))
except Exception as e:
logging.error('Unexpected error: {}'.format(e))
continue
# empfangen und antworten
raw_data = Connection.recv(1)
if receive_buffer_is_valid(raw_data):
status, timestamp = get_status_and_time(raw_data)
if change_status(status, timestamp, api_template, config['api']['api']):
answer = raw_data
if config['mastodon']['send'].lower() == 'true':
logging.debug('Toot is set to true')
try:
toot_thread = Toot(status, timestamp, config)
toot_thread.run()
except InitException as e:
logging.error('InitException: {}'.format(e))
except Exception as ex:
logging.debug('Exception: {}'.format(ex))
else:
logging.debug('Toot is set to false')
logging.debug('Send {} back'.format(raw_data))
Connection.send(answer)
Connection.close()
except KeyboardInterrupt:
logging.info('Keyboard interrupt received')
if MySocket:
MySocket.close()
logging.debug('TCP socket closed')
sys.exit(255)
except Exception as e:
logging.error('{}'.format(e))
if MySocket:
MySocket.close()
logging.debug('TCP socket closed')
sys.exit(254)
if __name__ == '__main__':
main()