doorstatus/source/server/apistatusd.py
2022-07-13 19:10:17 +02:00

388 lines
14 KiB
Python
Executable file

#!/usr/bin/python3
# file: apistatusd.py
# date: 26.07.2019
# email: berhsi@web.de
# Status server, listening for door status updates. The IPv4 address and port
# to listen on are configurable, by default localhost:10001 is used. The
# connection is secured by TLS and client side authentication.
import json
import logging
import os
import socket
import ssl
import sys
import threading
from mastodon import Mastodon
from time import time, localtime, strftime, sleep
import configparser
class InitException(Exception):
    '''
    Signals that the Mastodon client could not be initialised.

    param1: description of the failure, kept in the attribute "error"
    '''

    def __init__(self, error):
        # Exception already keeps the argument in "args"; passing it on
        # explicitly does not change that.
        super().__init__(error)
        self.error = error
class Toot(threading.Thread):
    '''
    Thread that posts the door status to a Mastodon instance.
    '''

    def __init__(self, status, timestamp, config):
        '''
        Stores the values to publish and creates the Mastodon client.

        param1: boolean, True if the space is open
        param2: integer, unix timestamp of the status change
        param3: mapping with a section "mastodon" (keys "host" and "token")
        raises: InitException if the Mastodon client cannot be created
        '''
        threading.Thread.__init__(self)
        self.status = status
        self.config = config
        self.timestamp = timestamp
        try:
            self.mastodon = Mastodon(
                api_base_url=self.config['mastodon']['host'],
                access_token=self.config['mastodon']['token'])
        except Exception as e:
            logging.error('Exception occurred: {}'.format(e))
            # keep the original cause attached for debugging
            raise InitException('Mastodon instance initialisation failed') from e

    def run(self):
        '''
        Builds the status message and sends it to the Mastodon instance.
        All errors are logged and reported through the return value, no
        exception leaves this method.

        return: boolean, True if the message was sent
        '''
        timeformat = '%d.%m.%Y %H:%M Uhr'
        if self.status not in (True, False):
            logging.error('Invalid status to toot')
            return False
        try:
            timestring = strftime(timeformat, localtime(self.timestamp))
        except Exception:
            logging.error('Can not convert timestamp into timestring')
            return False
        logging.debug('Try to toot status to {}'.format(self.config['mastodon']['host']))
        if self.status:
            msg = 'The krautspace is open since: {}'.format(timestring)
        else:
            msg = 'The krautspace is closed since: {}'.format(timestring)
        logging.debug('Send message: {}'.format(msg))
        try:
            # use the client created in __init__ and the message built above
            self.mastodon.toot(msg)
        except Exception as e:
            logging.error('Failed to toot status')
            logging.error('{}'.format(e))
            return False
        return True
def certs_readable(config):
    '''
    Start-up check for the certificate files: the server key, the server
    certificate and the client certificate must each be configured (not an
    empty string) and readable. The first file failing the check is logged.

    param 1: mapping with the sections "server" (keys "key", "cert") and
             "client" (key "cert")
    return: boolean, True if all three files passed the check
    '''
    needed_files = (
        config['server']['key'],
        config['server']['cert'],
        config['client']['cert'],
    )
    for path in needed_files:
        is_defined = path != ''
        if not is_defined or not os.access(path, os.R_OK):
            logging.error('Cannot read {}'.format(path))
            return False
    return True
def print_config(config):
    '''
    Writes every section of the configuration with all of its options and
    values to the log, using level debug.

    param1: object offering sections() and mapping access per section
    '''
    logging.debug('Using config:')
    for name in config.sections():
        logging.debug('Section {}'.format(name))
        for option, value in config[name].items():
            logging.debug(' {}: {}'.format(option, value))
def print_ciphers(cipherlist):
    '''
    Logs name and protocol of every cipher in the given list with level
    debug. All other fields of a cipher description are skipped.

    param1: iterable of dictionaries, one per cipher
    '''
    wanted = ('name', 'protocol')
    logging.debug('Available ciphers')
    for cipher in cipherlist:
        for field, value in cipher.items():
            if field in wanted:
                logging.debug('{}: {}'.format(field, value))
