added the actual batch distance script

This commit is contained in:
Amy Bowersox 2020-08-18 09:41:48 -06:00
parent 5aafac2025
commit 43b85d5214

99
src/batch_distance.py Executable file
View File

@ -0,0 +1,99 @@
# Batch_Distance.py: Batch process a bunch of distance measurements.
import sys
import argparse
import csv
import json
import math
import configparser
import urllib.parse
import urllib.request
class GeocodingError(RuntimeError):
pass
def geocode_google(config, address):
apikey = config['google']['apikey']
if not apikey:
raise GeocodingError("Google API key not specified")
query = { 'key': apikey, 'address': address, 'region': 'us'}
url = 'https://maps.googleapis.com/maps/api/geocode/json?' + \
urllib.parse.urlencode(query, quote_via=urllib.parse.quote)
with urllib.request.urlopen(url) as response:
if response.status == 200:
apireturn = json.loads(response.read())
stat = apireturn['status']
if stat == 'OK':
results = apireturn['results']
if len(results) > 1:
raise GeocodingError(f"Google API returned ambiguous results (total count {len(results)})")
coords = results[0]['geometry']['location']
return coords['lat'], coords['lng']
elif stat == 'ZERO_RESULTS':
return None
else:
raise GeocodingError(f"Google API returns status of {stat}")
else:
raise GeocodingError(f"Google API returns {response.status} HTTP status code")
OFFICE_LOCATION = (40.0187905, -105.2764775) # office location - 1433 Pearl Street, Boulder, CO
RADIUS = 6371.0 * 1000.0 # Earth radius in meters
METERS_PER_MILE = 1852.0 # number of meters per mile
def distance_miles(point1, point2):
Φ1 = math.radians(point1[0])
Φ2 = math.radians(point2[0])
ΔΦ = math.radians(point2[0] - point1[0])
Δλ = math.radians(point2[1] - point1[1])
a = math.sin(ΔΦ / 2.0) * math.sin(ΔΦ / 2.0) + math.cos(Φ1) * math.cos(Φ2) * math.sin(Δλ / 2.0) * math.sin(Δλ / 2.0)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
return RADIUS * c / METERS_PER_MILE
THRESHOLD_RADIUS = 15.0 # number of miles in "acceptable" threshold
ERROR_BAR_WIDTH = 0.5 # number of miles in "error bar" surrounding threshold
def classify_distance(dist):
if dist > (THRESHOLD_RADIUS + ERROR_BAR_WIDTH / 2):
return 'RED'
if dist >= (THRESHOLD_RADIUS - ERROR_BAR_WIDTH / 2):
return 'YELLOW'
return 'GREEN'
cmdline_parser = argparse.ArgumentParser()
cmdline_parser.add_argument('input', help='The input file containing locations to be checked.')
cmdline_parser.add_argument('output', nargs='?', help='The output file name that will receive the processed data.')
cmdline_parser.add_argument('-C', '--config', default='geoapi.ini', help='The geocoding API configuration file')
def main(args):
opts = cmdline_parser.parse_args(args)
config = configparser.ConfigParser()
config.read(opts.config)
output_name = opts.output
if not output_name:
output_name = opts.input + '.out'
with open(opts.input, newline='') as input_file:
dialect = csv.Sniffer().sniff(input_file.read(1024))
input_file.seek(0)
reader = csv.reader(input_file, dialect)
with open(output_name, mode='w') as output_file:
writer = csv.writer(output_file, dialect)
for in_row in reader:
coords = geocode_google(config, in_row[1])
if coords:
dist = distance_miles(OFFICE_LOCATION, coords)
out_row = [in_row[0], in_row[1], coords[0], coords[1], dist, classify_distance(dist)]
else:
out_row = [in_row[0], in_row[1], '*** Unable to locate']
writer.writerow(out_row)
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))