-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
349 lines (292 loc) · 14.8 KB
/
Copy pathmain.py
File metadata and controls
349 lines (292 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
"""
Main script of the TDP benchmarking system.
"""
import matplotlib
matplotlib.use('Agg') # For parallel plot generation without deadlocks and threading issues
import datetime
import os
import random
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Callable, Optional, Dict, List, Tuple
import numpy as np
from pathlib import Path
from tqdm import tqdm
from ACO_simple_cached import ant_colony_optimization_simple_cached
from ACO_timeAware_cached import ant_colony_optimization_time_aware_cached
from CDCS import custom_discrete_cuckoo_search
from DCS import discrete_cuckoo_search
from GA import genetic_algorithm
from random_tours import random_tours
from tdp import TravelingDeliverymanProblem
from visualizer import Visualizer
def print_headline(headline: str, level = 1, width: int = 61):
    """
    Print a headline centered between two ruler lines.

    The ruler character depends on the level: "=" for level 1, "*" for
    level 2 and "-" for every other value.

    :param headline: Text to center between the rulers
    :param level: Headline level selecting the ruler character
    :param width: Width of the rulers and of the centered text
    """
    if level == 1:
        ruler_char = "="
    elif level == 2:
        ruler_char = "*"
    else:
        ruler_char = "-"
    ruler = ruler_char * width

    # A blank line precedes the top ruler and follows the bottom ruler.
    print(f"\n{ruler}")
    print(headline.center(width))
    print(f"{ruler}\n")
def print_finish(subheadline: str, level = 1, width: int = 61):
    """
    Print a closing line centered between two ruler lines.

    Uses the same layout as the headlines: "=" rulers for level 1, "*" for
    level 2 and "-" for any other level.

    :param subheadline: Text to center between the rulers
    :param level: Level selecting the ruler character
    :param width: Width of the rulers and of the centered text
    """
    # Default to the lowest-level ruler and upgrade for levels 1 and 2.
    symbol = "-"
    if level == 1:
        symbol = "="
    elif level == 2:
        symbol = "*"

    border = symbol * width
    print("\n" + border)
    print(subheadline.center(width))
    print(border + "\n")
def summarize_dataset(dataset_path: str | Path) -> None:
    """
    Print a short summary (source path and node count) of a dataset.

    A problem instance is created only to read the dataset; its logging is
    finalized and its logfile is removed again afterwards.

    :param dataset_path: Path to the dataset file
    """
    instance = TravelingDeliverymanProblem(dataset_path, algorithm_name="Dataset Summary")
    print(f"Loaded TSP problem from: {dataset_path}")
    print(f"Number of nodes: {instance.dimension}")

    # Close the log first, then drop the logfile created for this summary.
    instance.finalize_logging()
    summary_log = Path(instance.logfile_path)
    summary_log.unlink(missing_ok=True)
def execute_algorithm(dataset_path: str | Path, algorithm: Callable,
                      algorithm_name: Optional[str] = None,
                      log_dir = "results",
                      seed = 42,
                      track_tried_tours = False) -> tuple[TravelingDeliverymanProblem, np.ndarray, float]:
    """
    Run a single algorithm on the problem instance loaded from a dataset.

    Logging of the problem instance is finalized even when the algorithm
    raises; in that case the exception is propagated to the caller.

    :param dataset_path: Path to the dataset file
    :param algorithm: Algorithm function, called as
        algorithm(problem, seed=..., track_tried_tours=...)
    :param algorithm_name: Optional name of the algorithm for logging, if None uses function name
    :param log_dir: Directory to save the logs
    :param seed: Seed passed to the algorithm
    :param track_tried_tours: Whether to track tours that have been tried but did not result in an improvement
    :return: Tuple(solved TDP instance, best_tour, best_obj)
    """
    if algorithm_name is None:
        label = algorithm.__name__
    else:
        label = algorithm_name

    tdp_instance = TravelingDeliverymanProblem(dataset_path, label, log_dir=log_dir)
    print_headline(label, 3, 61)

    try:
        best_tour, best_obj = algorithm(tdp_instance, seed=seed, track_tried_tours=track_tried_tours)
    finally:
        # Always close the log, whether the run succeeded or failed.
        tdp_instance.finalize_logging()

    print_finish(f'"{label}" - Execution Complete!', 3, 61)
    return tdp_instance, best_tour, best_obj
def show_results(solved_problems: list[TravelingDeliverymanProblem]):
    """
    Print the metrics report of every solved problem.

    Each report is framed by a level-3 headline and finishing line carrying
    the algorithm name of the problem.

    :param solved_problems: List of TravelingDeliverymanProblem instances
    """
    for solved in solved_problems:
        print_headline(solved.algorithm_name, 3, 61)
        Visualizer(solved).print_metrics_report()
        print_finish(f'"{solved.algorithm_name}" - Results Display Complete!', 3, 61)
def process_visualizations(problems: list[TravelingDeliverymanProblem],
                           show_convergence: bool = True,
                           save_conv: bool = True,
                           show_solution: bool = True,
                           save_solution: bool = True):
    """
    Show and/or save the convergence and solution plots of solved problems.

    A problem is skipped (with an INFO message) when it has no feasible
    solution or when it has more than 200 nodes, since plotting big problems
    takes very long; computing those plots afterward is recommended. Skipping
    one problem does not affect the remaining problems in the list.

    :param problems: List of TravelingDeliverymanProblem instances
    :param show_convergence: Show the convergence plot
    :param save_conv: Save the convergence plot
    :param show_solution: Show the solution plot
    :param save_solution: Save the solution plot
    :return: None
    """
    for problem in problems:
        best_obj = problem.current_best_obj
        if best_obj is None or best_obj == -float('inf'):
            print(
                f"INFO: Algorithm {problem.algorithm_name} does not produced a feasible solution, skipping visualization!")
            # Skip only this problem; the remaining ones are still visualized.
            continue
        if problem.dimension > 200:
            print(
                f"INFO: {problem.name} excluded from geographical visualization, due to the high number of nodes ({problem.dimension})!")
            continue

        visualizer = Visualizer(problem)
        if show_convergence:
            visualizer.show_convergence_plot()
        if save_conv:
            visualizer.save_convergence_plot()
        if show_solution:
            visualizer.show_solution_plot()
        if save_solution:
            visualizer.save_solution_plot()
def _worker_test_seed(seed: int,
                      batch_idx: int,
                      total_seeds: int,
                      main_log_dir: str,
                      datasets: List[Path],
                      algorithms: Dict[Callable, str]):
    """
    Worker function running the complete benchmark for a single seed.

    Intended to be submitted to a process pool. For every dataset all given
    algorithms are executed, their results are printed and their plots are
    saved. Logs are written to "{main_log_dir}/{seed}". A failing algorithm
    is reported and skipped; any other failure aborts the seed.

    :param seed: Seed passed to every algorithm
    :param batch_idx: Zero-based index of this seed, used for the progress message
    :param total_seeds: Total number of seeds, used for the progress message
    :param main_log_dir: Main result directory, the seed directory is created inside
    :param datasets: List of dataset paths (not modified)
    :param algorithms: {algorithm: '<name>'}
    :return: Tuple(seed, True, None) on success, Tuple(seed, False, exception) on failure
    """
    curr_log_dir = f"{main_log_dir}/{seed}"
    Path(curr_log_dir).mkdir(parents=True, exist_ok=True)

    try:
        # Shuffle a copy so the caller's list keeps its order.
        shuffled_datasets = list(datasets)
        random.shuffle(shuffled_datasets)  # better ram/cpu utilization

        for dataset in shuffled_datasets:
            print_headline(f'Testing Dataset {dataset} with seed "{seed}"', 2, 61)
            summarize_dataset(dataset)

            # Execute TDP Algorithms
            print_headline("TSP Algorithm Execution", 2, 61)
            problem_results = []
            for algorithm, name in algorithms.items():
                try:
                    resulting_tdp, _, _ = execute_algorithm(dataset, algorithm, algorithm_name=name,
                                                            log_dir=curr_log_dir, seed=seed)
                    if resulting_tdp.current_best_obj == -float('inf'):
                        print_headline(f"INFO: Algorithm {name} could not find feasible solution for {dataset}", 3, 61)
                    else:
                        # WARNING: Grows quickly for large problems, take care and may deactivate saving here.
                        problem_results.append(resulting_tdp)
                except Exception as e:
                    # One failing algorithm must not abort the remaining ones.
                    print_headline(f"WARNING: Algorithm {name} failed at problem {dataset}", 2, 61)
                    print(str(e))
                    print("Result ignored, continuing")
            print_finish("TSP Algorithm Execution Complete", 2, 61)

            # Display results
            print_headline("Results Summary", 2, 61)
            show_results(problem_results)
            print_finish("Results Summary - Complete", 2, 61)

            # Save visualizations (showing is disabled inside workers)
            print_headline("Visualization", 2, 61)
            process_visualizations(problem_results, False, True, False, True)
            # Same level as the opening "Visualization" headline.
            print_finish("Visualization Complete", 2, 61)

            print_finish(f'Dataset "{dataset}" Complete!', 2, 61)

        print_finish(f"Execution complete! (Seed: {seed}, Batch {batch_idx + 1}/{total_seeds})", 2, 61)
        return seed, True, None
    except Exception as e:
        # Report the failure as a value so the parent can log it per seed.
        return seed, False, e
def test(datasets: List[Path], algorithms: Dict[Callable, str], log_main_dir_name: str, master_seed: int = 42, num_runs: int = 30, max_worker = 50):
    """
    Executes the given algorithms on all given datasets, once per generated seed.

    The seeds are drawn from a random generator initialized with the master
    seed and are processed in parallel worker processes. The results will be
    saved in the following format:
    log_main_dir_name_%Y%m%d-%H%M%S
        |-> {generated_seed_1}
        |-> {generated_seed_2}
        ...

    :param datasets: List of datasets e.g. [datasets/SolomonPotvinBengio_tdp/rc_206.1.tdp]
    :param algorithms: {algorithm: '<name>'}
    :param log_main_dir_name: Log directory for the results, see above for structure
    :param master_seed: The single seed used to generate all other seeds
    :param num_runs: How many times to run the full benchmark (generating n seeds)
    :param max_worker: Upper limit for the number of worker processes; the
        number actually used is at most cpu count - 1 and at least 1
    :return: None
    """
    rng = random.Random(master_seed)
    seeds = [rng.randint(0, 1000000) for _ in range(num_runs)]
    print_headline(f"Master Seed: {master_seed} | Generating {num_runs} seeds: {seeds[:5]}...", 1, 61)

    # Create main result dir
    main_log_dir = f"{log_main_dir_name}_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}"
    Path(main_log_dir).mkdir(parents=True, exist_ok=True)

    # os.cpu_count() may return None when the count cannot be determined.
    available_cpus = os.cpu_count() or 1
    # Leave one core free, but never request fewer than one worker.
    max_workers = max(1, min(available_cpus - 1, max_worker))
    print(f"Starting parallel execution with {max_workers} workers.")

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit all seeds to the executor
        futures = {
            executor.submit(
                _worker_test_seed,
                seed, i, len(seeds), main_log_dir, datasets, algorithms
            ): seed
            for i, seed in enumerate(seeds)
        }

        # Track progress with tqdm
        for future in tqdm(as_completed(futures), total=len(seeds), desc="Processing Seeds", unit="seed"):
            seed = futures[future]
            try:
                _, success, error = future.result()
                if not success:
                    print(f"\nSeed {seed} failed with error: {error}")
            except Exception as e:
                # Raised when the worker itself died or its result could not be transferred.
                print(f"\nCRITICAL: Worker for seed {seed} crashed: {e}")

    print_finish("Execution complete! (All seeds done!)", 1, 61)
def main():
    """
    Benchmark all known algorithms on all known problem definitions.

    Collects every *.tdp file from datasets/custom and
    datasets/SolomonPotvinBengio_tdp and runs the benchmark with master
    seed 42 and 10 runs.
    """
    dataset_dirs = ("datasets/custom", "datasets/SolomonPotvinBengio_tdp")
    problem_definition_paths = [
        tdp_file
        for folder in dataset_dirs
        for tdp_file in Path(folder).glob("*.tdp")
    ]

    # Insertion order defines the execution order of the algorithms.
    algorithms = {
        random_tours: "Random Tours (10k)",
        discrete_cuckoo_search: "DCS (basic)",
        custom_discrete_cuckoo_search: "DCS (custom)",
        genetic_algorithm: "Genetic Algorithm",
        ant_colony_optimization_simple_cached: "Ant Colony Optimization (simple)",
        ant_colony_optimization_time_aware_cached: "Ant Colony Optimization (time)",
    }

    test(
        problem_definition_paths,
        algorithms,
        log_main_dir_name="D:/results/results_10_seeds_cached",
        master_seed=42,
        num_runs=10,
    )
def _worker_video_test_seed(seed: int,
                            main_log_dir: str,
                            problem_definition: Path,
                            algorithm: Tuple[Callable, str]):
    """
    Run one algorithm with tried-tour tracking and render a solution video.

    Quickly coded for final presentation purposes. Please refer to main for
    normal benchmarking, this function is not properly tested and stable.

    :param seed: Seed passed to the algorithm
    :param main_log_dir: Directory for the logs and the generated video
    :param problem_definition: Path to the problem definition file
    :param algorithm: Tuple(algorithm function, algorithm name)
    :return: Tuple(algorithm, True, None)
    """
    print_headline(f'Testing Definition {problem_definition} with seed "{seed}"', 2, 61)

    solver, solver_name = algorithm
    solved_tdp, _tour, _obj = execute_algorithm(
        problem_definition,
        solver,
        algorithm_name=solver_name,
        log_dir=main_log_dir,
        seed=seed,
        track_tried_tours=True,
    )

    # Only the wording of the info message depends on feasibility.
    if solved_tdp.current_best_obj == -float('inf'):
        outcome = "could not find"
    else:
        outcome = "found"
    print_headline(f"INFO: Algorithm {solver_name} {outcome} feasible solution for {problem_definition}", 3, 61)

    solved_tdp.finalize_logging()

    video_settings = {
        "fps": 10,
        "frames_per_tried_tour": 25,
        "frames_per_improvement": 0,
        "frames_final_improvement": 60,
        "resolution": (800, 720),
        "dpi": 150,
        "max_tried_tours_per_improvement": 1000,
        "codec": 'libx264',
    }
    video_path = f"{main_log_dir}/{solver_name}_solution.mp4"
    Visualizer(solved_tdp).generate_solution_video(video_path, **video_settings)

    print_headline(f"INFO: Video Generation finished for {solver_name} and {problem_definition}", 3, 61)
    return algorithm, True, None
def tried_tour_demo():
    """
    Track all tried tours of a few algorithms and generate their videos in parallel.

    Quickly coded for final presentation purposes. Please refer to main for
    normal benchmarking, this function is not properly tested and stable.
    """
    seed = 116739
    problem_definition = Path("datasets/custom/kochi_small.tdp")
    dataset_name = problem_definition.stem
    algorithms = {random_tours: "Random Tours (10k)",
                  discrete_cuckoo_search: "DCS (basic)",
                  ant_colony_optimization_simple_cached: "ACO (simple)"}

    print_headline(f"Starting tried tour logging, Seed: {seed}", 1, 61)

    # Create main result dir
    main_log_dir = f"results/video_fp_25{dataset_name}_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}"
    Path(main_log_dir).mkdir(parents=True, exist_ok=True)

    # os.cpu_count() may return None when the count cannot be determined.
    max_workers = max(1, (os.cpu_count() or 1) - 1)
    print(f"Starting parallel execution with {max_workers} workers.")

    # Sequential fallback for debugging: call
    # _worker_video_test_seed(seed, main_log_dir, problem_definition, algorithm)
    # for every algorithm in algorithms.items().

    # Parallel loop
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Every future maps to its (algorithm function, algorithm name) tuple.
        futures = {
            executor.submit(
                _worker_video_test_seed,
                seed, main_log_dir, problem_definition, algorithm
            ): algorithm
            for algorithm in algorithms.items()
        }

        for future in tqdm(as_completed(futures), total=len(algorithms), desc="Processing Seeds", unit="seed"):
            # Unpack the tuple exactly once; unpacking the function object
            # a second time raised a TypeError here before.
            _, algorithm_name = futures[future]
            try:
                _, success, error = future.result()
                if not success:
                    print(f"\nAlgorithm {algorithm_name} failed with error: {error}")
            except Exception as e:
                # The worker re-raises its errors through the future.
                print(f"\nCRITICAL: Worker for algorithm {algorithm_name} crashed: {e}")

    print_finish("Execution complete! (All algorithms done!)", 1, 61)
# Run the benchmark only when this file is executed as a script. The guard
# also prevents main() from being started again when the module is imported,
# e.g. by worker processes of the process pool.
if __name__ == "__main__":
    main()