forked from Krautspace/doorstatus
0c12ce86aa
make the part with reading argv resp. interactiv more clear and readable, remove unused parts from code and make the other pep8 conform.
113 lines
3.4 KiB
Python
Executable file
113 lines
3.4 KiB
Python
Executable file
#!/usr/bin/python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# file: setstatus.py
|
|
# date: 26.07.2019
|
|
# email: berhsi@web.de
|
|
|
|
# Client that connects to the status server on port 10001 to update the
# Krautspace door status. If no status is given as an argument, it reads
# from stdin until the input is 0 or 1.
|
|
|
|
import socket
|
|
import ssl
|
|
from sys import exit, argv
|
|
|
|
|
|
def check_arguments(argv):
    '''
    Validates the command line argument vector and extracts the new status.

    Only the first argument after the program name is inspected; leading
    and trailing whitespace is ignored. A valid status ('0' or '1') is
    announced on stdout and converted into a single byte.

    param 1: array of strings (program name followed by its arguments)
    return: None if there is no argument or the argument is not valid,
            otherwise a bytes object of length one holding 0 or 1
    '''
    # "< 2" instead of "== 1": an empty vector must yield None as well
    # instead of raising IndexError on argv[1].
    if len(argv) < 2:
        return None

    candidate = argv[1].strip()
    if candidate not in ('0', '1'):
        return None

    value = int(candidate)
    print('Set value to {}'.format(value))
    return bytes([value])
|
|
|
|
|
|
def read_argument():
    '''
    Reads from stdin until the given value is valid ('0' or '1'). Leading
    and trailing whitespace is ignored, the same way check_arguments()
    treats the command line argument. The accepted value is converted to
    a byte value, announced on stdout and returned.

    An EOFError raised by input() (stdin closed) is not handled here and
    propagates to the caller.

    return: byte value (bytes object of length one holding 0 or 1)
    '''
    while True:
        answer = input('Enter new status (0/1): ').strip()
        if answer in ('0', '1'):
            status = bytes([int(answer)])
            print('Read status: {}'.format(status))
            return status
|
|
|
|
|
|
def main():
    '''
    Determines the new door status (from the command line or, failing
    that, interactively), opens a TLS connection to the status server,
    sends the status as a single byte and compares the one byte answer
    with the value that was sent.

    Exit codes:
        1 - wrapping the socket or connecting to the server failed
        2 - sending the status failed
        3 - receiving the answer failed
    '''
    HOST = 'localhost'
    PORT = 10001
    SERVER_NAME = 'server.status.kraut.space'
    CLIENT_CERT = './certs/client.crt'
    CLIENT_KEY = './certs/client.key'
    SERVER_CERT = './certs/server.crt'

    status = check_arguments(argv)
    if status is None:
        # No (valid) command line argument: ask the user instead.
        status = read_argument()

    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH,
                                         cafile=SERVER_CERT)
    # NOTE(review): ssl.PROTOCOL_TLS is a protocol selector, not one of the
    # ssl.OP_* option flags, so this mask presumably does not do what was
    # intended - confirm and replace with the proper OP_* flag or remove.
    context.options &= ~ssl.PROTOCOL_TLS
    # Per the ssl documentation CERT_OPTIONAL means the same as
    # CERT_REQUIRED in client mode: the server certificate is verified.
    context.verify_mode = ssl.CERT_OPTIONAL
    context.load_cert_chain(certfile=CLIENT_CERT, keyfile=CLIENT_KEY)
    print('SSL context created')

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as raw_socket:
        print('Socket created')
        try:
            conn = context.wrap_socket(raw_socket, server_side=False,
                                       server_hostname=SERVER_NAME)
            print('Connection wrapped with ssl.context')
            conn.settimeout(5.0)
        except Exception as e:
            print('Context wrapper failed: {}'.format(e))
            # Without a wrapped socket there is nothing left to do; going
            # on would only fail with an unbound "conn".
            exit(1)

        # The wrapped socket owns the connection from here on, so make
        # sure it is closed on every way out (including exit()).
        with conn:
            try:
                conn.connect((HOST, PORT))
                print('Connection established: {}'.format(
                    conn.getpeercert()))
            except socket.timeout:
                print('Connection timeout')
                # Do not try to send on a socket that never connected.
                exit(1)
            except Exception as e:
                print('Connection failed: {}'.format(e))
                exit(1)

            try:
                print('Send new status: {}'.format(status))
                conn.send(status)
            except Exception as e:
                print('Error: {}'.format(e))
                exit(2)

            try:
                response = conn.recv(1)
                print('Server returns: {}'.format(response))
                # The update counts as successful when the server sends
                # back exactly the byte it was given.
                if response == status:
                    print('Status sucessfull updated')
                else:
                    print('Failed to update status')
                print('Disconnect from server')
            except Exception as e:
                print('Error: {}'.format(e))
                exit(3)
|
|
|
|
|
|
# Run the client only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == '__main__':
    main()
|