def print_context(ctx):
    '''
    Logs the settings of the given ssl context with level debug, followed
    by the ciphers the context reports.

    param1: context object
    '''
    # (message template, attribute read from the context), in log order
    details = (
        ('Minimum version ssl: {}', 'minimum_version'),
        ('Maximum version ssl: {}', 'maximum_version'),
        ('SSL options enabled: {}', 'options'),
        ('Protocol: {}', 'protocol'),
        ('Verify flags certificates: {}', 'verify_flags'),
        ('Verify mode: {}', 'verify_mode'),
    )
    logging.debug('----------- context ----------------')
    for template, attribute in details:
        logging.debug(template.format(getattr(ctx, attribute)))
    print_ciphers(ctx.get_ciphers())
    logging.debug('------------------------------------')
def display_peercert(cert):
    '''
    Logs the main values of a peer certificate with level debug.

    The commonName is searched in the subject instead of being read from a
    fixed position, because number and order of the subject fields differ
    between certificates.

    param1: dictionary as returned by getpeercert(), or None if the peer
            sent no certificate; an empty dictionary is reported as an
            invalid certificate
    '''
    if cert is None:
        logging.debug('Peer does not offer a certificate')
    elif len(cert) == 0:
        logging.debug('Peer certificate was not valid')
    else:
        # the subject is a sequence of RDNs, each a sequence of
        # (key, value) pairs; take the first commonName found
        common_name = next(
            (value
             for rdn in cert.get('subject', ())
             for key, value in rdn
             if key == 'commonName'),
            None)
        logging.debug('Peer certificate commonName: {}'.format(common_name))
        logging.debug('Peer certificate serialNumber: {}'.format(
            cert.get('serialNumber')))
        logging.debug('Peer certificate notBefore: {}'.format(
            cert.get('notBefore')))
        logging.debug('Peer certificate notAfter: {}'.format(
            cert.get('notAfter')))
def receive_buffer_is_valid(raw_data):
    '''
    Checks validity of the received buffer contents. Only the single bytes
    b'0' and b'1' are valid. The bytes are compared directly, so data that
    is not valid UTF-8 is rejected instead of raising a decode error.

    param 1: byte object
    return: boolean
    '''
    if raw_data in (b'0', b'1'):
        logging.debug('Argument is valid: {}'.format(raw_data))
        return True
    logging.debug('Argument is not valid: {}'.format(raw_data))
    return False
def change_status(status, timestamp, filename):
    '''
    Write the new status together with a timestamp into the Space API JSON.

    The new content is completely built and serialised before the file is
    opened for writing. Opening with mode 'w' truncates the file, so a
    failure while updating or serialising the data must not happen after
    that point, otherwise the API file would be left empty or incomplete.

    param 1: boolean, the new status
    param 2: integer, timestamp of the change
    param 3: string, path of the Space API file
    return: boolean, True if the file was updated
    '''
    logging.debug('Change status API')
    # todo: use walrus operator := when migrating to python >= 3.8
    data = read_api(filename)
    if data is False:
        return False
    try:
        data["state"]["open"] = status
        data["state"]["lastchange"] = timestamp
        serialized = json.dumps(data, indent=4)
    except (KeyError, TypeError, ValueError) as e:
        # unexpected structure of the API file or values which cannot be
        # serialised; the file has not been touched yet
        logging.error('Failed to change API file')
        logging.error('{}'.format(e))
        return False
    if not os.access(filename, os.W_OK):
        logging.error('API file is not writable. Wrong permissions?')
        return False
    logging.debug('API file is writable')
    try:
        with open(filename, 'w') as api_file:
            logging.debug('API file open successfull')
            api_file.write(serialized)
    except OSError as e:
        logging.error('Failed to change API file')
        logging.error('{}'.format(e))
        return False
    logging.debug('API file changed')
    logging.info('API file successfull changed to {}'.format(status))
    return True
