Source code for realtwin._realtwin

##############################################################################
# Copyright (c) 2024, Oak Ridge National Laboratory                          #
# All rights reserved.                                                       #
#                                                                            #
# This file is part of RealTwin and is distributed under a GPL               #
# license. For the licensing terms see the LICENSE file in the top-level     #
# directory.                                                                 #
#                                                                            #
# Contributors: ORNL Real-Twin Team                                          #
# Contact: realtwin@ornl.gov                                                 #
##############################################################################

"""The real-twin developed by ORNL Applied Research and Mobility System (ARMS) group"""
import os
import shutil
from pathlib import Path
import sys
import time
import pyufunc as pf
from rich.console import Console
from rich import print as rprint
# environment setup
from realtwin.util_lib.create_venv import venv_create, venv_delete
from realtwin.func_lib._a_install_simulator.inst_sumo import install_sumo

# input data loading
from realtwin.func_lib._b_load_inputs.loader_config import load_input_config

# scenario generation
# from realtwin.util_lib.download_elevation_tif import download_elevation_tif_by_bbox
from realtwin.util_lib.check_abstract_scenario_inputs import check_abstract_inputs
from realtwin.func_lib._c_abstract_scenario._abstractScenario import AbstractScenario
from realtwin.func_lib._c_abstract_scenario.rt_matchup_table_generation import generate_matchup_table
from realtwin.func_lib._c_abstract_scenario.rt_matchup_table_generation import format_junction_bearing
from realtwin.func_lib._c_abstract_scenario.rt_demand_generation import generate_turn_demand, update_matchup_table

from realtwin.func_lib._d_concrete_scenario._concreteScenario import ConcreteScenario

# simulation
from realtwin.func_lib._e_simulation._generate_simulation import SimPrep

# calibration
from realtwin.func_lib._f_calibration.calibration_sumo import cali_sumo
from realtwin.data_lib.data_lib_config import sel_behavior_routes as sel_behavior_routes_demo

console = Console()
# info: dim cyan, warning: magenta, danger: bold red


