#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# file: setstatus.py
# date: 26.07.2019
# email: berhsi@web.de

# Setstatus.py is part of doorstatus - a program to deal with the
# krautspaces doorstatus.

# Client that connects to the statusserver (port 10001 by default) to update
# the krautspace door status. The new status (0 or 1) has to be given as the
# first command line argument; without it the script prints a usage message
# and exits with code 255.

import os
import ssl
import socket
import logging
import configparser
from time import sleep
from sys import exit, argv

# basic configuration
loglevel = logging.WARNING
formatstring = '%(asctime)s: %(levelname)s: %(message)s'
logging.basicConfig(format=formatstring, level=loglevel)


class SetStatus:
    """
    Client that pushes a new door status (0 or 1) to the status server
    over a TLS connection authenticated with a client certificate.

    Exit codes used by run() and its helpers:
        1  invalid status argument
        2  a certificate or key file is not readable
        3  the SSL context could not be created
        4  the socket could not be created or wrapped
        5  connecting to the server failed
        6  sending the status failed
        7  receiving the server response failed
    """

    # log level names accepted for general/loglevel
    valid_loglevels = ('critical', 'error', 'warning', 'info', 'debug')
    # number of connection attempts before giving up
    max_connect_attempts = 5
    # seconds to wait after a connection timeout before the next attempt
    retry_delay = 5

    def __init__(self):
        """
        Set up the attributes and the built-in default configuration.
        """
        self.status = None
        # root logger; available before set_config() so that every method
        # can log regardless of the call order
        self.log = logging.getLogger()
        self.context = None
        self.connection = None
        self.configfile = './setstatus.conf'
        self.config = configparser.ConfigParser()
        self.default_config = {
            'general': {
                'timeout': 5.0,
                'loglevel': 'warning'
            },
            'server': {
                'host': 'localhost',
                'port': 10001,
                'cert': './certs/server-pub.pem',
                'fqdn': 'kraut.space'
            },
            'client': {
                'cert': './certs/client-pub.pem',
                'key': './certs/client-key.pem'
            }
        }

    def check_status(self):
        """
        Validate self.status and convert it to the single byte that is
        sent to the server.

        return: boolean, True if the status is 0 or 1
        """
        try:
            self.status = int(self.status)
        except (TypeError, ValueError):
            self.log.error('Status argument does not represent a integer')
            return False
        if self.status not in (0, 1):
            return False
        self.log.debug('Set value to %s', self.status)
        self.status = bytes([self.status])
        return True

    def set_config(self):
        """
        Load the default values, overlay them with the configuration file
        (if it can be read) and apply the configured log level.
        """
        self.log = logging.getLogger()
        # read config file
        self.config.read_dict(self.default_config)
        if not self.config.read(self.configfile):
            self.log.warning('Configuration file {} not found or not '
                             ' readable.'.format(self.configfile))
            self.log.warning('Using default values.')
        # set loglevel
        config_loglevel = self.config['general']['loglevel']
        default_loglevel = self.default_config['general']['loglevel']
        if config_loglevel.lower() not in self.valid_loglevels:
            self.log.warning('Invalid loglevel {} given. Using default '
                             'level {}.'.format(config_loglevel,
                                                default_loglevel))
            self.config.set('general', 'loglevel', default_loglevel)
            # really fall back: setLevel() raises ValueError for unknown
            # level names
            config_loglevel = default_loglevel
        self.log.setLevel(config_loglevel.upper())

    def check_certs(self, certs):
        """
        Check if certs readable.

        param 1: iterable of file paths
        return: boolean, False as soon as one file is not readable
        """
        self.log.debug('Check certificates')
        for certfile in certs:
            if not os.access(certfile, os.R_OK):
                self.log.error('Failed to read certificate: {}'.format(
                    certfile))
                return False
        return True

    def log_config(self):
        """
        Writes every section and option of the active configuration to
        the log at debug level.
        """
        self.log.debug('Using config:')
        for section in self.config.sections():
            self.log.debug('Section {}'.format(section))
            for option in self.config[section]:
                self.log.debug('  {}: {}'.format(
                    option, self.config[section][option]))

    def create_ssl_context(self):
        """
        Build the SSL context: the server certificate is the only trusted
        CA and the client certificate/key pair is loaded for client
        authentication.

        return: ssl.SSLContext, or False on failure
        """
        try:
            context = ssl.create_default_context(
                ssl.Purpose.SERVER_AUTH,
                cafile=self.config['server']['cert'])
        except Exception as e:
            # create_default_context() raises instead of returning a
            # falsy value, e.g. for a malformed certificate file
            self.log.error('Failed to create SSL Context')
            self.log.debug('Reason: {}'.format(e))
            return False
        # restrict the negotiable cipher suites to ECDHE with AES-GCM
        context.set_ciphers('EECDH+AESGCM')
        context.options |= getattr(ssl, 'OP_NO_COMPRESSION', 0)
        try:
            context.load_cert_chain(certfile=self.config['client']['cert'],
                                    keyfile=self.config['client']['key'])
        except Exception as e:
            self.log.error('Failed to load cert chain')
            self.log.debug('Reason: {}'.format(e))
            return False
        self.log.debug('SSL context created')
        return context

    def create_ssl_socket(self, config, context):
        """
        Create a TCP socket, wrap it with the given SSL context and set
        the configured timeout on it.

        param 1: configuration (reads server/fqdn and general/timeout)
        param 2: ssl.SSLContext
        return: ssl.SSLSocket, or False on failure
        """
        try:
            bare_socket = socket.socket(socket.AF_INET,
                                        socket.SOCK_STREAM, 0)
        except OSError as e:
            self.log.error('Socket creation failed')
            self.log.debug('Reason: {}'.format(e))
            return False
        self.log.debug('Socket created')
        try:
            secure_socket = context.wrap_socket(
                bare_socket,
                server_side=False,
                server_hostname=config['server']['fqdn'])
        except Exception as e:
            self.log.error('Context wrapper failed: {}'.format(e))
            # do not leak the unwrapped socket
            bare_socket.close()
            return False
        self.log.debug('Socket wrapped with SSL')
        try:
            secure_socket.settimeout(float(config['general']['timeout']))
        except Exception as e:
            # best effort: carry on with the socket's current timeout
            self.log.debug('Failed to set timeout: {}'.format(e))
        return secure_socket

    @staticmethod
    def _peer_common_name(cert):
        """
        Return the commonName from a certificate dictionary as delivered
        by SSLSocket.getpeercert(), or None if there is none.
        """
        for rdn in (cert or {}).get('subject', ()):
            for key, value in rdn:
                if key == 'commonName':
                    return value
        return None

    def create_ssl_connection(self):
        """
        Connect to the configured server. A connection timeout is retried
        up to max_connect_attempts times; every other error, and running
        out of attempts, terminates the program.

        return: connected ssl.SSLSocket
        """
        for attempt in range(1, self.max_connect_attempts + 1):
            ssl_socket = self.create_ssl_socket(self.config, self.context)
            if ssl_socket is False:
                exit(4)
            try:
                host = self.config['server']['host']
                port = int(self.config['server']['port'])
                self.log.debug('Connect {}: {}'.format(host, port))
                ssl_socket.connect((host, port))
            except socket.timeout:
                self.log.error('Connection timeout')
                ssl_socket.close()
                if attempt < self.max_connect_attempts:
                    sleep(self.retry_delay)
                continue
            except Exception as e:
                self.log.error('Connection failed: {}'.format(e))
                ssl_socket.close()
                exit(5)
            self.log.debug('Conection established')
            cert = ssl_socket.getpeercert() or {}
            self.log.debug('Peer certificate commonName: {}'.format(
                self._peer_common_name(cert)))
            self.log.debug('Peer certificate serialNumber: {}'.format(
                cert.get('serialNumber')))
            return ssl_socket
        # every attempt timed out: there is no usable connection
        self.log.error('Connection failed: no connection after {} '
                       'attempts'.format(self.max_connect_attempts))
        exit(5)

    def run(self, status):
        """
        starts the engine: reads the configuration, validates the status,
        connects to the server, sends the status byte and compares the
        byte the server sends back.

        param 1: integer (or its string representation), 0 or 1
        """
        self.status = status

        # read config
        self.set_config()

        # check given status
        if self.check_status() is False:
            self.log.error('No valid status given')
            exit(1)

        # log config if level is debug
        if self.config['general']['loglevel'].lower() == 'debug':
            self.log_config()

        # check certificates are readable
        certs = (self.config['server']['cert'],
                 self.config['client']['cert'],
                 self.config['client']['key'])
        if self.check_certs(certs) is False:
            exit(2)

        # create ssl context
        self.context = self.create_ssl_context()
        if self.context is False:
            exit(3)

        # get connection
        self.connection = self.create_ssl_connection()

        # the with block closes the socket on every path, including the
        # exits below
        with self.connection:
            # send status
            try:
                self.log.debug('Send new status: {}'.format(self.status))
                self.connection.send(self.status)
            except Exception as e:
                self.log.error('Error: {}'.format(e))
                exit(6)
            # the server is expected to answer with the byte it stored
            try:
                response = self.connection.recv(1)
            except Exception as e:
                self.log.error('Error: {}'.format(e))
                exit(7)
            self.log.debug('Server returns: {}'.format(response))
            if response == self.status:
                self.log.info('Status sucessfull updated')
            else:
                self.log.error('Failed to update status')
            self.log.debug('Disconnect from server')


def main():
    """
    Command line entry point: setstatus.py <0|1>
    """
    if len(argv) < 2:
        logging.error('Usage: setstatus.py <0|1>')
        exit(255)
    SetStatus().run(argv[1])


if __name__ == '__main__':
    main()