def read_api(api):
    '''
    Reads the Space API JSON into a dict. Returns the parsed content on
    success and False on failure. Failures are: the path is not readable,
    the file cannot be opened or read, or the content is not valid JSON.

    param 1: string, path of the Space API file
    return: dict or boolean
    '''
    logging.debug('Open API file: {}'.format(api))
    # return early if the API JSON cannot be read
    if not os.access(api, os.R_OK):
        logging.error('Failed to read API file')
        return False
    logging.debug('API is readable')
    try:
        with open(api, 'r') as api_file:
            logging.debug('API file successfully opened')
            api_json_data = json.load(api_file)
    except (OSError, ValueError) as e:
        # OSError: the path passed the access check but cannot be opened
        # or read (e.g. a directory); ValueError: invalid JSON or encoding
        logging.error('Failed to read API file: {}'.format(e))
        return False
    logging.debug('API file read successfull')
    return api_json_data
def get_status_and_time(raw_data):
    '''
    Translates the received byte into a boolean status and pairs it with
    the current time.

    param 1: byte object, b'1' means open, everything else closed
    return: tuple (boolean, integer), the status and the current unix
            timestamp in whole seconds
    '''
    # compare the bytes directly, so no decode error is possible
    status = raw_data == b'1'
    timestamp = int(time())
    logging.debug('Set values for timestamp: {} and status: {}'.format(
        str(timestamp), str(status)))
    return (status, timestamp)
