#!/usr/bin/python3 # file: apistatusd.py # date: 26.07.2019 # email: berhsi@web.de # Status server, listening for door status updates. The IPv4 address and port # to listen on are configurable, by default localhost:10001 is used. The # connection is secured by TLS and client side authentication. import json import logging import os import socket import ssl import sys import threading from mastodon import Mastodon from time import time, localtime, strftime, sleep import configparser class Toot(threading.Thread): ''' The thread to toot the status to mastodon. ''' def __init__(self, status, timestamp, config): ''' param1: boolean param2: integer param3: dictionary ''' threading.Thread.__init__(self) self.status = status self.config = config self.timestamp = timestamp self.mastodon = Mastodon(api_base_url = self.config['mastodon']['host'], access_token = self.config['mastodon']['token']) def run(self): ''' return: boolean send_toot(status, timestamp, config['mastodon']['host'], config['mastodon']['token']) ''' msg = None timeformat = '%d.%m.%Y %H:%M Uhr' timestring = strftime(timeformat, localtime(self.timestamp)) if self.status not in (True, False): logging.error('Invalid status to toot') timestring = strftime(timeformat, localtime(self.timestamp)) logging.debug('Try to toot status to {}'.format(host)) if self.status == True: msg = 'The krautspace is open since: {}'.format(timestring) elif self.status == False: msg = 'The krautspace is closed since: {}'.format(timestring) logging.debug('Send message: {}'.format(msg)) try: mastodon = Mastodon(api_base_url = host, access_token = token) mastodon.toot(mag) except Exception as e: logging.error('Failed to toot status') return False return False def send_toot(self): ''' Starts the thread ''' send_toot(status, timestamp, config['mastodon']['host'], config['mastodon']['token']) def certs_readable(config): ''' checks at start, if the needed certificates defined (no nullstring) and readable. param 1: dictionary return: boolean ''' for i in (config['server']['key'], config['server']['cert'], config['client']['cert']): if i == '' or os.access(i, os.R_OK) is False: logging.error('Cannot read {}'.format(i)) return False return True def print_config(config): ''' Logs the config with level debug. ''' logging.debug('Using config:') for section in config.sections(): logging.debug('Section {}'.format(section)) for i in config[section]: logging.debug(' {}: {}'.format(i, config[section][i])) def print_ciphers(cipherlist): ''' Prints the list of allowed ciphers. param1: dictionary return: boolean ''' logging.debug('Available ciphers') for i in cipherlist: for j in i.keys(): if j in ('name', 'protocol'): logging.debug('{}: {}'.format(j, i[j])) def print_context(ctx): ''' Prints the ssl settings for the given ssl context. param1: context object ''' logging.debug('----------- context ----------------') logging.debug('Minimum version ssl: {}'.format(ctx.minimum_version)) logging.debug('Maximum version ssl: {}'.format(ctx.maximum_version)) logging.debug('SSL options enabled: {}'.format(ctx.options)) logging.debug('Protocol: {}'.format(ctx.protocol)) logging.debug('Verify flags certificates: {}'.format(ctx.verify_flags)) logging.debug('Verify mode: {}'.format(ctx.verify_mode)) print_ciphers(ctx.get_ciphers()) logging.debug('------------------------------------') def display_peercert(cert): ''' Displays the values of a given certificate. param1: dictionary or none ''' if cert == None: logging.debug('Peer does not offer a certificate') elif len(cert) == 0: logging.debug('Peer certificate was not valid') else: logging.debug('Peer certificate commonName: {}'.format( cert['subject'][5][0][1])) logging.debug('Peer certificate serialNumber: {}'.format( cert['serialNumber'])) logging.debug('Peer certificate notBefore: {}'.format( cert['notBefore'])) logging.debug('Peer certificate notAfter: {}'.format( cert['notAfter'])) def receive_buffer_is_valid(raw_data): ''' Checks validity of the received buffer contents. param 1: byte object return: boolean ''' if raw_data.decode('utf-8', 'strict') in ('0', '1'): logging.debug('Argument is valid: {}'.format(raw_data)) return True logging.debug('Argument is not valid: {}'.format(raw_data)) return False def change_status(status, timestamp, filename): ''' Write the new status together with a timestamp into the Space API JSON. param 1: byte object param 2: string return: boolean ''' logging.debug('Change status API') # todo: use walrus operator := when migrating to python >= 3.8 data = read_api(filename) if data is False: return False if os.access(filename, os.W_OK): logging.debug('API file is writable') with open(filename, 'w') as api_file: logging.debug('API file open successfull') data["state"]["open"] = status data["state"]["lastchange"] = timestamp try: json.dump(data, api_file, indent=4) except Exception as e: logging.error('Failed to change API file') logging.error('{}'.format(e)) return False logging.debug('API file changed') else: logging.error('API file is not writable. Wrong permissions?') return False logging.info('API file successfull changed to {}'.format(status)) return True def read_api(api): ''' Reads the Space API JSON into a dict. Returns the dict on success and False on failure. param 1: string return: dict or boolean ''' logging.debug('Open API file: {}'.format(api)) # return early if the API JSON cannot be read if not os.access(api, os.R_OK): logging.error('Failed to read API file') return False logging.debug('API is readable') with open(api, 'r') as api_file: logging.debug('API file successfully opened') try: api_json_data = json.load(api_file) logging.debug('API file read successfull') except Exception as e: logging.error('Failed to read API file: {}'.format(e)) return False return api_json_data def get_status_and_time(raw_data): ''' Create a timestamp, changes the value of the given byte into a string and returns both. param 1: byte object return: tuple (boolean, integer) ''' status = True if raw_data.decode('utf-8', 'strict') == '1' else False timestamp = int(str(time()).split('.')[0]) logging.debug('Set values for timestamp: {} and status: {}'.format( str(timestamp), str(status))) return (status, timestamp) def main(): ''' The main function - open a socket, create a ssl context, load certs and listen for connections. At SSL context we set only one available cipher suite and disable compression. OP_NO_COMPRESSION: prevention against CRIME attack OP_DONT_ISERT_EMPTY_FRAGMENTS: prevention agains CBC 4 attack (cve-2011-3389) ''' answer = '3'.encode(encoding='utf-8', errors='strict') loglevel = logging.WARNING formatstring = '%(asctime)s: %(levelname)s: %(message)s' logging.basicConfig(format=formatstring, level=loglevel) default_config = { 'general': { 'timeout': 3.0, 'loglevel': 'warning' }, 'server': { 'host': 'localhost', 'port': 10001, 'cert': './certs/server.crt', 'key': './certs/server.key' }, 'client': { 'cert': './certs/client.crt' }, 'api': { 'api': './api', 'template': './api_template' }, 'mastodon': { 'send': 'false', 'host': 'localhost', 'token': 'aaaaa-bbbbb-ccccc-ddddd-eeeee' } } configfile = './apistatusd.conf' config = configparser.ConfigParser() config.read_dict(default_config) if not config.read(configfile): logging.warning('Configuration file %s not found or not readable. Using default values.', configfile) logger = logging.getLogger() if not config['general']['loglevel'] in ('critical', 'error', 'warning', 'info', 'debug'): logging.warning('Invalid loglevel %s given. Using default level %s.', config['general']['loglevel'], default_config['general']['loglevel']) config.set('general', 'loglevel', default_config['general']['loglevel']) logger.setLevel(config['general']['loglevel'].upper()) print_config(config) # todo: zertifikate sollten nur lesbar sein! if not certs_readable(config): logging.error('Cert check failed\nExit') sys.exit(1) context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) context.verify_mode = ssl.CERT_OPTIONAL context.load_cert_chain(certfile=config['server']['cert'], keyfile=config['server']['key']) context.load_verify_locations(cafile=config['client']['cert']) context.options = ssl.OP_CIPHER_SERVER_PREFERENCE # ensure, compression is disabled (disabled by default anyway at the moment) context.options |= ssl.OP_NO_COMPRESSION logging.debug('SSL context created') print_context(context) with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as mySocket: logging.debug('Socket created') mySocket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) keep = mySocket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) logging.debug('Socket keepalive: {}'.format(keep)) try: mySocket.bind((config['server']['host'], int(config['server']['port']))) mySocket.listen(5) except Exception as e: logging.error('Unable to bind and listen') logging.error('{}'.format(e)) sys.exit(1) logging.info('Listening on {} at Port {}'.format(config['server']['host'], config['server']['port'])) while True: try: fromSocket, fromAddr = mySocket.accept() logging.info('Client connected: {}:{}'.format(fromAddr[0], fromAddr[1])) try: fromSocket.settimeout(float(config['general']['timeout'])) logging.debug('Connection timeout set to {}'.format( config['general']['timeout'])) except Exception: logging.error('Cannot set timeout to {}'.format( config['general']['timeout'])) try: conn = context.wrap_socket(fromSocket, server_side=True) conn.settimeout(float(config['general']['timeout'])) except socket.timeout: logging.error('Socket timeout') continue except Exception as e: logging.error('Connection failed: {}'.format(e)) continue logging.info('Connection established') try: cert = conn.getpeercert(binary_form=False) display_peercert(cert) except ValueError: logging.debug('SSL handshake has not been done yet') except Exception as e: logging.debug('Unexpected error: {}'.format(e)) raw_data = conn.recv(1) if receive_buffer_is_valid(raw_data) is True: status, timestamp = get_status_and_time(raw_data) if change_status(status, timestamp, config['api']['api']) is True: answer = raw_data if config['mastodon']['send'].lower() == 'true': toot_threat = Toot(status, timestamp, config) toot_thread.run() logging.debug('Toot thread started') else: logging.debug('Toot is set to false') if conn: logging.debug('Send {} back'.format(raw_data)) conn.send(answer) sleep(0.1) # protection against dos except KeyboardInterrupt: logging.info('Keyboard interrupt received') sys.exit(1) except Exception as e: logging.error('{}'.format(e)) continue return 0 if __name__ == '__main__': main()