import heapq
from collections import defaultdict, deque
from typing import Dict, List, Tuple, Optional, Set
class DijkstraAdvanced:
    """Shortest-path queries on a directed, weighted adjacency-list graph.

    The graph is given as ``{node: [(neighbor, weight), ...]}``. Nodes that
    only ever appear as a neighbor are part of the graph too. Edge weights
    are assumed to be non-negative (Dijkstra's algorithm requires it);
    nothing in this class validates that.

    None of the query methods modify the graph.
    """

    _INFINITY = float('infinity')

    def __init__(self, graph: Dict[str, List[Tuple[str, int]]]):
        """
        Initialize with adjacency list representation.

        graph: {node: [(neighbor, weight), ...]}. The mapping is stored by
            reference, not copied.
        """
        self.graph = graph
        self.nodes = set(graph.keys())
        for edges in graph.values():
            for neighbor, _ in edges:
                self.nodes.add(neighbor)
        # Edges taken out by _remove_edge, keyed by (u, v). Each value is a
        # list of (original index, (neighbor, weight)) so that _restore_edge
        # can put them back where they were.
        self._removed_edges: Dict[
            Tuple[str, str], List[Tuple[int, Tuple[str, int]]]
        ] = {}

    def shortest_path(self, start: str, end: str) -> Tuple[float, List[str]]:
        """
        Find shortest path between start and end nodes.

        Returns: (distance, path). When ``end`` cannot be reached the result
            is ``(float('infinity'), [])``; when ``start == end`` it is
            ``(0, [start])``.
        Raises: ValueError if either node is not in the graph.
        Time: O((V + E) log V), Space: O(V)
        """
        if start not in self.nodes or end not in self.nodes:
            raise ValueError(f"Start '{start}' or end '{end}' not in graph")
        distances, previous = self._search(start, target=end)
        distance = distances.get(end, self._INFINITY)
        return distance, self._reconstruct_path(previous, start, end)

    def shortest_paths_single_source(
        self, start: str
    ) -> Dict[str, Tuple[float, List[str]]]:
        """
        Find shortest paths from start to every node reachable from it.

        Returns: {node: (distance, path)}; includes ``start`` itself with
            ``(0, [start])``. Unreachable nodes are omitted.
        Raises: ValueError if ``start`` is not in the graph (same policy as
            ``shortest_path``).
        """
        if start not in self.nodes:
            raise ValueError(f"Start '{start}' not in graph")
        distances, previous = self._search(start)
        return {
            node: (distance, self._reconstruct_path(previous, start, node))
            for node, distance in distances.items()
        }

    def k_shortest_paths(
        self, start: str, end: str, k: int
    ) -> List[Tuple[float, List[str]]]:
        """
        Find up to k shortest loopless paths using Yen's algorithm.

        Returns: [(distance, path), ...] in non-decreasing distance order,
            without duplicates. Fewer than k entries are returned when the
            graph does not contain k distinct loopless paths; an empty list
            when ``end`` is unreachable.
        Raises: ValueError if ``k`` is not positive or a node is unknown.
        """
        if k <= 0:
            raise ValueError("k must be positive")
        best_distance, best_path = self.shortest_path(start, end)
        if best_distance == self._INFINITY:
            return []

        accepted = [(best_distance, best_path)]  # Yen's list "A"
        candidates: List[Tuple[float, List[str]]] = []  # Yen's heap "B"
        # Every path ever accepted or queued, so nothing is queued twice.
        seen = {tuple(best_path)}

        while len(accepted) < k:
            last_path = accepted[-1][1]
            root_cost = 0  # cost of last_path[:i + 1], maintained incrementally
            for i in range(len(last_path) - 1):
                spur_node = last_path[i]
                root_path = last_path[:i + 1]

                # Forbid the edge each accepted path takes right after this
                # same root, so the spur search must deviate here.
                blocked_edges = {
                    (path[i], path[i + 1])
                    for _, path in accepted
                    if len(path) > i + 1 and path[:i + 1] == root_path
                }
                # Forbid re-entering the root (except the spur node itself);
                # this is what keeps the combined path free of loops.
                blocked_nodes = set(root_path[:-1])

                distances, previous = self._search(
                    spur_node, end, blocked_edges, blocked_nodes
                )
                if end in distances:
                    spur_path = self._reconstruct_path(previous, spur_node, end)
                    total_path = root_path[:-1] + spur_path
                    key = tuple(total_path)
                    if key not in seen:
                        seen.add(key)
                        heapq.heappush(
                            candidates, (root_cost + distances[end], total_path)
                        )

                root_cost += sum(
                    self._get_path_weight([last_path[i], last_path[i + 1]])
                )

            if not candidates:
                break
            accepted.append(heapq.heappop(candidates))
        return accepted

    def _search(
        self,
        start: str,
        target: Optional[str] = None,
        blocked_edges: Optional[Set[Tuple[str, str]]] = None,
        blocked_nodes: Optional[Set[str]] = None,
    ) -> Tuple[Dict[str, float], Dict[str, Optional[str]]]:
        """Run Dijkstra from ``start`` without modifying the graph.

        target: stop as soon as this node is settled (its distance is final
            at that point). With ``None`` the whole reachable part is explored.
        blocked_edges: ``(u, v)`` pairs to treat as absent.
        blocked_nodes: nodes that must not be entered.

        Returns: (distances, previous). Both only contain nodes that were
            actually reached; ``previous[start]`` is ``None``.
        """
        blocked_edges = blocked_edges if blocked_edges is not None else set()
        blocked_nodes = blocked_nodes if blocked_nodes is not None else set()

        distances: Dict[str, float] = {start: 0}
        previous: Dict[str, Optional[str]] = {start: None}
        settled: Set[str] = set()
        heap = [(0, start)]  # (distance, node)

        while heap:
            distance, node = heapq.heappop(heap)
            if node in settled:
                continue  # stale entry: a shorter one was processed already
            if node == target:
                break
            settled.add(node)
            for neighbor, weight in self.graph.get(node, []):
                if (
                    neighbor in settled
                    or neighbor in blocked_nodes
                    or (node, neighbor) in blocked_edges
                ):
                    continue
                candidate = distance + weight
                if candidate < distances.get(neighbor, self._INFINITY):
                    distances[neighbor] = candidate
                    previous[neighbor] = node
                    heapq.heappush(heap, (candidate, neighbor))
        return distances, previous

    def _reconstruct_path(
        self, previous: Dict[str, Optional[str]], start: str, end: str
    ) -> List[str]:
        """Reconstruct path from previous pointers.

        Returns ``[]`` when ``end`` has no predecessor and is not ``start``
        (i.e. it was never reached). Missing keys in ``previous`` are treated
        like ``None``.
        """
        if start != end and previous.get(end) is None:
            return []
        path = []
        current: Optional[str] = end
        while current is not None:
            path.append(current)
            current = previous.get(current)
        path.reverse()
        return path

    def _get_path_weight(self, path: List[str]) -> List[int]:
        """Get weights for edges in path.

        For parallel edges the cheapest one is used, which is the one a
        shortest-path search effectively travels. A consecutive pair with no
        edge in the graph contributes nothing to the returned list.
        """
        weights = []
        for u, v in zip(path, path[1:]):
            parallel = [w for n, w in self.graph.get(u, []) if n == v]
            if parallel:
                weights.append(min(parallel))
        return weights

    def _remove_edge(self, u: str, v: str):
        """Temporarily remove every edge u -> v from the graph.

        The removed entries are remembered so ``_restore_edge(u, v)`` can put
        them back. Not used by the query methods (they never mutate the
        graph); kept as a helper.
        """
        edges = self.graph.get(u)
        if not edges:
            return
        removed = [(idx, edge) for idx, edge in enumerate(edges) if edge[0] == v]
        if not removed:
            return
        self.graph[u] = [edge for edge in edges if edge[0] != v]
        self._removed_edges.setdefault((u, v), []).extend(removed)

    def _restore_edge(self, u: str, v: str):
        """Restore the edges previously taken out by ``_remove_edge(u, v)``.

        Entries are re-inserted at their original indices, which reproduces
        the original adjacency list as long as it was not otherwise edited in
        between. Does nothing if no such edge is currently removed.
        """
        removed = self._removed_edges.pop((u, v), None)
        if not removed:
            return
        edges = self.graph.setdefault(u, [])
        for idx, edge in removed:  # ascending index order
            edges.insert(idx, edge)
