debugausgaben für den ssl-context hinzu

This commit is contained in:
example 2022-03-11 13:28:03 +01:00
parent 5f3bb44c7b
commit 58d9c327c9

View file

@ -43,35 +43,51 @@ def print_config(config):
for i in config[section]: for i in config[section]:
logging.debug(' {}: {}'.format(i, config[section][i])) logging.debug(' {}: {}'.format(i, config[section][i]))
def print_ciphers(cipherlist): def print_ciphers(cipherlist):
''' '''
Prints the list of allowed ciphers. Prints the list of allowed ciphers.
param1: dictionary param1: dictionary
return: boolean return: boolean
''' '''
print('Available ciphers') logging.debug('Available ciphers')
for i in cipherlist: for i in cipherlist:
print('\n')
for j in i.keys(): for j in i.keys():
print('{}: {}'.format(j, i[j])) if j in ('name', 'protocol'):
print('\n') logging.debug('{}: {}'.format(j, i[j]))
def print_context(ctx):
'''
Prints the ssl settings for the given ssl context.
param1: context object
'''
logging.debug('----------- context ----------------')
logging.debug('Minimum version ssl: {}'.format(ctx.minimum_version))
logging.debug('Maximum version ssl: {}'.format(ctx.maximum_version))
logging.debug('SSL options enabled: {}'.format(ctx.options))
logging.debug('Protocol: {}'.format(ctx.protocol))
logging.debug('Verify flags certificates: {}'.format(ctx.verify_flags))
logging.debug('Verify mode: {}'.format(ctx.verify_mode))
print_ciphers(ctx.get_ciphers())
logging.debug('------------------------------------')
def display_peercert(cert): def display_peercert(cert):
''' '''
Displays the values of a given certificate. Displays the values of a given certificate.
param1: dictionary param1: dictionary or none
return: boolean
''' '''
for i in cert.keys(): if cert == None:
print('{}:'.format(i)) logging.debug('Peer does not offer a certificate')
if i in ('subject', 'issuer'): elif len(cert) == 0:
for j in cert[i]: logging.debug('Peer certificate was not valid')
print('\t{}'.format(j)) else:
else: logging.debug('Peer certificate commonName: {}'.format(
print('\t{}'.format(cert[i])) cert['subject'][5][0][1]))
logging.debug('Peer certificate serialNumber: {}'.format(
cert['serialNumber']))
logging.debug('Peer certificate notBefore: {}'.format(
cert['notBefore']))
logging.debug('Peer certificate notAfter: {}'.format(
cert['notAfter']))
def receive_buffer_is_valid(raw_data): def receive_buffer_is_valid(raw_data):
''' '''
@ -225,12 +241,11 @@ def main():
context.load_cert_chain(certfile=config['server']['cert'], context.load_cert_chain(certfile=config['server']['cert'],
keyfile=config['server']['key']) keyfile=config['server']['key'])
context.load_verify_locations(cafile=config['client']['cert']) context.load_verify_locations(cafile=config['client']['cert'])
context.set_ciphers('EECDH+AESGCM') # only ciphers for tls 1.2 and 1.3
context.options = ssl.OP_CIPHER_SERVER_PREFERENCE context.options = ssl.OP_CIPHER_SERVER_PREFERENCE
# ensure, compression is disabled (disabled by default anyway at the moment) # ensure, compression is disabled (disabled by default anyway at the moment)
context.options |= ssl.OP_NO_COMPRESSION context.options |= ssl.OP_NO_COMPRESSION
logging.debug('SSL context created') logging.debug('SSL context created')
# print_ciphers(context.get_ciphers()) print_context(context)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as mySocket: with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as mySocket:
logging.debug('Socket created') logging.debug('Socket created')
@ -263,13 +278,18 @@ def main():
conn.settimeout(float(config['general']['timeout'])) conn.settimeout(float(config['general']['timeout']))
except socket.timeout: except socket.timeout:
logging.error('Socket timeout') logging.error('Socket timeout')
continue
except Exception as e: except Exception as e:
logging.error('Connection failed: {}'.format(e)) logging.error('Connection failed: {}'.format(e))
continue
logging.info('Connection established') logging.info('Connection established')
logging.info('Peer certificate commonName: {}'.format( try:
conn.getpeercert()['subject'][5][0][1])) cert = conn.getpeercert(binary_form=False)
logging.debug('Peer certificate serialNumber: {}'.format( display_peercert(cert)
conn.getpeercert()['serialNumber'])) except ValueError:
logging.debug('SSL handshake has not been done yet')
except Exception as e:
logging.debug('Unexpected error: {}'.format(e))
raw_data = conn.recv(1) raw_data = conn.recv(1)
if receive_buffer_is_valid(raw_data) is True: if receive_buffer_is_valid(raw_data) is True: