Source code for realtwin.func_lib._b_load_inputs.loader_config
##############################################################################
# Copyright (c) 2024, Oak Ridge National Laboratory #
# All rights reserved. #
# #
# This file is part of RealTwin and is distributed under a GPL #
# license. For the licensing terms see the LICENSE file in the top-level #
# directory. #
# #
# Contributors: ORNL Real-Twin Team #
# Contact: realtwin@ornl.gov #
##############################################################################
import os
import yaml
import re
from pathlib import Path
from zipfile import ZipFile
import pyufunc as pf
from rich.console import Console
console = Console()
[docs]
def get_bounding_box_from(vertices: str | list) -> tuple:
"""get the bounding box from the vertices string
Args:
vertices (str): the vertices of the network in string format
"(lon, lat),(lon, lat),..."
Notes:
The vertices format can be found in configuration file
Returns:
tuple: the bounding box of the network: (min_lon, min_lat, max_lon, max_lat)
"""
if isinstance(vertices, str):
# Regular expression to extract the coordinate pairs
pattern = r"\((-?\d+\.\d+),\s*(-?\d+\.\d+)\)"
matches = re.findall(pattern, vertices)
lon_lst = [float(match[0]) for match in matches]
lat_lst = [float(match[1]) for match in matches]
elif isinstance(vertices, list):
# Check if the list contains tuples
if all(isinstance(item, list) and len(item) == 2 for item in vertices):
lon_lst = [float(item[0]) for item in vertices]
lat_lst = [float(item[1]) for item in vertices]
else:
raise ValueError("Invalid format: List must contain list of [lon, lat].")
else:
raise ValueError("Invalid format: vertices must be a string or list.")
return (min(lon_lst), min(lat_lst), max(lon_lst), max(lat_lst))
[docs]
def load_input_config(path_config: str) -> dict:
"""load input configuration from yaml file
Args:
path_config (str): the path of the configuration file in yaml format
Raises:
FileNotFoundError: if the file is not found
ValueError: if the file is not in yaml format
Returns:
dict: the dictionary of the configuration data
"""
# TDD check whether the file exists and is a yaml file
if not os.path.exists(path_config):
raise FileNotFoundError(f" :File not found: {path_config}")
if not (path_config.endswith('.yaml') or path_config.endswith('.yml')):
raise ValueError(f" :File is not in yaml format: {path_config}")
# read the yaml file and return the configuration dictionary
with open(path_config, 'r', encoding="utf-8") as yaml_data:
config = yaml.safe_load(yaml_data)
# check whether input_dir exists
if config.get('input_dir') is None:
# set input_dir to current working directory if not specified
config['input_dir'] = pf.path2linux(os.getcwd())
else:
# convert input_dir to linux format
config['input_dir'] = pf.path2linux(config['input_dir'])
# check whether demo mode is enabled
available_demo_data = ["chattanooga"]
if demo_data := config.get('demo_data'):
if not isinstance(demo_data, str):
config['demo_data'] = None
console.log(" :Demo data is not a string. Demo mode is disabled.")
elif demo_data.lower() in available_demo_data:
try:
# copy demo data to the input directory
demo_data_path = pf.path2linux(
Path(__file__).parent.parent.parent / "data_lib" / f"{demo_data.lower()}.zip")
with ZipFile(demo_data_path, 'r') as zip_ref:
extract_path = os.path.splitext(demo_data_path)[0]
os.makedirs(extract_path, exist_ok=True)
zip_ref.extractall(config['input_dir'])
console.log(f" :Demo data {demo_data} extracted to {config['input_dir']}.")
# update input directory to the extracted demo data
config["input_dir"] = pf.path2linux(Path(config['input_dir']) / f"{demo_data.lower()}")
config["Network"]["NetworkName"] = demo_data
# use dummy coordinates to make sure program works (in generate_inputs)
config["Network"]["NetworkVertices"] = [[-85.14977588011192, 35.040346288414916],
[-85.15823020212477, 35.04345144844759],
[-85.15829457513502, 35.043293338482925],
[-85.14986171079225, 35.04018378032611]]
except Exception as e:
console.log(f" :Demo data {demo_data} extraction failed for {e}. Demo mode is disabled.")
config['demo_data'] = None
else:
config['demo_data'] = None
console.log(f"Demo data {demo_data} currently not available. Available demo data: {available_demo_data}")
console.log("Demo mode is disabled.")
# check output_dir from input configuration file
if config.get('output_dir') is None:
# set output_dir to input_dir/output if not specified
config['output_dir'] = pf.path2linux(os.path.join(config['input_dir'], 'output'))
elif not os.path.exists(config['output_dir']):
config['output_dir'] = pf.path2linux(os.path.join(config['input_dir'], 'output'))
# check whether key sections exist in the configuration file
key_sections = ["Traffic", 'Network', 'Control']
for key in key_sections:
if key not in config:
console.log(f"[bold]{key} section is not found in the configuration file.")
# update network bbox if vertices are provided in the input configuration file
if vertices := config.get('Network', {}).get('NetworkVertices'):
bbox = config.get('Network', {}).get('Net_BBox')
# update the bounding box if it is not provided
if not bbox:
config['Network']['Net_BBox'] = get_bounding_box_from(vertices)
return config