class AStarPathfinding:
    """A* pathfinding on a 2D occupancy grid.

    Cells are addressed as ``(row, col)``. A cell value of 0 is passable and
    1 is an obstacle. A straight step costs ``movement_cost`` and a diagonal
    step costs ``diagonal_cost``; diagonal steps only require the target cell
    to be free (corners may be cut).
    """

    def __init__(self, grid: List[List[int]], heuristic_type: str = 'manhattan'):
        """
        Initialize A* pathfinder.

        grid: 2D grid where 0 = passable, 1 = obstacle. The column count is
            taken from the first row.
        heuristic_type: 'manhattan', 'euclidean', or 'chebyshev'. Any other
            value behaves like 'manhattan'.
        """
        self.grid = grid
        self.rows = len(grid)
        self.cols = len(grid[0]) if grid else 0
        self.heuristic_type = heuristic_type
        # Movement costs (can be adjusted for different terrains)
        self.movement_cost = 1
        self.diagonal_cost = 1.414  # approximately sqrt(2)

    def find_path(self, start: Tuple[int, int], goal: Tuple[int, int],
                  allow_diagonal: bool = True) -> Tuple[List[Tuple[int, int]], float]:
        """
        Find shortest path using A* algorithm.

        Returns: (path, total_cost). ``path`` includes both ``start`` and
            ``goal``. When no path exists the result is
            ``([], float('infinity'))``.
        Raises: ValueError if start or goal is outside the grid or blocked.
        """
        if not self._is_valid(start) or not self._is_valid(goal):
            raise ValueError("Start or goal position is invalid")
        if self.grid[start[0]][start[1]] == 1 or self.grid[goal[0]][goal[1]] == 1:
            raise ValueError("Start or goal position is blocked")

        # Heap entries are (f_score, g_score, position). Paths are rebuilt
        # from parent pointers instead of being copied into every entry.
        open_set = [(self._heuristic(start, goal, allow_diagonal), 0, start)]
        closed_set = set()
        g_scores = {start: 0}
        came_from = {start: None}

        while open_set:
            _, g_score, current = heapq.heappop(open_set)
            if current in closed_set:
                continue  # stale entry for an already expanded cell
            if current == goal:
                return self._trace_back(came_from, current), g_score
            closed_set.add(current)

            for neighbor, move_cost in self._get_neighbors(current, allow_diagonal):
                if neighbor in closed_set:
                    continue
                tentative_g = g_score + move_cost
                if neighbor not in g_scores or tentative_g < g_scores[neighbor]:
                    g_scores[neighbor] = tentative_g
                    came_from[neighbor] = current
                    f_score = tentative_g + self._heuristic(
                        neighbor, goal, allow_diagonal
                    )
                    heapq.heappush(open_set, (f_score, tentative_g, neighbor))

        return [], float('infinity')  # No path found

    def find_path_with_waypoints(self, start: Tuple[int, int],
                                 waypoints: List[Tuple[int, int]],
                                 goal: Tuple[int, int],
                                 allow_diagonal: bool = True
                                 ) -> Tuple[List[Tuple[int, int]], float]:
        """Find path through multiple waypoints.

        Waypoints are visited in the given order, then ``goal``. Each leg is
        an independent ``find_path`` call; the joined path does not repeat
        the cell where two legs meet.

        Returns: (path, total_cost), or ``([], float('infinity'))`` as soon
            as any leg has no path.
        Raises: ValueError (from ``find_path``) for invalid or blocked cells.
        """
        full_path: List[Tuple[int, int]] = []
        total_cost = 0
        current_pos = start
        for target in [*waypoints, goal]:
            segment, cost = self.find_path(current_pos, target, allow_diagonal)
            if not segment:
                return [], float('infinity')
            # Skip the first cell of later legs: it is the previous target.
            full_path.extend(segment[1:] if full_path else segment)
            total_cost += cost
            current_pos = target
        return full_path, total_cost

    def _trace_back(self, came_from: Dict, end: Tuple[int, int]) -> List[Tuple[int, int]]:
        """Follow parent pointers from ``end`` to the start and return the path in order."""
        path = []
        current = end
        while current is not None:
            path.append(current)
            current = came_from[current]
        path.reverse()
        return path

    def _is_valid(self, pos: Tuple[int, int]) -> bool:
        """Check if position is within grid bounds."""
        row, col = pos
        return 0 <= row < self.rows and 0 <= col < self.cols

    def _get_neighbors(self, pos: Tuple[int, int],
                       allow_diagonal: bool) -> List[Tuple[Tuple[int, int], float]]:
        """Get passable in-bounds neighbors with their movement costs."""
        row, col = pos
        # up, down, left, right
        directions = [(-1, 0), (1, 0), (0, -1), (0, 1)]
        if allow_diagonal:
            directions.extend([(-1, -1), (-1, 1), (1, -1), (1, 1)])

        neighbors = []
        for dr, dc in directions:
            new_pos = (row + dr, col + dc)
            if self._is_valid(new_pos) and self.grid[new_pos[0]][new_pos[1]] == 0:
                is_diagonal = dr != 0 and dc != 0
                cost = self.diagonal_cost if is_diagonal else self.movement_cost
                neighbors.append((new_pos, cost))
        return neighbors

    def _heuristic(self, pos: Tuple[int, int], goal: Tuple[int, int],
                   allow_diagonal: bool = False) -> float:
        """Calculate heuristic distance to goal.

        allow_diagonal: whether the search may take diagonal steps. Only the
            Manhattan (and fallback) heuristic depends on it: plain Manhattan
            distance counts 2 for a diagonal step that really costs
            ``diagonal_cost``, so it overestimates and A* may return a
            suboptimal path. With diagonals enabled the octile distance is
            used instead, which never overestimates on this grid model.

        NOTE: 'euclidean' and 'chebyshev' are not scaled by the configured
        costs. With the default diagonal cost of 1.414 (slightly below
        sqrt(2)) the Euclidean value can exceed the true cost by a tiny
        margin along pure diagonals.
        """
        d_row = abs(pos[0] - goal[0])
        d_col = abs(pos[1] - goal[1])
        if self.heuristic_type == 'euclidean':
            return (d_row ** 2 + d_col ** 2) ** 0.5
        if self.heuristic_type == 'chebyshev':
            return max(d_row, d_col)

        # 'manhattan' and any unrecognized heuristic_type
        straight = self.movement_cost
        if not allow_diagonal:
            return straight * (d_row + d_col)
        # Octile distance: every diagonal step replaces two straight steps,
        # but only when that is actually cheaper.
        diagonal = min(self.diagonal_cost, 2 * straight)
        return straight * (d_row + d_col) - (2 * straight - diagonal) * min(d_row, d_col)
