forked from Krautspace/doorstatus
setstatus.py: clear reading status argument and make it pep8 conform
make the part with reading argv resp. interactiv more clear and readable, remove unused parts from code and make the other pep8 conform.
This commit is contained in:
parent
b053403836
commit
0c12ce86aa
1 changed files with 14 additions and 22 deletions
32
setstatus.py
32
setstatus.py
|
@ -5,13 +5,13 @@
|
||||||
# date: 26.07.2019
|
# date: 26.07.2019
|
||||||
# email: berhsi@web.de
|
# email: berhsi@web.de
|
||||||
|
|
||||||
# client, who connects to the statusserver at port 10001 to update the krautspace
|
# client, who connects to the statusserver at port 10001 to update the
|
||||||
# door status. If no status is given as argument, he reads from stdin until
|
# krautspace door status. If no status is given as argument, he reads from
|
||||||
# input is 0 or 1.
|
# stdin until input is 0 or 1.
|
||||||
|
|
||||||
import socket
|
import socket
|
||||||
import ssl
|
import ssl
|
||||||
from sys import exit, argv, byteorder
|
from sys import exit, argv
|
||||||
|
|
||||||
|
|
||||||
def check_arguments(argv):
|
def check_arguments(argv):
|
||||||
|
@ -42,7 +42,7 @@ def read_argument():
|
||||||
'''
|
'''
|
||||||
status = None
|
status = None
|
||||||
|
|
||||||
while status == None:
|
while status is None:
|
||||||
buf = input('Enter new status (0/1): ')
|
buf = input('Enter new status (0/1): ')
|
||||||
if buf == '0' or buf == '1':
|
if buf == '0' or buf == '1':
|
||||||
status = bytes([int(buf)])
|
status = bytes([int(buf)])
|
||||||
|
@ -50,7 +50,7 @@ def read_argument():
|
||||||
return status
|
return status
|
||||||
|
|
||||||
|
|
||||||
def main(*status):
|
def main():
|
||||||
|
|
||||||
HOST = 'localhost'
|
HOST = 'localhost'
|
||||||
PORT = 10001
|
PORT = 10001
|
||||||
|
@ -58,34 +58,26 @@ def main(*status):
|
||||||
CLIENT_CERT = './certs/client.crt'
|
CLIENT_CERT = './certs/client.crt'
|
||||||
CLIENT_KEY = './certs/client.key'
|
CLIENT_KEY = './certs/client.key'
|
||||||
SERVER_CERT = './certs/server.crt'
|
SERVER_CERT = './certs/server.crt'
|
||||||
BOM = byteorder
|
|
||||||
STATUS = None
|
STATUS = None
|
||||||
RESPONSE = None
|
RESPONSE = None
|
||||||
|
|
||||||
if status:
|
|
||||||
STATUS = status[0]
|
|
||||||
STATUS = bytes([STATUS])
|
|
||||||
print('Status: {}'.format(STATUS))
|
|
||||||
else:
|
|
||||||
if len(argv) == 1:
|
|
||||||
STATUS = read_argument()
|
|
||||||
else:
|
|
||||||
STATUS = check_arguments(argv)
|
STATUS = check_arguments(argv)
|
||||||
if STATUS == None:
|
while STATUS is None:
|
||||||
STATUS = read_argument()
|
STATUS = read_argument()
|
||||||
|
|
||||||
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile = SERVER_CERT)
|
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH,
|
||||||
|
cafile=SERVER_CERT)
|
||||||
context.options &= ~ssl.PROTOCOL_TLS
|
context.options &= ~ssl.PROTOCOL_TLS
|
||||||
context.verify_mode = ssl.CERT_OPTIONAL
|
context.verify_mode = ssl.CERT_OPTIONAL
|
||||||
# context.set_ciphers('HIGHT:!aNULL:!RC4:!DSS')
|
# context.set_ciphers('HIGHT:!aNULL:!RC4:!DSS')
|
||||||
context.load_cert_chain(certfile = CLIENT_CERT, keyfile = CLIENT_KEY)
|
context.load_cert_chain(certfile=CLIENT_CERT, keyfile=CLIENT_KEY)
|
||||||
print('SSL context created')
|
print('SSL context created')
|
||||||
|
|
||||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as mySocket:
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as mySocket:
|
||||||
print('Socket created')
|
print('Socket created')
|
||||||
try:
|
try:
|
||||||
conn = context.wrap_socket(mySocket, server_side = False, \
|
conn = context.wrap_socket(mySocket, server_side=False,
|
||||||
server_hostname = SERVER_NAME)
|
server_hostname=SERVER_NAME)
|
||||||
print('Connection wrapped with ssl.context')
|
print('Connection wrapped with ssl.context')
|
||||||
conn.settimeout(5.0)
|
conn.settimeout(5.0)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
Loading…
Reference in a new issue