doorstatus/setstatus.py
berhsi 0c12ce86aa setstatus.py: clear reading status argument and make it pep8 conform
make the part with reading argv resp. interactiv more clear and readable,
remove unused parts from code and make the other pep8 conform.
2019-09-19 09:44:52 +02:00

113 lines
3.4 KiB
Python
Executable file

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# file: setstatus.py
# date: 26.07.2019
# email: berhsi@web.de
# client, who connects to the statusserver at port 10001 to update the
# krautspace door status. If no status is given as argument, he reads from
# stdin until input is 0 or 1.
import socket
import ssl
from sys import exit, argv
def check_arguments(argv):
'''
Checks length and validity of command line argument vectors. If there is
no argument or argument is not valid, it returns None. Otherwise it
converts the string value into a byte value.
param 1: array of strings
return: None or byte value
'''
if len(argv) == 1:
byte_value = None
else:
if argv[1].strip() == '0' or argv[1].strip() == '1':
i = int(argv[1].strip())
print('Set value to {}'.format(i))
byte_value = bytes([i])
else:
byte_value = None
return byte_value
def read_argument():
'''
Reads from stdin until the given value is valid. Convert the given
string to a byte value and return this value.
return: byte value
'''
status = None
while status is None:
buf = input('Enter new status (0/1): ')
if buf == '0' or buf == '1':
status = bytes([int(buf)])
print('Read status: {}'.format(status))
return status
def main():
HOST = 'localhost'
PORT = 10001
SERVER_NAME = 'server.status.kraut.space'
CLIENT_CERT = './certs/client.crt'
CLIENT_KEY = './certs/client.key'
SERVER_CERT = './certs/server.crt'
STATUS = None
RESPONSE = None
STATUS = check_arguments(argv)
while STATUS is None:
STATUS = read_argument()
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH,
cafile=SERVER_CERT)
context.options &= ~ssl.PROTOCOL_TLS
context.verify_mode = ssl.CERT_OPTIONAL
# context.set_ciphers('HIGHT:!aNULL:!RC4:!DSS')
context.load_cert_chain(certfile=CLIENT_CERT, keyfile=CLIENT_KEY)
print('SSL context created')
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as mySocket:
print('Socket created')
try:
conn = context.wrap_socket(mySocket, server_side=False,
server_hostname=SERVER_NAME)
print('Connection wrapped with ssl.context')
conn.settimeout(5.0)
except Exception as e:
print('Context wrapper failed: [}'.format(e))
try:
conn.connect((HOST, PORT))
print('Connection established: {}'.format(conn.getpeercert()))
except socket.timeout:
print('Connection timeout')
except Exception as e:
print('Connection failed: {}'.format(e))
exit(1)
try:
print('Send new status: {}'.format(STATUS))
conn.send(STATUS)
except Exception as e:
print('Error: {}'.format(e))
exit(2)
try:
RESPONSE = conn.recv(1)
print('Server returns: {}'.format(RESPONSE))
if RESPONSE == STATUS:
print('Status sucessfull updated')
else:
print('Failed to update status')
print('Disconnect from server')
except Exception as e:
print('Error: {}'.format(e))
exit(3)
if __name__ == '__main__':
main()