# Business Applications Example
class RouteOptimizer:
    """Greedy delivery-route planner on top of ``DijkstraAdvanced``.

    Only the distance of each road segment is used for routing; the
    per-segment metadata is kept on ``self.graph`` but not consulted here.
    """

    def __init__(self, city_graph: Dict[str, List[Tuple[str, int, Dict]]]):
        """
        Initialize with city road network.

        city_graph: {intersection: [(neighbor, distance, metadata), ...]}
            metadata can include: traffic_factor, road_type, toll_cost, etc.
        """
        self.graph = city_graph
        # Routing works on a plain (neighbor, distance) view of the network.
        self.dijkstra = DijkstraAdvanced({
            node: [(neighbor, distance) for neighbor, distance, _ in neighbors]
            for node, neighbors in city_graph.items()
        })

    def optimize_delivery_route(self, depot: str, delivery_locations: List[str],
                                vehicle_capacity: int = 100,
                                time_windows: Optional[Dict[str, Tuple[int, int]]] = None) -> Dict:
        """
        Build a delivery round trip with the nearest-neighbor heuristic.

        Starting at ``depot``, repeatedly drive to the closest location that
        has not been served yet, then return to ``depot``. This is a greedy
        approximation, not an optimal tour. Ties are broken by the order of
        ``delivery_locations``, so the result is deterministic.

        vehicle_capacity, time_windows: accepted for interface compatibility;
            currently not used by the computation.

        Returns a dict with:
            "route": visited nodes in order, starting at ``depot``.
            "total_distance": summed leg distances; ``float('infinity')`` if
                some location (or the way back) cannot be reached.
            "total_time": ``total_distance`` converted by
                ``_estimate_travel_time``.
            "delivery_count": ``len(delivery_locations)``.
            "unreachable_locations": locations that could not be reached
                from where the vehicle was when they were the only ones left;
                the route stops serving at that point.
        Raises: ValueError (from the path search) if the depot or a location
            is not part of the road network.
        """
        if not delivery_locations:
            return {
                "route": [depot],
                "total_distance": 0,
                "total_time": 0,
                "delivery_count": 0,
                "unreachable_locations": [],
            }

        infinity = float('infinity')
        # Drop duplicates but keep the caller's order for stable tie-breaking.
        pending = list(dict.fromkeys(delivery_locations))
        unreachable: List[str] = []
        route = [depot]
        current_location = depot
        total_distance = 0

        while pending:
            # One path search per candidate; the winner's path is reused.
            nearest_location = None
            nearest_leg = None
            for location in pending:
                leg = self.dijkstra.shortest_path(current_location, location)
                if nearest_leg is None or leg[0] < nearest_leg[0]:
                    nearest_location, nearest_leg = location, leg

            distance, path = nearest_leg
            if distance == infinity:
                # Nothing left can be reached from here; do not pretend to
                # drive there.
                unreachable = pending
                total_distance = infinity
                break

            route.extend(path[1:])  # Avoid duplicating current location
            total_distance += distance
            pending.remove(nearest_location)
            current_location = nearest_location

        # Return to depot
        return_distance, return_path = self.dijkstra.shortest_path(current_location, depot)
        route.extend(return_path[1:])
        total_distance += return_distance

        return {
            "route": route,
            "total_distance": total_distance,
            "total_time": self._estimate_travel_time(total_distance),
            "delivery_count": len(delivery_locations),
            "unreachable_locations": unreachable,
        }

    def _estimate_travel_time(self, distance: int, average_speed: float = 50.0) -> float:
        """Estimate travel time as ``distance / average_speed``.

        Raises: ValueError if ``average_speed`` is not positive.
        """
        if average_speed <= 0:
            raise ValueError("average_speed must be positive")
        return distance / average_speed