doorstatus/source/server/apistatusd.py

407 lines
15 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
# file: apistatusd.py
# date: 26.07.2019
# mail: berhsi@web.de
# Status server, listening for door status updates. The IPv4 address and port
# to listen on are configurable, by default localhost:10001 is used. The
# connection is secured by TLS and client side authentication.
try:
import json
import logging
import os
import socket
import ssl
import sys
import requests
import threading
from time import time, localtime, strftime, sleep
import configparser
except ImportException as e:
print('Import error: {}'.format(e))
def certs_readable(config):
'''
checks at start, if the needed certificates defined (no nullstring) and
readable.
param 1: dictionary
return: boolean
'''
for i in (config['server']['key'], config['server']['cert'],
config['client']['cert']):
if i == '' or os.access(i, os.R_OK) is False:
logging.error('Cannot read {}'.format(i))
return False
return True
def print_config(config):
'''
Logs the config with level debug.
'''
logging.debug('Using config:')
for section in config.sections():
logging.debug('Section {}'.format(section))
for i in config[section]:
if i == 'token':
logging.debug(' {}: {}'.format(i, 'aaaaa-bbbbb-ccccc-ddddd-eeeee'))
else:
logging.debug(' {}: {}'.format(i, config[section][i]))
def print_ciphers(cipherlist):
'''
Prints the list of allowed ciphers.
param1: dictionary
return: boolean
'''
logging.debug('Available ciphers')
for i in cipherlist:
for j in i.keys():
if j in ('name', 'protocol'):
logging.debug('{}: {}'.format(j, i[j]))
def print_context(ctx):
'''
Prints the ssl settings for the given ssl context.
param1: context object
'''
logging.debug('----------- context ----------------')
logging.debug('Minimum version ssl: {}'.format(ctx.minimum_version))
logging.debug('Maximum version ssl: {}'.format(ctx.maximum_version))
logging.debug('SSL options enabled: {}'.format(ctx.options))
logging.debug('Protocol: {}'.format(ctx.protocol))
logging.debug('Verify flags certificates: {}'.format(ctx.verify_flags))
logging.debug('Verify mode: {}'.format(ctx.verify_mode))
print_ciphers(ctx.get_ciphers())
logging.debug('------------------------------------')
def display_peercert(cert):
'''
Displays the values of a given certificate.
param1: dictionary or none
'''
if cert == None:
logging.debug('Peer does not offer a certificate')
elif len(cert) == 0:
logging.debug('Peer certificate was not valid')
else:
logging.debug('Peer certificate commonName: {}'.format(
cert['subject'][5][0][1]))
logging.debug('Peer certificate serialNumber: {}'.format(
cert['serialNumber']))
logging.debug('Peer certificate notBefore: {}'.format(
cert['notBefore']))
logging.debug('Peer certificate notAfter: {}'.format(
cert['notAfter']))
def receive_buffer_is_valid(raw_data):
'''
Checks validity of the received buffer contents.
param 1: byte object
return: boolean
'''
if raw_data.decode('utf-8', 'strict') in ('0', '1'):
logging.debug('Argument is valid: {}'.format(raw_data))
return True
logging.debug('Argument is not valid: {}'.format(raw_data))
return False
def change_status(status, timestamp, filename):
'''
Write the new status together with a timestamp into the Space API JSON.
param 1: byte object
param 2: string
return: boolean
'''
logging.debug('Change status API')
# todo: use walrus operator := when migrating to python >= 3.8
data = read_api(filename)
if data is False:
return False
if os.access(filename, os.W_OK):
logging.debug('API file is writable')
with open(filename, 'w') as api_file:
logging.debug('API file open successfull')
data["state"]["open"] = status
data["state"]["lastchange"] = timestamp
try:
json.dump(data, api_file, indent=4)
except Exception as e:
logging.error('Failed to change API file')
logging.error('{}'.format(e))
return False
logging.debug('API file changed')
else:
logging.error('API file is not writable. Wrong permissions?')
return False
logging.info('API file successfull changed to {}'.format(status))
return True
def read_api(api):
'''
Reads the Space API JSON into a dict. Returns the dict on success and
False on failure.
param 1: string
return: dict or boolean
'''
logging.debug('Open API file: {}'.format(api))
# return early if the API JSON cannot be read
if not os.access(api, os.R_OK):
logging.error('Failed to read API file')
return False
logging.debug('API is readable')
with open(api, 'r') as api_file:
logging.debug('API file successfully opened')
try:
api_json_data = json.load(api_file)
logging.debug('API file read successfull')
except Exception as e:
logging.error('Failed to read API file: {}'.format(e))
return False
return api_json_data
def get_status_and_time(raw_data):
'''
Create a timestamp, changes the value of the given byte into a string
and returns both.
param 1: byte object
return: tuple (boolean, integer)
'''
status = True if raw_data.decode('utf-8', 'strict') == '1' else False
timestamp = int(str(time()).split('.')[0])
logging.debug('Set values for timestamp: {} and status: {}'.format(
str(timestamp), str(status)))
return (status, timestamp)
def join_path(host, api):
'''
Becomes two parts (host and api) of the mastodon url and concanate them
param1: string
param2: string
return: string
'''
url = ''
try:
if host[-1] == '/' and api[0] == '/':
url = ''.join((host, api[1:]))
elif host[-1] != '/' and api[0] != '/':
url = '/'.join((host, api))
else:
url = ''.join((host, api))
except TypeError as e:
logging.error('Can´t join path: {}'.format(e))
return url
class Toot(threading.Thread):
'''
The thread to toot the status to mastodon.
'''
def __init__(self, status, timestamp, config):
'''
param1: boolean
param2: integer
param3: dictionary
'''
threading.Thread.__init__(self)
self.status = status
self.config = config
self.timestamp = timestamp
self.api = '/api/v1/statuses'
self.auth = {'Authorization': ''}
self.data = {'status': ''}
self.url = ''
def run(self):
'''
return: boolean
'''
timeformat = '%d.%m.%Y %H:%M Uhr'
# check if status is valid
if self.status not in (True, False):
logging.error('Invalid status to toot')
return False
# convert seconds into timestring
try:
timestring = strftime(timeformat, localtime(self.timestamp))
except Exception as e:
logging.error('Can not convert timestamp into timestring')
return False
# set status message
if self.status == True:
self.data['status'] = 'Krautspace is open since: {}'.format(timestring)
elif self.status == False:
self.data['status'] = 'Krautspace is closed since: {}'.format(timestring)
logging.debug('Message: {}'.format(self.data['status']))
# build mastodon api url
self.url = join_path(self.config['mastodon']['host'], self.api)
# build authentcation header
self.auth['Authorization'] = 'Bearer {}'.format(
self.config['mastodon']['token'])
# and finaly send request to mastodon
try:
logging.debug('Try to toot status')
response = requests.post(self.url, data = self.data,
headers = self.auth)
if response.status_code == 200:
logging.info('Toot successful send')
return True
else:
logging.error('Failed to toot. Response: {}'.format(response.status_code))
except Exception as e:
logging.error('Exception occurred: {}'.format(e))
return False
def main():
'''
The main function - open a socket, create a ssl context, load certs and
listen for connections. At SSL context we set only one available cipher
suite and disable compression.
OP_NO_COMPRESSION: prevention against CRIME attack
OP_DONT_ISERT_EMPTY_FRAGMENTS: prevention agains CBC 4 attack
(cve-2011-3389)
'''
answer = '3'.encode(encoding='utf-8', errors='strict')
loglevel = logging.INFO
formatstring = '%(asctime)s: %(levelname)s: %(message)s'
logging.basicConfig(format=formatstring, level=loglevel)
default_config = {
'general': {
'timeout': 3.0,
'loglevel': 'info'
},
'server': {
'host': 'localhost',
'port': 10001,
'cert': './certs/server.crt',
'key': './certs/server.key'
},
'client': {
'cert': './certs/client.crt'
},
'api': {
'api': './api',
'template': './api_template'
},
'mastodon': {
'send': 'false',
'host': 'localhost',
'token': 'aaaaa-bbbbb-ccccc-ddddd-eeeee'
}
}
logging.info('Try to read config file')
configfile = './apistatusd.conf'
config = configparser.ConfigParser()
config.read_dict(default_config)
if not config.read(configfile):
logging.warning('Configuration file %s not found or not readable. Using default values.',
configfile)
logger = logging.getLogger()
if not config['general']['loglevel'] in ('critical', 'error', 'warning', 'info', 'debug'):
logging.warning('Invalid loglevel %s given. Using default level %s.',
config['general']['loglevel'],
default_config['general']['loglevel'])
config.set('general', 'loglevel', default_config['general']['loglevel'])
logging.info('Set loglevel to {}'.format(config['general']['loglevel'].upper()))
logger.setLevel(config['general']['loglevel'].upper())
print_config(config)
# todo: zertifikate sollten nur lesbar sein!
if not certs_readable(config):
logging.error('Cert check failed\nExit')
sys.exit(1)
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.verify_mode = ssl.CERT_OPTIONAL
context.load_cert_chain(certfile=config['server']['cert'],
keyfile=config['server']['key'])
context.load_verify_locations(cafile=config['client']['cert'])
context.options = ssl.OP_CIPHER_SERVER_PREFERENCE
# ensure, compression is disabled (disabled by default anyway at the moment)
context.options |= ssl.OP_NO_COMPRESSION
logging.debug('SSL context created')
print_context(context)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as mySocket:
logging.debug('Socket created')
mySocket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
keep = mySocket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)
logging.debug('Socket keepalive: {}'.format(keep))
try:
mySocket.bind((config['server']['host'], int(config['server']['port'])))
mySocket.listen(5)
except Exception as e:
logging.error('Unable to bind and listen')
logging.error('{}'.format(e))
sys.exit(1)
logging.info('Listening on {} at Port {}'.format(config['server']['host'],
config['server']['port']))
while True:
try:
fromSocket, fromAddr = mySocket.accept()
logging.info('Client connected: {}:{}'.format(fromAddr[0], fromAddr[1]))
try:
fromSocket.settimeout(float(config['general']['timeout']))
logging.debug('Connection timeout set to {}'.format(
config['general']['timeout']))
except Exception:
logging.error('Cannot set timeout to {}'.format(
config['general']['timeout']))
try:
conn = context.wrap_socket(fromSocket, server_side=True)
conn.settimeout(float(config['general']['timeout']))
except socket.timeout:
logging.error('Socket timeout')
continue
except Exception as e:
logging.error('Connection failed: {}'.format(e))
continue
logging.info('Connection established')
try:
cert = conn.getpeercert(binary_form=False)
display_peercert(cert)
except ValueError:
logging.debug('SSL handshake has not been done yet')
except Exception as e:
logging.debug('Unexpected error: {}'.format(e))
raw_data = conn.recv(1)
if receive_buffer_is_valid(raw_data) is True:
status, timestamp = get_status_and_time(raw_data)
if change_status(status, timestamp, config['api']['api']) is True:
answer = raw_data
if config['mastodon']['send'].lower() == 'true':
logging.debug('Toot is set to true')
try:
toot_thread = Toot(status, timestamp, config)
toot_thread.run()
except InitException as e:
logging.error('InitException: {}'.format(e))
except Exception as ex:
logging.debug('Exception: {}'.format(ex))
else: logging.debug('Toot is set to false')
if conn:
logging.debug('Send {} back'.format(raw_data))
conn.send(answer)
sleep(0.1) # protection against dos
except KeyboardInterrupt:
logging.info('Keyboard interrupt received')
sys.exit(1)
except Exception as e:
logging.error('{}'.format(e))
continue
return 0
if __name__ == '__main__':
main()