-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlocation_share.py
159 lines (135 loc) · 5.72 KB
/
location_share.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/env python3
"""
This module provides classes for sharing and receiving location data over UDP.
Wire format is JSON. See the __main__ block for example usage.
There are also some command-line capabilities...
To send a test location to port 8869, then exit:
python3 location_share.py --send_test_ip=localhost --send_test_port=8869
To listen for shared locations on port 6666, and print them out:
python3 location_share.py --port=6666
"""
import argparse
import socket
import sys
import time
import json
import logging
class LocationShare:
    """
    Object representing a single shared location observation.

    lat: Latitude in decimal degrees
    lon: Longitude in decimal degrees
    alt_ft_msl: Altitude in feet MSL
    timestamp: Unix timestamp in seconds UTC, when the location was recorded
    department: string - organization name
    unit_no: int - Unit/vehicle/target number, ideally unique per department
    name: string - Human-readable name of the object being tracked, if
          available. When omitted (or empty) it defaults to
          "<department>_<unit_no>".
    """

    def __init__(self, lat: float, lon: float, alt_ft_msl: int,  # pylint: disable=too-many-arguments
                 timestamp: int, department: str, unit_no: int,
                 name: str = None):
        self.lat = lat
        self.lon = lon
        self.alt_ft_msl = alt_ft_msl
        self.timestamp = timestamp
        self.department = department
        self.unit_no = unit_no
        # Any falsy name (None or "") falls back to a generated identifier.
        self.name = name if name else f"{department}_{unit_no}"

    @classmethod
    def from_dict(cls, location_dict):
        """Create a LocationShare object from a dictionary.

        location_dict: mapping whose keys are the constructor's parameter
                       names; it is expanded as keyword arguments.

        Returns the new object, or None (after logging an error) when the
        mapping has missing/unexpected keys or is not a mapping at all.
        """
        try:
            return cls(**location_dict)
        except (KeyError, TypeError) as e:
            logging.error("Error creating LocationShare object: %s", e)
            return None

    def to_dict(self):
        """Return a dictionary representation of the object.

        The result is a new dict on every call, so callers may modify it
        without altering this object's attributes.
        """
        # vars(self) is the live instance __dict__; copy it so the caller
        # cannot mutate the object through the returned mapping.
        return dict(vars(self))

    def to_json(self):
        """Return a JSON string representation of the object."""
        return json.dumps(self.to_dict())
class LocationSender:
    """Class that sends LocationShare objects to a specified IP and port.

    Each location is serialized with its to_json() method, encoded to bytes
    and sent as a single UDP datagram.
    """

    def __init__(self, ip, port):
        """Create the UDP socket used for sending.

        ip: destination IP address or hostname
        port: destination UDP port
        """
        self.ip = ip
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def __del__(self):
        # __del__ also runs for objects whose __init__ raised before
        # self.sock was assigned; only close a socket that actually exists.
        sock = getattr(self, "sock", None)
        if sock is not None:
            sock.close()

    def send_location(self, loc: "LocationShare") -> int:
        """Send a LocationShare object, returns 0 on success.

        Returns -1 (after logging an error) if the location cannot be
        serialized/encoded or if the socket send fails.
        """
        try:
            location_bytes = loc.to_json().encode()
        except Exception as e:  # pylint: disable=broad-except
            logging.error("Error encoding location data: %s", e)
            return -1
        try:
            self.sock.sendto(location_bytes, (self.ip, self.port))
        except Exception as e:  # pylint: disable=broad-except
            logging.error("Error sending location data: %s", e)
            return -1
        return 0
class LocationReceiver:
    """Class for receiving LocationShare objects sent as JSON over UDP."""

    # Maximum number of bytes read per datagram by receive_location().
    RECV_LEN = 1024

    def __init__(self, ip: str, port: int, ip_whitelist: list = None):
        """Create a UDP socket and bind it to (ip, port).

        ip: IP address to bind to
        port: Port to bind to
        ip_whitelist: List of IP addresses to accept data from.
                      If None, all IPs are accepted. An empty list accepts
                      no senders at all.
        """
        self.ip = ip
        self.port = port
        self.ip_whitelist = ip_whitelist
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind((self.ip, self.port))

    def __del__(self):
        # __del__ also runs for objects whose __init__ raised before
        # self.sock was assigned; only close a socket that actually exists.
        sock = getattr(self, "sock", None)
        if sock is not None:
            sock.close()

    def receive_location(self) -> "LocationShare":
        """Blocking call, returns one location position, or None on failure.

        None is returned (and an error logged) when the sender is not in the
        whitelist, when the payload cannot be decoded/parsed, or when any
        other exception occurs while receiving.
        """
        recv_len = self.RECV_LEN
        try:
            location_bytes, address = self.sock.recvfrom(recv_len)
            # A payload that fills the whole buffer may have been truncated.
            if len(location_bytes) >= recv_len:
                logging.warning("Received data len >= %s bytes", recv_len)
            # Compare against None explicitly: an empty whitelist must reject
            # every sender instead of silently disabling the filter.
            if self.ip_whitelist is not None and address[0] not in self.ip_whitelist:
                logging.error("Received data from unauthorized address: %s", address[0])
                return None
            location_dict = json.loads(location_bytes.decode())
            return LocationShare.from_dict(location_dict)
        except Exception as e:  # pylint: disable=broad-except
            logging.error("Error receiving shared location: %s", e)
            return None
if __name__ == "__main__":
    # Command-line entry point: either send one test location and exit, or
    # listen forever and print every location received.
    parser = argparse.ArgumentParser(description='Location Share')
    parser.add_argument('--port', type=int, default=8869,
                        help='The port to listen on.')
    parser.add_argument('--send_test_ip', type=str, default=None,
                        help='Send a test location to the given ip, then exit')
    parser.add_argument('--send_test_port', type=int, default=8869,
                        help='Port to send the test location to.')
    args = parser.parse_args()

    # Send a test location and exit
    if args.send_test_ip:
        sender = LocationSender(args.send_test_ip, args.send_test_port)
        ts = int(time.time())
        test_loc = LocationShare(40.8678983, -119.3353406, 4000, ts,
                                 "AIRPORT_TEST", 2, "Airport Truck #1")
        # send_location() reports failure through its return code; do not
        # claim success (or exit 0) when the datagram was not sent.
        if sender.send_location(test_loc) != 0:
            print(f"Failed to send test location to {args.send_test_ip}",
                  file=sys.stderr)
            sys.exit(1)
        print(f"Sent test location to {args.send_test_ip}, exiting: ",
              f"{test_loc.to_json()}")
        sys.exit(0)

    # Listen for data and print it to stdout
    print(f"Listening for shared locations on port {args.port}")
    receiver = LocationReceiver("0.0.0.0", args.port, None)
    while True:
        received_loc = receiver.receive_location()
        # None means the datagram was rejected or could not be parsed.
        if received_loc is not None:
            print("Received shared location: " + received_loc.to_json())