Source code for realtwin.func_lib._e_simulation._aimsun

##############################################################################
# Copyright (c) 2024, Oak Ridge National Laboratory                          #
# All rights reserved.                                                       #
#                                                                            #
# This file is part of RealTwin and is distributed under a GPL               #
# license. For the licensing terms see the LICENSE file in the top-level     #
# directory.                                                                 #
#                                                                            #
# Contributors: ORNL Real-Twin Team                                          #
# Contact: realtwin@ornl.gov                                                 #
##############################################################################

"""The module to prepare the Aimsun network and demand data for simulation."""

import os
from xml.etree import ElementTree as ET
import pandas as pd


[docs] class AimsunPrep: ''' constructor '''
[docs] def __init__(self): self.Network = {} # self.NetworkWithElevation = {} self.Demand = {} self.Signal = {}
def is_empty(self): """Check if the AimsunPrep object is empty.""" pass def importDemand(self, ConcreteScn, SimulationStartTime, SimulationEndTime): """Import demand data from the concrete scenario.""" NetworkName = ConcreteScn.Supply.NetworkName AimsunPath = os.path.join('MyNetwork', 'AIMSUN') if os.path.exists(AimsunPath): os.remove(AimsunPath) # Delete the file os.mkdir(AimsunPath) # Create the .flow.xml InflowDf = ConcreteScn.Demand.Inflow InflowDf = InflowDf[(InflowDf['IntervalStart'] >= SimulationStartTime) & (InflowDf['IntervalEnd'] <= SimulationEndTime)] # Read the OpenDRIVE (.xodr) file file_path = f'MyNetwork/OpenDrive/{NetworkName}.xodr' with open(file_path, 'r', encoding="utf-8") as f: xodr_content = f.read() # Parse the XML content tree = ET.ElementTree(ET.fromstring(xodr_content)) root = tree.getroot() # Initialize an empty list to store the results data_list = [] # Iterate through all the 'road' elements in the XML file for road in root.findall('road'): road_id = road.attrib['id'] link = road.find('link') # Initialize AimsunID as None aimsun_id = None # Extract predecessor and successor information if link is not None: predecessor = link.find('predecessor') successor = link.find('successor') # Both predecessor and successor are present if predecessor is not None and successor is not None: if predecessor.attrib['elementType'] != "road" and successor.attrib['elementType'] != "road": aimsun_id = f"{predecessor.attrib['elementId']}-{successor.attrib['elementId']}" # Only predecessor is present elif predecessor is not None: if predecessor.attrib['elementType'] != "road": aimsun_id = f"{predecessor.attrib['elementId']}-out" # Only successor is present elif successor is not None: if successor.attrib['elementType'] != "road": aimsun_id = f"in-{successor.attrib['elementId']}" # Neither predecessor nor successor is present else: aimsun_id = "in-out" # Append to list data_list.append({'OpenDriveFromID': road_id, 'AimsunID': aimsun_id}) # Create DataFrame from the list RoadLookup = pd.DataFrame(data_list) RoadLookup = RoadLookup.dropna(subset=['AimsunID']) # Step 1: Group the DataFrame by 'AimsunID' and identify groups with the same 'AimsunID' grouped = RoadLookup.groupby('AimsunID') # Initialize an empty dictionary to store the new AimsunID mappings new_aimsun_ids = {} # Step 2: For each group, locate the corresponding roads in the OpenDRIVE file for aimsun_id, group in grouped: # Skip groups with only one member if len(group) <= 1: continue road_x_values = [] # Step 3: Extract the x of the first <geometry> in <planView> for each road for _, row in group.iterrows(): road_id = row['OpenDriveFromID'] road_element = root.find(f"./road[@id='{road_id}']") if road_element is not None: plan_view = road_element.find('planView') if plan_view is not None: first_geometry = plan_view.find('geometry') if first_geometry is not None: x = float(first_geometry.attrib.get('x', 0)) road_x_values.append((road_id, x)) # Step 4: Sort the roads based on the x value road_x_values.sort(key=lambda x: x[1]) # Step 5: Rename the AimsunID based on the sequence and conditions for i, (road_id, _) in enumerate(road_x_values): if aimsun_id.startswith("in-") and not aimsun_id.endswith("-out"): new_aimsun_id = f"in{i + 1}-{aimsun_id.split('-')[1]}" elif aimsun_id.endswith("-out"): new_aimsun_id = f"{aimsun_id.split('-')[0]}-out{i + 1}" else: new_aimsun_id = aimsun_id # Keep as is new_aimsun_ids[road_id] = new_aimsun_id # Update the DataFrame with the new AimsunID values RoadLookup['NewAimsunID'] = RoadLookup['OpenDriveFromID'].map(new_aimsun_ids).fillna(RoadLookup['AimsunID']) # Add "1-" after the "-" in each NewAimsunID RoadLookup['NewAimsunID'] = RoadLookup['NewAimsunID'].apply(lambda x: x.replace('-', '-1-')) RoadLookup.drop('AimsunID', axis=1, inplace=True) # Rename the 'NewAimsunID' column to 'AimsunID' RoadLookup.rename(columns={'NewAimsunID': 'AimsunID'}, inplace=True) # Convert the 'OpenDriveFromID' columns to the same data type (string) in both DataFrames InflowDf = InflowDf.copy() InflowDf['OpenDriveFromID'] = InflowDf['OpenDriveFromID'].astype(str) # RoadLookup = RoadLookup.copy RoadLookup['OpenDriveFromID'] = RoadLookup['OpenDriveFromID'].astype(str) # Perform the merge operation again merged_df = pd.merge(InflowDf, RoadLookup, on='OpenDriveFromID', how='left').copy() # Keep only the specified columns in the merged DataFrame and rearrange the column order Inflow = merged_df[['IntervalStart', 'IntervalEnd', 'AimsunID', 'Count']] Inflow.loc[:, 'IntervalStart'] = Inflow['IntervalStart'].astype(int) Inflow.loc[:, 'IntervalEnd'] = Inflow['IntervalEnd'].astype(int) Inflow.to_csv('MyNetwork/Aimsun/inflow.txt', index=False, header=False, sep=',') # Display the merged DataFrame self.Demand = 'MyNetwork/Aimsun/inflow.txt'