doorstatus/source/server/apistatusd.py
+++ edece83dd1 kleinere umstrukturierung
auswertung der konfig fuer mastodon, default_config erweitert,
set_values() in get_status_and_timestamp() umbenannt,
statusstring und timestamp werden in main() aufgerufen und an andere funktionen uebergeben
2022-07-10 18:17:33 +02:00

322 lines
11 KiB
Python
Executable file

#!/usr/bin/python3
# file: apistatusd.py
# date: 26.07.2019
# email: berhsi@web.de
# Status server, listening for door status updates. The IPv4 address and port
# to listen on are configurable, by default localhost:10001 is used. The
# connection is secured by TLS and client side authentication.
import json
import logging
import os
import socket
import ssl
import sys
from mastodon import Mastodon
from time import time, sleep
import configparser
def certs_readable(config):
'''
checks at start, if the needed certificates defined (no nullstring) and
readable.
param 1: dictionary
return: boolean
'''
for i in (config['server']['key'], config['server']['cert'],
config['client']['cert']):
if i == '' or os.access(i, os.R_OK) is False:
logging.error('Cannot read {}'.format(i))
return False
return True
def print_config(config):
'''
Logs the config with level debug.
'''
logging.debug('Using config:')
for section in config.sections():
logging.debug('Section {}'.format(section))
for i in config[section]:
logging.debug(' {}: {}'.format(i, config[section][i]))
def print_ciphers(cipherlist):
'''
Prints the list of allowed ciphers.
param1: dictionary
return: boolean
'''
logging.debug('Available ciphers')
for i in cipherlist:
for j in i.keys():
if j in ('name', 'protocol'):
logging.debug('{}: {}'.format(j, i[j]))
def print_context(ctx):
'''
Prints the ssl settings for the given ssl context.
param1: context object
'''
logging.debug('----------- context ----------------')
logging.debug('Minimum version ssl: {}'.format(ctx.minimum_version))
logging.debug('Maximum version ssl: {}'.format(ctx.maximum_version))
logging.debug('SSL options enabled: {}'.format(ctx.options))
logging.debug('Protocol: {}'.format(ctx.protocol))
logging.debug('Verify flags certificates: {}'.format(ctx.verify_flags))
logging.debug('Verify mode: {}'.format(ctx.verify_mode))
print_ciphers(ctx.get_ciphers())
logging.debug('------------------------------------')
def display_peercert(cert):
'''
Displays the values of a given certificate.
param1: dictionary or none
'''
if cert == None:
logging.debug('Peer does not offer a certificate')
elif len(cert) == 0:
logging.debug('Peer certificate was not valid')
else:
logging.debug('Peer certificate commonName: {}'.format(
cert['subject'][5][0][1]))
logging.debug('Peer certificate serialNumber: {}'.format(
cert['serialNumber']))
logging.debug('Peer certificate notBefore: {}'.format(
cert['notBefore']))
logging.debug('Peer certificate notAfter: {}'.format(
cert['notAfter']))
def receive_buffer_is_valid(raw_data):
'''
Checks validity of the received buffer contents.
param 1: byte object
return: boolean
'''
if raw_data.decode('utf-8', 'strict') in ('0', '1'):
logging.debug('Argument is valid: {}'.format(raw_data))
return True
logging.debug('Argument is not valid: {}'.format(raw_data))
return False
def change_status(status, timestamp, filename):
'''
Write the new status together with a timestamp into the Space API JSON.
param 1: byte object
param 2: string
return: boolean
'''
logging.debug('Change status API')
# todo: use walrus operator := when migrating to python >= 3.8
data = read_api(filename)
if data is False:
return False
if os.access(filename, os.W_OK):
logging.debug('API file is writable')
with open(filename, 'w') as api_file:
logging.debug('API file open successfull')
data["state"]["open"] = status
data["state"]["lastchange"] = timestamp
try:
json.dump(data, api_file, indent=4)
except Exception as e:
logging.error('Failed to change API file')
logging.error('{}'.format(e))
return False
logging.debug('API file changed')
else:
logging.error('API file is not writable. Wrong permissions?')
return False
logging.info('API file successfull changed to {}'.format(status))
return True
def read_api(api):
'''
Reads the Space API JSON into a dict. Returns the dict on success and
False on failure.
param 1: string
return: dict or boolean
'''
logging.debug('Open API file: {}'.format(api))
# return early if the API JSON cannot be read
if not os.access(api, os.R_OK):
logging.error('Failed to read API file')
return False
logging.debug('API is readable')
with open(api, 'r') as api_file:
logging.debug('API file successfully opened')
try:
api_json_data = json.load(api_file)
logging.debug('API file read successfull')
except Exception as e:
logging.error('Failed to read API file: {}'.format(e))
return False
return api_json_data
def get_status_and_time(raw_data):
'''
Create a timestamp, changes the value of the given byte into a string
and returns both.
param 1: byte object
return: tuple
'''
status = True if raw_data.decode('utf-8', 'strict') == '1' else False
timestamp = int(str(time()).split('.')[0])
logging.debug('Set values for timestamp: {} and status: {}'.format(
str(timestamp), str(status)))
return (status, timestamp)
def main():
'''
The main function - open a socket, create a ssl context, load certs and
listen for connections. At SSL context we set only one available cipher
suite and disable compression.
OP_NO_COMPRESSION: prevention against CRIME attack
OP_DONT_ISERT_EMPTY_FRAGMENTS: prevention agains CBC 4 attack
(cve-2011-3389)
'''
answer = '3'.encode(encoding='utf-8', errors='strict')
loglevel = logging.WARNING
formatstring = '%(asctime)s: %(levelname)s: %(message)s'
logging.basicConfig(format=formatstring, level=loglevel)
default_config = {
'general': {
'timeout': 3.0,
'loglevel': 'warning'
},
'server': {
'host': 'localhost',
'port': 10001,
'cert': './certs/server.crt',
'key': './certs/server.key'
},
'client': {
'cert': './certs/client.crt'
},
'api': {
'api': './api',
'template': './api_template'
},
'mastodon': {
'send': 'false',
'host': 'localhost',
'token': 'aaaaa-bbbbb-ccccc-ddddd-eeeee'
}
}
configfile = './apistatusd.conf'
config = configparser.ConfigParser()
config.read_dict(default_config)
if not config.read(configfile):
logging.warning('Configuration file %s not found or not readable. Using default values.',
configfile)
logger = logging.getLogger()
if not config['general']['loglevel'] in ('critical', 'error', 'warning', 'info', 'debug'):
logging.warning('Invalid loglevel %s given. Using default level %s.',
config['general']['loglevel'],
default_config['general']['loglevel'])
config.set('general', 'loglevel', default_config['general']['loglevel'])
logger.setLevel(config['general']['loglevel'].upper())
print_config(config)
# todo: zertifikate sollten nur lesbar sein!
if not certs_readable(config):
logging.error('Cert check failed\nExit')
sys.exit(1)
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.verify_mode = ssl.CERT_OPTIONAL
context.load_cert_chain(certfile=config['server']['cert'],
keyfile=config['server']['key'])
context.load_verify_locations(cafile=config['client']['cert'])
context.options = ssl.OP_CIPHER_SERVER_PREFERENCE
# ensure, compression is disabled (disabled by default anyway at the moment)
context.options |= ssl.OP_NO_COMPRESSION
logging.debug('SSL context created')
print_context(context)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as mySocket:
logging.debug('Socket created')
mySocket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
keep = mySocket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)
logging.debug('Socket keepalive: {}'.format(keep))
try:
mySocket.bind((config['server']['host'], int(config['server']['port'])))
mySocket.listen(5)
except Exception as e:
logging.error('Unable to bind and listen')
logging.error('{}'.format(e))
sys.exit(1)
logging.info('Listening on {} at Port {}'.format(config['server']['host'],
config['server']['port']))
while True:
try:
fromSocket, fromAddr = mySocket.accept()
logging.info('Client connected: {}:{}'.format(fromAddr[0], fromAddr[1]))
try:
fromSocket.settimeout(float(config['general']['timeout']))
logging.debug('Connection timeout set to {}'.format(
config['general']['timeout']))
except Exception:
logging.error('Cannot set timeout to {}'.format(
config['general']['timeout']))
try:
conn = context.wrap_socket(fromSocket, server_side=True)
conn.settimeout(float(config['general']['timeout']))
except socket.timeout:
logging.error('Socket timeout')
continue
except Exception as e:
logging.error('Connection failed: {}'.format(e))
continue
logging.info('Connection established')
try:
cert = conn.getpeercert(binary_form=False)
display_peercert(cert)
except ValueError:
logging.debug('SSL handshake has not been done yet')
except Exception as e:
logging.debug('Unexpected error: {}'.format(e))
raw_data = conn.recv(1)
if receive_buffer_is_valid(raw_data) is True:
status, timestamp = get_status_and_time(raw_data)
if change_status(status, timestamp, config['api']['api']) is True:
answer = raw_data
if config['mastodon']['send'].lower() == 'true':
logging.debug('Try to toot status')
else: logging.debug('Toot is set to false')
if conn:
logging.debug('Send {} back'.format(raw_data))
conn.send(answer)
sleep(0.1) # protection against dos
except KeyboardInterrupt:
logging.info('Keyboard interrupt received')
sys.exit(1)
except Exception as e:
logging.error('{}'.format(e))
continue
return 0
if __name__ == '__main__':
main()