#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# file: setstatus.py
# date: 26.07.2019
# email: berhsi@web.de

# Setstatus.py is part of doorstatus - a program to deal with the
# krautspaces doorstatus.

# Client that connects to the statusserver at port 10001 to update the
# krautspace door status.

import os
import ssl
import socket
import logging
import configparser
from time import sleep
from sys import exit, argv


# basic configuration
loglevel = logging.WARNING
formatstring = '%(asctime)s: %(levelname)s: %(message)s'
logging.basicConfig(format=formatstring, level=loglevel)


class SetStatus:
    """
    Client that sends a new door status ('0' or '1') to the status server
    over a mutually authenticated TLS connection.

    Every fatal step in run() terminates the process with its own exit
    code (1: invalid status, 2: unreadable certificate, 3: SSL context,
    4: socket creation, 5: connection, 6: send, 7: receive).
    """

    # log levels accepted in the 'general' section of the config file
    VALID_LOGLEVELS = ('critical', 'error', 'warning', 'info', 'debug')
    # how often a timed out connection is retried and the pause in between
    CONNECT_ATTEMPTS = 5
    RETRY_DELAY = 5

    def __init__(self):
        """
        Sets up the default configuration. The configuration file itself
        is read later by set_config().
        """
        self.status = None
        # root logger, so log calls work even before set_config() ran
        self.log = logging.getLogger()
        self.context = None
        self.connection = None
        self.configfile = './setstatus.conf'
        self.config = configparser.ConfigParser()
        self.default_config = {
            'general': {
                'timeout': 5.0,
                'loglevel': 'warning'
            },
            'server': {
                'host': 'localhost',
                'port': 10001,
                'cert': './certs/server-pub.pem',
                'fqdn': 'kraut.space'
            },
            'client': {
                'cert': './certs/client-pub.pem',
                'key': './certs/client-key.pem'
            }
        }

    def check_status(self):
        """
        Checks if self.status holds a valid value ('0' or '1').
        return: boolean
        """
        if self.status in ('0', '1'):
            self.log.debug('Set status to %s', self.status)
            return True
        self.log.debug('%s is not a valid status', self.status)
        return False

    def set_config(self):
        """
        Loads the default values and then tries to overlay them with the
        values from the configuration file. If reading the file fails,
        the default values stay in use. Finally the configured loglevel
        is applied; an invalid level is replaced by the default level.
        """
        self.log = logging.getLogger()
        # defaults first, the config file overrides them
        self.config.read_dict(self.default_config)
        if not self.config.read(self.configfile):
            self.log.warning('Configuration file {} not found or not '
                             ' readable.'.format(self.configfile))
            self.log.warning('Using default values.')
        # set loglevel
        config_loglevel = self.config['general']['loglevel']
        default_loglevel = self.default_config['general']['loglevel']
        if config_loglevel.lower() not in self.VALID_LOGLEVELS:
            self.log.warning('Invalid loglevel %s given. Using default '
                             'level %s.', config_loglevel, default_loglevel)
            self.config.set('general', 'loglevel', default_loglevel)
            # use the fallback below as well, setLevel() raises a
            # ValueError for unknown level names
            config_loglevel = default_loglevel
        self.log.setLevel(config_loglevel.upper())

    def check_certs(self, certs):
        """
        Checks if all given certificate files are readable.
        param 1: iterable of file paths
        return: boolean
        """
        self.log.debug('Check certificates')
        for certfile in certs:
            if not os.access(certfile, os.R_OK):
                self.log.error('Failed to read certificate: %s', certfile)
                return False
        return True

    def display_cert(self, cert):
        """
        Logs subject and serial number of the peer certificate at debug
        level.
        param 1: cert dictionary as returned by SSLSocket.getpeercert()
        """
        self.log.debug('--- Peer cert ---')
        if not cert:
            # nothing to show if the peer did not present a certificate
            self.log.debug('No peer certificate available')
        else:
            self.log.debug('Subject: %s', cert.get('subject'))
            self.log.debug('SerialNumber: %s', cert.get('serialNumber'))
        self.log.debug('-----------------')

    def log_config(self):
        """
        Logs the complete configuration at debug level.
        """
        self.log.debug('Using config:')
        for section in self.config.sections():
            self.log.debug('Section %s', section)
            for option in self.config[section]:
                self.log.debug(' %s: %s', option,
                               self.config[section][option])

    def create_ssl_context(self):
        """
        Creates the SSL context: the server certificate is loaded as
        trust anchor and the client certificate/key for authentication.
        return: context object or false
        """
        try:
            context = ssl.create_default_context(
                purpose=ssl.Purpose.SERVER_AUTH)
            context.load_verify_locations(
                cafile=self.config['server']['cert'])
            context.load_cert_chain(certfile=self.config['client']['cert'],
                                    keyfile=self.config['client']['key'])
            # only ciphers for tls 1.2 and 1.3
            context.set_ciphers('EECDH+AESGCM')
            context.options |= getattr(ssl, 'OP_NO_COMPRESSION', 0)
        except Exception as e:
            # any failure here (missing file, broken key, ...) means we
            # have no usable context; the caller maps this to an exit code
            self.log.error('Failed to create SSL Context')
            self.log.debug('SSL context error: %s', e)
            return False
        self.log.debug('SSL context created')
        return context

    def create_ssl_socket(self, config, context):
        """
        Opens a socket and wraps the socket into the given ssl context.
        param1: configuration (mapping with 'server' and 'general')
        param2: ssl context
        return: ssl-socket or false
        """
        try:
            bare_socket = socket.socket(socket.AF_INET,
                                        socket.SOCK_STREAM, 0)
        except OSError as e:
            self.log.error('Socket creation failed')
            self.log.debug('Socket error: %s', e)
            return False
        self.log.debug('Socket created')
        try:
            secure_socket = context.wrap_socket(
                bare_socket,
                server_side=False,
                server_hostname=config['server']['fqdn'])
        except Exception as e:
            self.log.error('Context wrapper failed: %s', e)
            # do not leak the unwrapped socket
            bare_socket.close()
            return False
        self.log.debug('Socket wrapped with SSL')
        try:
            secure_socket.settimeout(float(config['general']['timeout']))
        except Exception as e:
            # best effort: without a timeout the socket simply blocks
            self.log.debug('Failed to set timeout: %s', e)
        return secure_socket

    def create_ssl_connection(self):
        """
        Connects to the server. A connection timeout is retried up to
        CONNECT_ATTEMPTS times with a fresh socket; every other error is
        fatal. Exits with 4 if no socket could be created and with 5 if
        the connection fails or all attempts timed out.
        return: connected ssl-socket
        """
        for attempt in range(1, self.CONNECT_ATTEMPTS + 1):
            ssl_socket = self.create_ssl_socket(self.config, self.context)
            if ssl_socket is False:
                exit(4)
            try:
                host = self.config['server']['host']
                port = int(self.config['server']['port'])
                self.log.debug('Connect %s: %s', host, port)
                ssl_socket.connect((host, port))
            except socket.timeout:
                self.log.error('Connection timeout')
                ssl_socket.close()
                # no need to wait after the last attempt
                if attempt < self.CONNECT_ATTEMPTS:
                    sleep(self.RETRY_DELAY)
            except Exception as e:
                self.log.error('Connection failed: %s', e)
                ssl_socket.close()
                exit(5)
            else:
                self.log.debug('Conection established')
                self.display_cert(ssl_socket.getpeercert())
                return ssl_socket
        # every attempt timed out, there is no connection to return
        self.log.error('Giving up after %s connection attempts',
                       self.CONNECT_ATTEMPTS)
        exit(5)

    def run(self, status):
        """
        Starts the engine: reads the configuration, validates the status,
        connects to the server, sends the status and compares the
        servers answer with it.
        param 1: string, the new status ('0' or '1')
        """
        self.status = status
        # read config
        self.set_config()
        # check given status
        if not self.check_status():
            exit(1)
        # log config if level is debug
        if self.config['general']['loglevel'].lower() == 'debug':
            self.log_config()
        # check certificates are readable
        certs = (self.config['server']['cert'],
                 self.config['client']['cert'],
                 self.config['client']['key'])
        if not self.check_certs(certs):
            exit(2)
        # create ssl context
        self.context = self.create_ssl_context()
        if self.context is False:
            exit(3)
        # get a ssl encrypted connection
        self.connection = self.create_ssl_connection()
        try:
            # send status
            try:
                self.log.debug('Send new status: %s', self.status)
                self.connection.sendall(
                    self.status.encode(encoding='utf-8', errors='strict'))
            except Exception as e:
                self.log.error('Error: %s', e)
                exit(6)
            # the server answers with the status it has stored
            try:
                response = self.connection.recv(1).decode(
                    encoding='utf-8', errors='strict')
                self.log.debug('Server returns: %s', response)
                if response == self.status:
                    self.log.info('Status sucessfull updated')
                else:
                    self.log.error('Failed to update status')
                self.log.debug('Disconnect from server')
            except Exception as e:
                self.log.error('Error: %s', e)
                exit(7)
        finally:
            # also runs when exit() raises SystemExit above
            self.connection.close()


if __name__ == '__main__':
    if len(argv) < 2:
        logging.error('Usage: setstatus.py <0|1>')
        exit(255)
    else:
        s = SetStatus()
        s.run(argv[1])