-
Notifications
You must be signed in to change notification settings - Fork 5
Turn by turn directions #73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 75 commits
163e0e7
61fa26b
48dbb3a
2439727
c1914f3
50450f3
5c10e2a
44ce3a9
09df30c
f11d8e3
7d82457
68e1475
0a4e223
b193295
f801616
7589e25
9c21ddc
dea9e05
03260d1
077ecd8
a10b02d
0222e6a
6fe92a1
4861325
424589d
a1d09b3
cb7b5f8
2a7b062
8741d33
cf37ad9
b9c693e
ef1958f
644e800
3690c32
bcc2099
cf26418
e4f5e90
d29b365
c566fd4
27b1e56
b904088
f4f3554
5c32195
6af1670
0be802d
269c89d
a1bdb50
8cf16d0
6c159a7
c178a2c
d39623e
f6f0bd4
73da066
83cd3ff
1faeb05
e07f8f5
ff5f151
bf4b835
f37ca4d
7653fbb
12b0688
d730b24
4bd0705
720d9ee
2bed813
3b6b69d
4f9365d
b90151a
5c7da73
40bb90d
ce5e7e0
979136e
0d518d7
a584440
e6fe0e4
b05977b
5613eff
d734b8e
3fd2b60
6ee9430
4464485
c41244d
a0df75f
98f9c7a
296faf3
87da3c0
328230a
dd8861c
70e8e10
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,245 @@ | ||
| from typing import TypedDict, Dict, List, Optional, Any | ||
| from mbm.types import Route, RouteProperties | ||
|
|
||
| class DirectionSegment(TypedDict): | ||
| gid: Optional[int] | ||
| osmData: Dict[str, Any] | ||
| maneuver: str | ||
| cardinal: str | ||
| distance: float | ||
| name: Optional[str] | ||
| effectiveName: str | ||
| featureIndex: int | ||
|
|
||
| class Direction(TypedDict): | ||
| directionSegments: List[DirectionSegment] | ||
| name: Optional[str] | ||
| effectiveName: str | ||
| distance: float | ||
| maneuver: str | ||
| heading: float | ||
| cardinal: str | ||
| type: Optional[str] | ||
| osmData: Dict[str, Any] | ||
| featureIndices: List[int] | ||
| directionText: str | ||
|
|
||
| def directions_list(features: Route) -> List[Direction]: | ||
| directions: List[Direction] = [] | ||
| previous_heading = None | ||
| previous_effective_name = None | ||
|
|
||
| for i, feature in enumerate(features): | ||
| props: RouteProperties = feature['properties'] | ||
| name = props.get('name') | ||
| heading: float = props.get('heading', 0.0) or 0.0 | ||
| distance: float = props.get('distance', 0) | ||
| route_type: Optional[str] = props.get('type') | ||
|
wrangtangle marked this conversation as resolved.
Outdated
|
||
| osm_tags: Optional[Dict[str, str]] = props.get('osm_tags') | ||
| park_name: Optional[str] = props.get('park_name') | ||
|
|
||
| osm_data = { | ||
| 'gid': props.get('gid'), | ||
| 'osm_id': props.get('osm_id'), | ||
| 'tag_id': props.get('tag_id'), | ||
| 'oneway': props.get('oneway'), | ||
| 'rule': props.get('rule'), | ||
| 'priority': props.get('priority'), | ||
| 'maxspeed_forward': props.get('maxspeed_forward'), | ||
| 'maxspeed_backward': props.get('maxspeed_backward'), | ||
| 'length_m': distance, | ||
| 'osm_tags': osm_tags, | ||
| 'park_name': park_name, | ||
| } | ||
|
|
||
| maneuver_info = _heading_to_english_maneuver(heading, previous_heading) | ||
| maneuver = maneuver_info['maneuver'] | ||
| cardinal = maneuver_info['cardinal'] | ||
|
|
||
| effective_name = name or _describe_unnamed_street(osm_tags, park_name) | ||
|
|
||
| direction_segment: DirectionSegment = { | ||
| 'gid': props.get('gid'), | ||
| 'osmData': osm_data, | ||
| 'maneuver': maneuver, | ||
| 'cardinal': cardinal, | ||
| 'distance': distance, | ||
| 'name': name, | ||
| 'effectiveName': effective_name, | ||
| 'featureIndex': i, | ||
| } | ||
|
|
||
| direction: Direction = { | ||
| 'directionSegments': [direction_segment], | ||
| 'name': name, | ||
| 'effectiveName': effective_name, | ||
| 'distance': distance, | ||
| 'maneuver': maneuver, | ||
| 'heading': heading, | ||
| 'cardinal': cardinal, | ||
| 'type': props.get('type'), | ||
| 'osmData': osm_data, | ||
| 'featureIndices': [i], | ||
| 'directionText': '', | ||
| } | ||
|
|
||
| if _should_merge_segments(maneuver, effective_name, previous_effective_name, name): | ||
| _merge_with_previous_direction(directions, direction_segment) | ||
| else: | ||
| directions.append(direction) | ||
|
|
||
| previous_heading = heading | ||
| previous_effective_name = effective_name | ||
|
|
||
| for index, direction in enumerate(directions): | ||
| street_name = direction.get('effectiveName') or direction.get('name') or 'an unknown street' | ||
| distance_text = _format_distance(direction['distance']) | ||
| if index == 0: | ||
| direction_text = f"Head {direction['cardinal']} on {street_name} for {distance_text}" | ||
| else: | ||
| direction_text = ( | ||
| f"{direction['maneuver']} onto {street_name} and head " | ||
| f"{direction['cardinal']} for {distance_text}" | ||
| ) | ||
|
|
||
| if index == len(directions) - 1: | ||
| direction_text += ' until you reach your destination' | ||
|
|
||
| direction['directionText'] = direction_text | ||
|
|
||
| return directions | ||
|
|
||
| def _nearest_45(x: float) -> int: | ||
| """Round an angle to the nearest 45-degree increment (0-360).""" | ||
| return (round(x / 45) * 45) % 360 | ||
|
|
||
| def _format_distance(meters: float) -> str: | ||
| meters_per_mile = 1609.344 | ||
| meters_per_foot = 0.3048 | ||
| miles = meters / meters_per_mile | ||
| def round_half_up(value: float) -> int: | ||
| return int(value + 0.5) | ||
|
|
||
| if miles < 0.09: | ||
| feet = round_half_up(meters / meters_per_foot) | ||
| unit = 'foot' if feet == 1 else 'feet' | ||
| return f"{feet} {unit}" | ||
|
|
||
| rounded_miles = round_half_up(miles * 10) / 10 | ||
| unit = 'mile' if rounded_miles == 1 else 'miles' | ||
| return f"{rounded_miles} {unit}" | ||
|
Comment on lines
+115
to
+129
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Question, non-blocking] This seems like a dupe of |
||
|
|
||
| # Relevant docs: | ||
| # https://wiki.openstreetmap.org/wiki/Key:highway | ||
| # https://wiki.openstreetmap.org/wiki/Tag:highway | ||
| def _describe_unnamed_street( | ||
| osm_tags: Optional[Dict[str, str]], | ||
| park_name: Optional[str] | ||
| ) -> str: | ||
| if not osm_tags or not isinstance(osm_tags, dict): | ||
| return 'an unknown street' | ||
|
|
||
| description = '' | ||
|
|
||
| if osm_tags.get('highway') == 'service' and osm_tags.get('service') == 'alley': | ||
| description = 'an alley' | ||
| elif osm_tags.get('highway') == 'service': | ||
| description = 'an access road' | ||
| elif osm_tags.get('footway') == 'crossing': | ||
| description = 'a crosswalk' | ||
| elif osm_tags.get('highway') == 'cycleway': | ||
| description = 'a bike path' | ||
| elif osm_tags.get('bicycle') == 'designated': | ||
| description = 'a bike path' | ||
| elif osm_tags.get('highway') == 'pedestrian': | ||
| description = 'a pedestrian path' | ||
| elif osm_tags.get('highway') == 'footway' or osm_tags.get('footway') == 'sidewalk': | ||
| description = 'a sidewalk' | ||
| elif osm_tags.get('highway') == 'pedestrian' and osm_tags.get('bicycle') == 'yes': | ||
| description = 'a mixed-use path' | ||
|
Comment on lines
+157
to
+158
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Nitpick, non-blocking] This conditional is a no-op since we already check |
||
| elif osm_tags.get('highway') == 'path' and osm_tags.get('bicycle') == 'permissive': | ||
| description = 'a mixed-use path' | ||
| elif park_name: | ||
| description = 'a path' | ||
| else: | ||
| description = 'an unknown street' | ||
|
|
||
| if park_name: | ||
| description += f' inside {park_name}' | ||
|
|
||
| return description | ||
|
|
||
|
|
||
| def _heading_to_english_maneuver( | ||
| heading: float, | ||
| previous_heading: Optional[float] | ||
| ) -> Dict[str, str]: | ||
| degrees = { | ||
| 0: {'maneuver': 'Continue', 'cardinal': 'north'}, | ||
| 45: {'maneuver': 'Turn slightly to the right', 'cardinal': 'northeast'}, | ||
| 90: {'maneuver': 'Turn right', 'cardinal': 'east'}, | ||
| 135: {'maneuver': 'Take a sharp right turn', 'cardinal': 'southeast'}, | ||
| 180: {'maneuver': 'Turn around', 'cardinal': 'south'}, | ||
| 225: {'maneuver': 'Take a sharp left turn', 'cardinal': 'southwest'}, | ||
| 270: {'maneuver': 'Turn left', 'cardinal': 'west'}, | ||
| 315: {'maneuver': 'Turn slightly to the left', 'cardinal': 'northwest'}, | ||
| } | ||
|
|
||
| if previous_heading is not None: | ||
| angle = _nearest_45(((heading - previous_heading) + 360) % 360) | ||
| maneuver = degrees.get(angle, {}).get('maneuver', 'Continue') | ||
| else: | ||
| maneuver = 'Continue' | ||
|
|
||
| cardinal = degrees.get(_nearest_45(heading), {}).get('cardinal', 'north') | ||
|
|
||
| return {'maneuver': maneuver, 'cardinal': cardinal} | ||
|
|
||
|
|
||
| def _should_merge_segments( | ||
| maneuver: str, | ||
| new_segment_effective_name: str, | ||
| previous_segment_effective_name: Optional[str], | ||
| name: Optional[str], | ||
| ) -> bool: | ||
| # Merge continues and slight turns when two segments have the same street name | ||
| is_slight_turn = (maneuver == 'Turn slightly to the left' or | ||
| maneuver == 'Turn slightly to the right' or maneuver == 'Continue') | ||
| is_named_street = bool(name) | ||
| same_named_street = new_segment_effective_name == previous_segment_effective_name | ||
| if is_slight_turn and is_named_street and same_named_street: | ||
| return True | ||
|
|
||
| # Merge continues on unnamed streets | ||
| if maneuver == 'Continue' and not is_named_street: | ||
| return True | ||
|
Comment on lines
+212
to
+214
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Question, non-blocking] This is always going to be true for the first segment whenever it is not a named street. Does that pose a problem for us? I notice the only similar test case has a named street as well ( |
||
|
|
||
| return False | ||
|
|
||
|
|
||
| def _merge_with_previous_direction( | ||
| directions: List[Direction], | ||
| direction_segment: DirectionSegment, | ||
| ) -> None: | ||
| if not directions: | ||
| return | ||
|
|
||
| previous_direction = directions[-1] | ||
| previous_direction['distance'] += direction_segment['distance'] | ||
| previous_direction['directionSegments'].append(direction_segment) | ||
| previous_direction['featureIndices'].append(direction_segment['featureIndex']) | ||
| name = direction_segment.get('name') | ||
| effective_name = direction_segment['effectiveName'] | ||
| osm_data = direction_segment['osmData'] | ||
|
|
||
| # Sometimes only some chicago_ways of a street are named, so check if the | ||
| # previous chicago_way is named and backfill the name if not | ||
| if name and not previous_direction['name']: | ||
| previous_direction['name'] = name | ||
| previous_direction['effectiveName'] = effective_name | ||
|
|
||
| # For unnamed streets, preserve OSM data from the current chicago_way if previous doesn't have a name | ||
| if not name and osm_data and not previous_direction['name']: | ||
| previous_direction['osmData'] = osm_data | ||
| previous_direction['effectiveName'] = effective_name | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| import json | ||
| from typing import List | ||
|
|
||
| from django.core.management.base import BaseCommand, CommandError | ||
| from django.db import connection | ||
|
|
||
| from mbm.directions import directions_list | ||
| from mbm.models import fetchall | ||
| from mbm.routing import calculate_route | ||
|
|
||
|
|
||
| def parse_coordinate_param(value: str) -> List[float]: | ||
| if not value: | ||
| raise CommandError("Coordinate value is required.") | ||
| parts = value.split(",") | ||
| if len(parts) != 2: | ||
| raise CommandError("Coordinate must be in 'lat,lng' format.") | ||
| try: | ||
| lat = float(parts[0]) | ||
| lng = float(parts[1]) | ||
| except ValueError as exc: | ||
| raise CommandError("Coordinate must be in 'lat,lng' format.") from exc | ||
| return [lat, lng] | ||
|
|
||
|
|
||
| def get_nearest_vertex_id(coord: List[float]) -> int: | ||
| with connection.cursor() as cursor: | ||
| cursor.execute( | ||
| """ | ||
| SELECT vert.id | ||
| FROM chicago_ways_vertices_pgr AS vert | ||
| ORDER BY vert.the_geom <-> ST_SetSRID( | ||
| ST_MakePoint(%s, %s), | ||
| 4326 | ||
| ) | ||
| LIMIT 1 | ||
| """, | ||
| [coord[1], coord[0]], # ST_MakePoint expects lng,lat | ||
| ) | ||
| rows = fetchall(cursor) | ||
| if rows: | ||
| return rows[0]["id"] | ||
| raise CommandError(f"No vertex found near point {coord[0]},{coord[1]}.") | ||
|
|
||
|
|
||
| class Command(BaseCommand): | ||
| help = "Generate directions from coordinate inputs." | ||
|
|
||
| def add_arguments(self, parser): | ||
| parser.add_argument( | ||
| "--sourceCoordinates", | ||
| required=True, | ||
| help="Source coordinate in 'lat,lng' format.", | ||
| ) | ||
| parser.add_argument( | ||
| "--targetCoordinates", | ||
| required=True, | ||
| help="Target coordinate in 'lat,lng' format.", | ||
| ) | ||
| parser.add_argument( | ||
| "--enable-v2", | ||
| action="store_true", | ||
| help="Enable the v2 routing costs.", | ||
| ) | ||
|
|
||
| def handle(self, *_args, **options): | ||
| source_coord = parse_coordinate_param(options["sourceCoordinates"]) | ||
| target_coord = parse_coordinate_param(options["targetCoordinates"]) | ||
| enable_v2 = options["enable_v2"] | ||
|
|
||
| source_vertex_id = get_nearest_vertex_id(source_coord) | ||
| target_vertex_id = get_nearest_vertex_id(target_coord) | ||
|
|
||
| features, _, _ = calculate_route( | ||
| source_vertex_id, | ||
| target_vertex_id, | ||
| enable_v2, | ||
| ) | ||
| directions = directions_list(features) | ||
|
|
||
| lines = [] | ||
| for direction in directions: | ||
| direction_text = direction.get("directionText", "") | ||
| lines.append(direction_text) | ||
|
|
||
| for segment in direction.get("directionSegments", []): | ||
| feature_index = segment.get("featureIndex") | ||
| gid = segment.get("gid") | ||
| name = segment.get("name") or segment.get("effectiveName") or "an unknown street" | ||
| distance = segment.get("distance", 0) | ||
| lines.append( | ||
| f" way {feature_index}: {name} (gid: {gid}; distance: {distance}m)" | ||
| ) | ||
|
|
||
| self.stdout.write("\n".join(lines)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Nitpick, optional] I think we may need to add import instructions for loading parks, right?