[docs] class RealTwin: """The real-twin developed by ORNL Applied Research and Mobility System (ARMS) group that enables the simulation of twin-structured cities. """
[docs] def __init__(self, input_config_file: str = "", **kwargs): """Initialize the REALTWIN object. Args: input_config_file (str): The directory containing the input files. kwargs: Additional keyword arguments. Will be used in the future. """ # initialize the input directory if not input_config_file: raise Exception( "\n :Input configuration file is not provided." "\n :RealTwin requires a configuration file to be provided.") self.input_config = load_input_config(input_config_file) # add venv_create and delete as object methods self.venv_create = venv_create self.venv_delete = venv_delete self._venv_name = "venv_rt" self._proj_dir = os.getcwd() # get current working directory # extract data from kwargs self.verbose = kwargs["verbose"] if "verbose" in kwargs else False
def env_setup(self, *, sel_sim: list = None, sel_dir: list = None, strict_sumo_version: str = None, strict_vissim_version: str = None, strict_aimsun_version: str = None, **kwargs) -> bool: """Check and set up the environment for the simulation Args: sel_sim (list): select simulator to be set up. Default is None. Currently available options are ["SUMO", "VISSIM", "AIMSUN"]. sel_dir (list): A list of directories to search for the executables. Defaults to None. strict_sumo_version (str): Whether to strictly check the version is installed. if specified, will check and install the version. Default is None. strict_vissim_version (str): Whether to strictly check the version is installed. if specified, will check and install the version. Default is False. strict_aimsun_version (str): Whether to strictly check the version is installed. if specified, will check and install the version. Default is False. kwargs: Additional keyword arguments. Examples: >>> import realtwin as rt >>> twin = rt.REALTWIN(input_config_file="config.yaml", verbose=True) check simulator is installed or not, default to SUMO, optional: VISSIM, AIMSUN >>> twin.env_setup(sel_sim=["SUMO"]) add additional directories to search for the executables >>> additional_dir = [r"path-to-your-local-installed-sumo-bin"] >>> twin.env_setup(sel_sim=["SUMO"], sel_dir=additional_dir) strict version check: will install the required version if not found >>> twin.env_setup(sel_sim=["SUMO"], sumo_version="1.21.0", strict_sumo_version=True) or with additional directories >>> twin.env_setup(sel_sim=["SUMO"], sel_dir=additional_dir, >>> sumo_version="1.21.0", strict_sumo_version=True) Returns: bool: True if the environment is set up successfully, False otherwise. """ # 0. Check if the sim_env is selected, # default to SUMO, case insensitive and add self.sel_sim as a class attribute sel_sim = ["sumo"] if not sel_sim else [sim.lower() for sim in sel_sim] # 1. Check simulator installation - mapping function simulator_installation = { "sumo": install_sumo, "vissim": None, "aimsun": None, } # 2. check if the simulator is installed, if not, install it console.print("\n[bold green]Check / Install the selected simulators:") kwargs['sel_dir'] = sel_dir kwargs['strict_sumo_version'] = strict_sumo_version kwargs['strict_vissim_version'] = strict_vissim_version kwargs['strict_aimsun_version'] = strict_aimsun_version kwargs['verbose'] = self.verbose invalid_sim = [] for simulator in sel_sim: try: sim_status = simulator_installation.get(simulator)(**kwargs) if not sim_status: invalid_sim.append(simulator) except Exception: invalid_sim.append(simulator) rprint(f" :[bold magenta]Could not install {simulator} (strict version) on your operation system", end="") sel_sim_ = list(set(sel_sim) - set(invalid_sim)) if not sel_sim_: raise Exception(" :Error: No simulator is available (strict version). Please select available version(s).") self.sel_sim = sel_sim_ return True def generate_inputs(self, *, incl_sumo_net: str = None): """ Generate user inputs, such as MatchUp table, Control and Traffic data Args: incl_sumo_net (str): The path to the updated SUMO network file (.net.xml) provided by the user. If provided, the OpenDrive network will be generated based on this SUMO network. If not provided, the OpenDrive network will be generated based on the vertices from the config file. See Also: - How to create configuration file - How to create/update MatchUp table - How to create/prepare Control and Traffic data - How to download elevation tif data from network BBOX """ with console.status("[bold cyan]Generating inputs...", spinner="dots"): console.print("\n[bold green]Check / Create input files and folders for user:") path_input = pf.path2linux(Path(self.input_config.get("input_dir"))) # check if Control folder exists in the input directory path_control = pf.path2linux(Path(path_input) / "Control") if not os.path.exists(path_control): os.makedirs(path_control) # check if the Control folder is empty elif not os.listdir(path_control): console.print(f"[dim cyan]Control folder is empty: {path_control}.") console.print(f" [dim cyan]:Control folder exists: {path_control}.[/dim cyan]\n" " :NOTICE: [bold red]Please include Synchro UTDF file (signal) inside Control folder\n") # check if Traffic folder exists in the input directory path_traffic = pf.path2linux(Path(path_input) / "Traffic") if not os.path.exists(path_traffic): os.makedirs(path_traffic) # check if the Traffic folder is empty elif not os.listdir(path_traffic): console.print(f" [magenta]:Traffic folder is empty: {path_traffic}.") console.print(f" [dim cyan]:Traffic folder exists: {path_traffic}.[/dim cyan]\n" " :NOTICE: [bold red]Please include turn movement file for each intersection " "inside Traffic folder and add the file names to the MatchupTable.xlsx " "(You will notice the generated MatchupTable.xlsx inside your input folder)." " For how to fill the MatchupTable.xlsx, please refer to official documentation\n", soft_wrap=True, no_wrap=False) # check if SUMO net file generated (in OpenDrive folder), if not, create the net. net_name = self.input_config["Network"]["NetworkName"] path_sumo_net = pf.path2linux(Path(self.input_config.get("output_dir")) / f"OpenDrive/{net_name}.net.xml") # generate abstract scenario if sumo net file does not exist self.abstract_scenario = AbstractScenario(self.input_config) if not os.path.exists(path_sumo_net): # Create original SUMO network from vertices from config file self.abstract_scenario.create_SUMO_network() # crate OpenDrive network from SUMO network, and then rewrite sumo network based on OpenDrive network self.abstract_scenario.create_OpenDrive_network() rprint(" :INFO: OpenDrive network is generated.\n", end="") # Update SUMO Network before generating OpenDrive network if demo_data := self.input_config["demo_data"]: # demo mode is enabled, use the updated SUMO network from demo data incl_sumo_net = pf.path2linux(Path(self.input_config["input_dir"]) / f"updated_net/{demo_data}.net.xml") if incl_sumo_net: # check if the file exists and end with .net.xml if incl_sumo_net.endswith(".net.xml") and os.path.exists(incl_sumo_net): self.input_config["incl_sumo_net"] = incl_sumo_net # Copy user updated net file to the OpenDrive folder incl_sumo_net = pf.path2linux(Path(incl_sumo_net)) # ensure it's absolute path if incl_sumo_net != path_sumo_net: shutil.copy(incl_sumo_net, path_sumo_net) console.print(f" [dim cyan]:INFO: SUMO network is copied to {path_sumo_net}.\n" f" [dim cyan]:Using updated SUMO network provide by user: {incl_sumo_net} " "to generate OpenDrive network.\n", soft_wrap=True, no_wrap=False) # create opendrive net from updated sumo net, and rewrite sumo net based on OpenDrive net # self.abstract_scenario.create_OpenDrive_network() rprint(" [dim cyan]:INFO: OpenDrive network is generated.", end="") else: console.print(" [magenta]:NOTE: incl_sumo_net is not exist or not with .net.xml extension.\n" " :Please provide a valid SUMO file with .net.xml extension or leave it empty.", soft_wrap=True, no_wrap=False) else: # let user know they can use their own SUMO network by using incl_sumo_net rprint(" [dim cyan]:INFO: You can use your own SUMO network by providing the path " "to the incl_sumo_net parameter. The path should be a .net.xml file. \n", end="") # create matchup table for user path_matchup = pf.path2linux(Path(self.input_config.get("input_dir")) / "MatchupTable.xlsx") # check if sumo net file exists if not os.path.exists(path_sumo_net): raise Exception(f" :Error: SUMO net file does not exist: {path_sumo_net}," "please check input configuration file and re-run the script." "For details please refer to the documentation: ") df_matchup_table = format_junction_bearing(path_sumo_net) generate_matchup_table(df_matchup_table, path_matchup) console.print(f" [dim cyan]:NOTE: Matchup table is generated and saved to {path_matchup}.[/dim cyan]\n" " :NOTICE: [bold red]Please update the Matchup table from input folder" " and then run generate_abstract_scenario()." " For details please refer to official documentation: \n", soft_wrap=True, no_wrap=False) console.rule("[bold green]Program stopped. Please prepare the Control and Traffic data and " "fill in the Matchup Table before proceeding.\n" "[bold red]Please run generate_abstract_scenario() " "after preparing the input data.\n") time.sleep(2) # wait for 2 seconds before exiting # sys.exit(0) # Stop the program to let user update the Matchup table # usr_input = False while not usr_input: usr_input = console.input(":warning: [bold magenta]Please update the generated Matchup table from " "input folder before pressing Enter or type 'y' / 'yes' to continue") if usr_input == "" or usr_input in ["y", "Y", "yes", "Yes"]: console.print(" [dim cyan]:INFO: User confirmed to continue (Matchup Table Updated).") usr_input = True def generate_abstract_scenario(self): """Generate the abstract scenario: create OpenDrive files """ # check abstract scenario inputs check_abstract_inputs(self.input_config.get("input_dir")) # 1. Generate the abstract scenario based on the input data # self.abstract_scenario = AbstractScenario(self.input_config) if not hasattr(self, 'abstract_scenario'): raise Exception(" :Error: Abstract Scenario is not generated yet. " "Please run generate_inputs() first.") # update traffic and signal path_matchup = pf.path2linux(Path(self.input_config.get("input_dir")) / "MatchupTable.xlsx") control_dir = pf.path2linux(Path(self.input_config.get("input_dir")) / "Control") traffic_dir = pf.path2linux(Path(self.input_config.get("input_dir")) / "Traffic") # Auto-fill matchup table, save to matchup table : MatchupTable_UserInput _ = update_matchup_table(path_matchup_table=path_matchup, control_dir=control_dir, traffic_dir=traffic_dir) # Tell user to manually check correctness of the Matchup Table console.input(":warning: [bold magenta]In the Matchup Table, please check if the turn movement in the " "demand and control data match with bearings in the network data. Enter any key to continue...") df_volume, df_vol_lookup = generate_turn_demand(path_matchup_table=path_matchup, control_dir=control_dir, traffic_dir=traffic_dir,) self.abstract_scenario.Traffic.VolumeLookupTable = df_vol_lookup self.abstract_scenario.update_AbstractScenario_from_input(df_volume=df_volume) console.print("\n[bold green]Abstract Scenario successfully generated.") def generate_concrete_scenario(self): """Generate the concrete scenario: generate unified scenario from abstract scenario """ # 1. Generate the concrete scenario based on abstract scenario # 2. Save the concrete scenario to the output directory if not hasattr(self, 'abstract_scenario'): console.print(" [magenta]:Warning: Abstract Scenario is not generated yet. " "Please run generate_abstract_scenario() first.") return self.concrete_scenario = ConcreteScenario() self.concrete_scenario.get_unified_scenario(self.abstract_scenario) console.print("\n[bold green]Concrete Scenario successfully generated.") def prepare_simulation(self, start_time: float = 3600 * 8, end_time: float = 3600 * 10, seed: list | int = 812, step_length: float = 0.1) -> bool: """Simulate the concrete scenario: generate simulation files for the selected simulator Args: start_time (float): The start time of the simulation. Default is 3600 * 8. end_time (float): The end time of the simulation. Default is 3600 * 10. seed (list or int): The seed for the simulation. Default is [101]. step_length (float): The simulation step size. Default is 0.1. Examples: import realtwin package >>> import realtwin as rt load the input configuration file >>> twin = rt.REALTWIN(input_config_file="config.yaml", verbose=True) check simulator is installed or not, default to SUMO >>> twin.env_setup(sel_sim=["SUMO"]) generate abstract scenario and concrete scenario >>> twin.generate_abstract_scenario() >>> twin.generate_concrete_scenario() prepare simulation with start time, end time, seed, and step size >>> twin.prepare_simulation(start_time=3600 * 8, end_time=3600 * 10, seed=[101], step_length=0.1) Returns: bool: True if the simulation is prepared successfully, False otherwise. """ # 1. prepare Simulate docs from the concrete scenario # 2. Save results to the output directory sim_prep = { "sumo": SimPrep().create_sumo_sim, "vissim": SimPrep().create_vissim_sim, "aimsun": SimPrep().create_aimsun_sim, } # TODO according sel_sim to run different simulators self.sim = SimPrep() for simulator in self.sel_sim: sim_prep.get(simulator)(self.concrete_scenario, start_time=start_time, end_time=end_time, seed=seed, step_length=step_length) console.print(f"\n[bold green]{simulator.upper()} simulation successfully Prepared.") return True def calibrate(self, *, sel_algo: dict = None, sel_behavior_routes: dict = None, update_turn_flow_algo: dict = None, update_behavior_algo: dict = None) -> bool: """Calibrate the turn and inflow, and behavioral parameters using the selected algorithms. Args: sel_algo (dict): The dictionary of algorithms to be used for calibration. Default is None, will use genetic algorithm. e.g. {"turn_inflow": "ga", "behavior": "ga"}. sel_behavior_routes (dict): The dictionary of behavior route parameters to be used for calibration. Default is None. time (in seconds) is ground truth travel time. e.g. sel_behavior_routes = {"route_1": {"time": 20, "edge_list": ["edge_id_1", "edge_d_2", ...]}, "route_2" {"time": 40, "edge_list":["edge_id_1", "edge_d_2", ...]} ...}. update_turn_flow_algo (dict): The dictionary of algorithms to be used for updating turn flow. Default is None, will use genetic algorithm. Please refer to input configuration file for keys for each algorithm. e.g. update_turn_flow_algo = {"ga_config": {}, "sa_config":{}, "ts_config":{}}. update_behavior_algo (dict): The dictionary of algorithms to be used for updating behavior. Default is None, will use genetic algorithm. Please refer to input configuration file for keys for each algorithm. e.g. update_behavior_algo = {"ga_config":{}, "sa_config":{}, "ts_config": {}}. """ # TDD print() if sel_algo is None: # default to genetic algorithm sel_algo = {"turn_inflow": "ga", "behavior": "ga"} console.print(f" [dim cyan]:sel_algo not specified, use default value: {sel_algo}") if not isinstance(sel_algo, dict): sel_algo = {"turn_inflow": "ga", "behavior": "ga"} console.print(" [bold red]:Error: parameter sel_algo must be a dict with" " keys of 'turn_inflow' and 'behavior', using" f" default values: {sel_algo}") # check if the selected algorithm is supported within the package # convert the algorithm to lower case sel_algo = {key: value.lower() for key, value in sel_algo.items()} if (algo := sel_algo["turn_inflow"]) not in ["ga", "sa", "ts"]: console.print(f" [dim cyan]:Selected algorithms are {sel_algo}") console.print(f" [dim cyan]:{algo} for turn and inflow calibration is not supported. " "Must be one of ['ga', 'sa', 'ts']") return False if (algo := sel_algo["behavior"]) not in ["ga", "sa", "ts"]: console.print(f" [dim cyan]:Selected algorithms are {sel_algo}") console.print(f" [div cyan]:{algo} for behavior calibration is not supported. " "Must be one of ['ga', 'sa', 'ts']") return False # parse user additional parameters for calibration user_kwargs = {} if sel_behavior_routes: # use user defined behavior route, if not provided, automatically select two routes from the network user_kwargs["sel_behavior_routes"] = sel_behavior_routes if self.input_config["demo_data"] and sel_behavior_routes_demo.get(self.input_config["demo_data"]): # use predefined behavior routes for demo data user_kwargs["sel_behavior_routes"] = sel_behavior_routes_demo.get(self.input_config["demo_data"]) if update_turn_flow_algo: user_kwargs["update_turn_flow_algo"] = update_turn_flow_algo if update_behavior_algo: user_kwargs["update_behavior_algo"] = update_behavior_algo # run calibration based on the selected algorithm if "sumo" in self.sel_sim: cali_sumo(sel_algo=sel_algo, input_config=self.input_config, verbose=self.verbose, **user_kwargs) if "vissim" in self.sel_sim: pass if "aimsun" in self.sel_sim: pass console.print("[bold green]Calibration successfully completed.\n") return True def post_process(self): """Post-process the simulation results. """ # 1. Post-process the simulation results # 2. Save the post-processed results to the output directory def visualize(self): """Visualize the simulation results. """
# 1. Visualize the simulation results # 2. Save the visualization results to the output directory