doorstatus/scripts/test_udp_api.py

82 lines
2.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
2020-12-28 22:01:32 +01:00
# Copyright (c):
#
# Philipp Matthias Schäfer (philipp.matthias.schaefer@posteo.de), 2020
#
# This file is part of the KrautStatus' Arduino code.
#
# KrautStatus' Arduino code is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# KrautStatus' Arduino code is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along
# with KrautStatus' Arduino code. If not, see
# <https://www.gnu.org/licenses/>.
"""
Test KrautStatus's UDP API for a given IP and (optionally) port.
"""
import argparse
import ipaddress
import socket
def port(string):
    """Convert a string to an integer that is a valid port number.

    Used as the ``type=`` callable of the PORT command-line argument.

    Args:
        string: Text form of the port, e.g. ``"12345"``.

    Returns:
        The port as an ``int`` in the range 0..65535.

    Raises:
        ValueError: If ``string`` is not an integer literal or the value lies
            outside 0..65535.
    """
    # Use a distinct local name so the function itself is not shadowed.
    number = int(string)
    if not 0 <= number <= 65535:
        raise ValueError(f"port out of range (0-65535): {number}")
    return number
def main():
    """Query the KrautStatus UDP API once and print the reported door state.

    Reads the target IP (required) and port (optional, default 12345) from
    the command line, binds a UDP socket to 0.0.0.0 on that port, sends a
    single probe byte to the target and then waits for a datagram from the
    target IP whose first byte is 0 (door closed) or 1 (door open).

    The wait has no timeout: if no matching reply arrives, the function
    blocks until interrupted.
    """
    # ArgumentParser's first positional parameter is ``prog``; the module
    # docstring belongs in ``description``.
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("ip",
                        metavar="IP",
                        type=ipaddress.ip_address,
                        help="IP address of the KrautStatus Arduino")
    parser.add_argument("port",
                        nargs="?",
                        metavar="PORT",
                        default=12345,
                        type=port,
                        help="port that the KrautStatus Arduino listens to")
    args = parser.parse_args()
    target_ip = str(args.ip)

    # Context managers make sure both sockets are closed on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as receiver:
        receiver.bind(('0.0.0.0', args.port))
        print(f'Listening on 0.0.0.0:{args.port}.')

        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
            # We do not use \0 or \1 here, so that we do not trigger
            # ourselves when testing the script locally (127.0.0.1)
            sender.sendto(b'\2', (target_ip, args.port))
            print(f'Sent probe byte 0x02 to {args.ip}:{args.port}.')

        while True:
            status, address = receiver.recvfrom(1)
            # Ignore datagrams from any host other than the target.
            if address[0] != target_ip:
                continue
            # A zero-length datagram carries no status byte to inspect.
            if not status:
                continue
            if status[0] == 0:
                print('Responded: door closed')
                return
            if status[0] == 1:
                print('Responded: door open')
                return
# Run the test only when executed as a script, not when imported.
if __name__ == "__main__":
    main()