def main():
    '''
    The main function - read the configuration, create a ssl context, load
    the certs, open a socket and serve connections one after another.

    The built-in defaults are overlaid by the values from ./apistatusd.conf.
    From every connection a single byte is read. A valid byte ('0' or '1')
    is written to the Space API file and echoed back to the client. If the
    byte is invalid or the API file could not be changed, '3' is sent.

    SSL context settings applied on top of the library defaults:
    OP_CIPHER_SERVER_PREFERENCE: the server's cipher ordering is used
    OP_NO_COMPRESSION: prevention against CRIME attack

    return: 0 (not reached, the accept loop only ends with sys.exit)
    '''
    # answer for invalid requests and failed updates
    error_answer = '3'.encode(encoding='utf-8', errors='strict')
    loglevel = logging.WARNING
    formatstring = '%(asctime)s: %(levelname)s: %(message)s'
    logging.basicConfig(format=formatstring, level=loglevel)

    default_config = {
        'general': {
            'timeout': 3.0,
            'loglevel': 'warning'
        },
        'server': {
            'host': 'localhost',
            'port': 10001,
            'cert': './certs/server.crt',
            'key': './certs/server.key'
        },
        'client': {
            'cert': './certs/client.crt'
        },
        'api': {
            'api': './api',
            'template': './api_template'
        },
        'mastodon': {
            'send': 'false',
            'host': 'localhost',
            'token': 'aaaaa-bbbbb-ccccc-ddddd-eeeee'
        }
    }
    configfile = './apistatusd.conf'
    config = configparser.ConfigParser()
    config.read_dict(default_config)
    if not config.read(configfile):
        logging.warning('Configuration file %s not found or not readable. Using default values.',
                        configfile)

    logger = logging.getLogger()
    if config['general']['loglevel'] not in ('critical', 'error', 'warning', 'info', 'debug'):
        logging.warning('Invalid loglevel %s given. Using default level %s.',
                        config['general']['loglevel'],
                        default_config['general']['loglevel'])
        config.set('general', 'loglevel', default_config['general']['loglevel'])
    logger.setLevel(config['general']['loglevel'].upper())
    print_config(config)

    # Validate the timeout once. A value that is not a number would
    # otherwise make every single connection fail.
    try:
        timeout = float(config['general']['timeout'])
    except ValueError:
        logging.warning('Invalid timeout %s given. Using default value %s.',
                        config['general']['timeout'],
                        default_config['general']['timeout'])
        timeout = float(default_config['general']['timeout'])

    # todo: certificates should be readable only!
    if not certs_readable(config):
        logging.error('Cert check failed\nExit')
        sys.exit(1)

    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    # NOTE(review): CERT_OPTIONAL also accepts clients which present no
    # certificate at all. If client side authentication is mandatory this
    # should be ssl.CERT_REQUIRED - confirm before changing.
    context.verify_mode = ssl.CERT_OPTIONAL
    context.load_cert_chain(certfile=config['server']['cert'],
                            keyfile=config['server']['key'])
    context.load_verify_locations(cafile=config['client']['cert'])
    # add to the options instead of assigning them, an assignment would
    # switch off the hardening options set by create_default_context()
    context.options |= ssl.OP_CIPHER_SERVER_PREFERENCE
    # ensure, compression is disabled (disabled by default anyway at the moment)
    context.options |= ssl.OP_NO_COMPRESSION
    logging.debug('SSL context created')
    print_context(context)

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as mySocket:
        logging.debug('Socket created')
        mySocket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        keep = mySocket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)
        logging.debug('Socket keepalive: {}'.format(keep))
        try:
            mySocket.bind((config['server']['host'], int(config['server']['port'])))
            mySocket.listen(5)
        except Exception as e:
            logging.error('Unable to bind and listen')
            logging.error('{}'.format(e))
            sys.exit(1)
        logging.info('Listening on {} at Port {}'.format(config['server']['host'],
                                                         config['server']['port']))

        while True:
            fromSocket = None
            conn = None
            # every connection starts with the error answer, a result of
            # an earlier connection must never be sent to another client
            answer = error_answer
            try:
                fromSocket, fromAddr = mySocket.accept()
                logging.info('Client connected: {}:{}'.format(fromAddr[0], fromAddr[1]))
                try:
                    fromSocket.settimeout(timeout)
                    logging.debug('Connection timeout set to {}'.format(
                        config['general']['timeout']))
                except Exception:
                    logging.error('Cannot set timeout to {}'.format(
                        config['general']['timeout']))
                try:
                    conn = context.wrap_socket(fromSocket, server_side=True)
                    conn.settimeout(timeout)
                except socket.timeout:
                    logging.error('Socket timeout')
                    continue
                except Exception as e:
                    logging.error('Connection failed: {}'.format(e))
                    continue
                logging.info('Connection established')
                try:
                    cert = conn.getpeercert(binary_form=False)
                    display_peercert(cert)
                except ValueError:
                    logging.debug('SSL handshake has not been done yet')
                except Exception as e:
                    logging.debug('Unexpected error: {}'.format(e))

                raw_data = conn.recv(1)
                if receive_buffer_is_valid(raw_data) is True:
                    status, timestamp = get_status_and_time(raw_data)
                    if change_status(status, timestamp, config['api']['api']) is True:
                        answer = raw_data
                        if config['mastodon']['send'].lower() == 'true':
                            try:
                                toot_thread = Toot(status, timestamp, config)
                                # start() runs the toot in its own thread;
                                # calling run() directly would block the
                                # answer to the client until the request to
                                # the mastodon instance has finished
                                toot_thread.start()
                            except InitException as e:
                                logging.debug('InitException: {}'.format(e))
                            except Exception as ex:
                                logging.debug('Exception: {}'.format(ex))
                        else:
                            logging.debug('Toot is set to false')
                logging.debug('Send {} back'.format(answer))
                conn.send(answer)
                sleep(0.1)  # protection against dos
            except KeyboardInterrupt:
                logging.info('Keyboard interrupt received')
                sys.exit(1)
            except Exception as e:
                logging.error('{}'.format(e))
                continue
            finally:
                # always release the connection, otherwise every request
                # leaves an open socket behind
                for sock in (conn, fromSocket):
                    if sock is None:
                        continue
                    try:
                        sock.close()
                    except OSError as e:
                        logging.debug('Closing connection failed: {}'.format(e))
    return 0
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    main()