import getpass
import glob
import os
import re
import shutil
import subprocess
import traceback
import warnings
from datetime import datetime as dt
from pathlib import Path
from agi_node.polars_worker import PolarsWorker
warnings.filterwarnings('ignore')
import polars as pl
import math
import pandas as pd
import numpy as np
from geopy.distance import geodesic
from geopy.distance import distance as Geodistance
from geopy.point import Point
import json
import logging
logger = logging.getLogger(__name__)
import plotly.graph_objects as go
import plotly.express as px
[docs]
class plane_trajectory:
df_col_names = [ # Log columns
"plane_id",
"time_s",
"speed_ms",
"alt_m",
"roll_deg",
"pitch_deg",
"yaw_deg",
"bearing_deg",
"latitude",
"longitude",
"distance",
"phase",
"plane_type"
]
[docs]
def __init__(self,
flight_id: int = 1,
waypoints: list = [(0, 0, 0), (1, 0, 10000), (2, 1, 9000), (3, 3, 5000), (-3, -3, 5000)],
yaw_angular_speed: float = 1.0,
roll_angular_speed: float = 3.0,
pitch_angular_speed: float = 2.0,
vehicle_acceleration: float = 5.0,
max_speed: float = 900.0,
max_roll: float = 30.0,
max_pitch: float = 12.0,
target_climbup_pitch: float = 8.0,
pitch_enable_speed_ratio: float = 0.30,
altitude_loss_speed_treshold: float = 400.0,
landing_speed_target: float = 200.0,
descent_pitch_target: float = -3,
landing_pitch_target: float = 3,
cruising_pitch_max: float = 3,
descent_alt_treshold_landing: float = 500,
max_speed_ratio_while_turining: float = 0.3,
enable_climb: bool = True,
enable_descent: bool = True,
default_alt_value: float = 4000.0,
plane_type: str = "classique_plane"
):
self.speed = 0
self.alt = 0
self.roll = 0
self.pitch = 0
self.yaw = 0
self.bearing = 0
self.distance = 0
self.current_waypoint_index = 1
self.time = 0
self.enable_climb = enable_climb # enable the scenario where the plane climb up
self.enable_descent = enable_descent # enable the scenario where the plane descent
self.flight_id = flight_id # the ID of the flight
self.waypoints = waypoints # list of points to pass
if len(self.waypoints) < 2:
raise ValueError(f"Expected at least 2 waypoints, but got {len(self.waypoints)}")
else:
for i in range(len(self.waypoints)):
if len(self.waypoints[i]) < 2:
raise ValueError(
f"Expected at least 2 coords (lat,lon) but got {len(self.waypoints[i])} args at index {i}")
if len(self.waypoints[i]) < 3:
self.waypoints[i] = (self.waypoints[i][1], self.waypoints[i][0], default_alt_value)
else:
self.waypoints[i] = (self.waypoints[i][1], self.waypoints[i][0], self.waypoints[i][2])
self.coords = list(self.waypoints[0][:2])
self.yaw_angular_speed = yaw_angular_speed # in deg but not used.
self.roll_angular_speed = roll_angular_speed # in deg
self.pitch_angular_speed = pitch_angular_speed # in deg
self.vehicle_acceleration = vehicle_acceleration # in m/s
self.max_speed_kmh = max_speed # in Km/h
self.target_speed_m_s = max_speed * 1000 / 3600 # in m/s
self.target_altitude_m = self.waypoints[1][2] # in meter
self.max_roll = max_roll # in deg
self.max_pitch = max_pitch # in deg
self.cruising_pitch_max = cruising_pitch_max # cruising pitch in deg
self.plane_type = plane_type # type of vehicle
# --- Pitch Enable Speed ---
self.pitch_enable_speed_ratio = pitch_enable_speed_ratio # at what speed (ratio from max speed) does the plane began to climb
self.pitch_enable_speed_m_s = self.pitch_enable_speed_ratio * self.target_speed_m_s # actual speed that the plane is allow to pitch up
# --- detect waypoints ---
self.waypoint_arrival_threshold_m = self.target_speed_m_s / 2 # threshold for the passage of the waypoints
# --- landing charact ---
self.vehicle_deceleration = vehicle_acceleration * 1.2 # not editable but give the plane deceleration
self.landing_speed_kmh = landing_speed_target # speed that we are targetting for the landing
self.landing_speed_m_s = landing_speed_target * 1000 / 3600 # convert kilometer per seconds to meter per seconds
self.stall_speed_threshold_kmh = altitude_loss_speed_treshold # speed where the plane began to stall, plane lose altitude due to low speed
self.stall_speed_threshold_m_s = altitude_loss_speed_treshold * 1000 / 3600 # convert the previous speed from kilometer per seconds to meter per seconds
self.descent_pitch_target_deg = descent_pitch_target # descent pitch target in deg
self.descent_altitude_threshold_landing_m = descent_alt_treshold_landing # give the threshold where the plane prepare for landing (nose pitch up a bit)
self.landing_pitch_target_deg = landing_pitch_target # the finale pitch (nose up) for the landing in degree
self.climb_pitch_target_deg = target_climbup_pitch # pitch of the climbup in degree
# --- Turning charac ---
self.max_speed_ratio_while_turning = max_speed_ratio_while_turining # lower speed for turns ratio from max speed eg 0.3 give 0.3*maxspeed
# --- init case No climb ---
if not enable_climb:
self.alt = self.waypoints[0][2]
self.speed = self.target_speed_m_s
self.bearing = self.calculate_bearing(tuple(self.coords), tuple(self.waypoints[1][:2]))
min_distance = self.get_min_waypoints_distance()
for idx in range(1, len(waypoints)):
lat1, lon1, _ = waypoints[idx - 1]
lat2, lon2, _ = waypoints[idx]
distance = self.haversine_distance(lat1, lon1, lat2, lon2)
if distance < min_distance:
print(f"Distance between waypoint {idx - 1} and {idx}: {distance:.2f}m")
raise ValueError(
f"Waypoints #{idx - 1} and #{idx} are too close: {distance:.2f}m < {min_distance:.2f}m, change their positions or decrease the max speed"
)
def __str__(self):
return (
f"TrajectoryLogger(\n"
f" time = {self.time} s\n"
f" speed = {self.speed} m/s\n"
f" alt = {self.alt} m\n"
f" roll = {self.roll}°\n"
f" pitch = {self.pitch}°\n"
f" yaw = {self.yaw}°\n"
f" bearing = {self.bearing}°\n"
f" coords = {self.coords}\n"
f" waypoints = {self.waypoints}\n"
f" yaw_angular_speed = {self.yaw_angular_speed}°/s\n"
f" roll_angular_speed = {self.roll_angular_speed}°/s\n"
f" pitch_angular_speed = {self.pitch_angular_speed}°/s\n"
f" vehicule_acceleration = {self.vehicle_acceleration} m/s²\n"
f" max_speed = {self.max_speed_kmh} km/h ({self.target_speed_m_s:.2f} m/s)\n"
f" Alt_Target = {self.target_altitude_m} m\n"
f" max_roll = {self.max_roll}°\n"
f" max_pitch = {self.max_pitch}°\n"
f")"
)
__repr__ = __str__
[docs]
def calculate_bearing(self, coord1, coord2):
"""
Calculate the compass bearing from coord1 to coord2.
This function computes the initial bearing (also called forward azimuth) that
you would follow from the start coordinate to reach the end coordinate on
the Earth's surface, assuming a spherical Earth model.
The bearing is calculated clockwise from the north direction (0° to 360°).
Parameters:
coord1 (tuple): Latitude and longitude of the start point (degrees).
coord2 (tuple): Latitude and longitude of the end point (degrees).
Returns:
float: Bearing angle in degrees from North (0° to 360°).
Math:
- Converts lat/lon to radians.
- Uses spherical trigonometry formulas to compute bearing:
x = sin(delta_longitude) * cos(lat2)
y = cos(lat1)*sin(lat2) - sin(lat1)*cos(lat2)*cos(delta_longitude)
- Bearing = atan2(x, y) converted to degrees and normalized to [0,360).
"""
lat1_rad = math.radians(coord1[0])
lat2_rad = math.radians(coord2[0])
diff_long_rad = math.radians(coord2[1] - coord1[1])
x = math.sin(diff_long_rad) * math.cos(lat2_rad)
y = (math.cos(lat1_rad) * math.sin(lat2_rad) -
math.sin(lat1_rad) * math.cos(lat2_rad) * math.cos(diff_long_rad))
initial_bearing_rad = math.atan2(x, y)
initial_bearing_deg = math.degrees(initial_bearing_rad)
compass_bearing = (initial_bearing_deg + 360) % 360
return compass_bearing
[docs]
def estimate_level_off_alt_gain(self):
"""
Estimate the altitude gained during the aircraft's pitch reduction phase.
This method estimates how much altitude the aircraft will gain while
reducing its pitch angle from the current pitch to zero (level flight),
assuming constant pitch angular speed and target speed.
Returns:
float: Estimated altitude gain in meters during leveling off.
Math:
- Time to level off is current pitch divided by pitch angular speed.
- Average pitch during leveling is assumed to be half the initial pitch.
- Vertical speed component is target speed * sin(average pitch).
- Estimated altitude gain = vertical speed * time to level.
"""
if self.pitch <= 0 or self.pitch_angular_speed <= 1e-6:
return 0
time_to_level = self.pitch / self.pitch_angular_speed
avg_pitch_deg = self.pitch / 2.0
avg_pitch_rad = math.radians(avg_pitch_deg)
# Predict using target speed as plane should be near target speed when leveling off
predict_speed = self.target_speed_m_s
avg_vertical_speed = predict_speed * math.sin(avg_pitch_rad)
estimated_gain = avg_vertical_speed * time_to_level
return estimated_gain
[docs]
def simulate_takeoff_and_climb(self, dt=1.0, pitch_threshold=0.05):
"""
Simulate the aircraft's takeoff roll, climb phase, and predictive level-off.
The simulation uses geographic coordinates (latitude/longitude) and updates
aircraft state every dt seconds. It transitions through three phases:
- Takeoff Roll: aircraft accelerates on runway with zero pitch.
- Climb: aircraft climbs at maximum pitch until near target altitude.
- Level Off: aircraft reduces pitch to level flight near target altitude.
Inputs:
dt (float): Time step for simulation update (seconds).
pitch_threshold (float): Pitch angle threshold to consider leveling complete (degrees).
Returns:
pd.DataFrame: A DataFrame logging the aircraft state at each time step with
columns for time, speed, altitude, roll, pitch, yaw, bearing, position, etc.
Math and Logic:
- Uses pitch angular speed and vehicle acceleration to update pitch and speed.
- Calculates vertical and horizontal speed components from pitch.
- Updates geographic position using geopy's geodesic destination.
- Tracks phase transitions based on speed and altitude.
"""
print(f"--- Starting Takeoff Roll, Climb & Predictive Level Off Simulation ---")
print(f"Using Geographic Coordinates (Lat/Lon). Pitch enabled > {self.pitch_enable_speed_m_s:.1f} m/s")
print(
f"Target Alt: {self.target_altitude_m} m, Target Speed: {self.target_speed_m_s:.2f} m/s, Max Pitch: {self.max_pitch}°")
print(f"-----------------------------------------------------------------------")
log_entries = []
time_elapsed = 0
phase = "Takeoff Roll" # Start phase
level_off_initiated = False
pitch_enabled = False # Flag: can pitch yet?
# Set initial bearing toward next checkpoint
if len(self.waypoints) > 1:
self.bearing = self.calculate_bearing(self.coords, self.waypoints[1][:2])
self.yaw = self.bearing # Assume instant runway alignment
print(f"Initial bearing calculated using class method: {self.bearing:.2f}°")
else:
print("Warning: Only one checkpoint. Cannot calculate bearing. Setting to 0.")
self.bearing = 0
self.yaw = 0
while True:
predicted_altitude_gain = 0.0
trigger_altitude = self.target_altitude_m # Default
# Enable pitch if speed threshold reached
if not pitch_enabled and self.speed >= self.pitch_enable_speed_m_s:
print(
f"Time {time_elapsed:.1f}s: Speed ({self.speed:.1f} m/s) reached threshold ({self.pitch_enable_speed_m_s:.1f} m/s). Pitch enabled.")
pitch_enabled = True
phase = "Climb"
# Determine target pitch based on phase
if phase == "Takeoff Roll":
target_pitch = 0.0 # Keep pitch flat on ground
elif phase == "Climb":
predicted_altitude_gain = self.estimate_level_off_alt_gain()
trigger_altitude = self.target_altitude_m - predicted_altitude_gain
if self.alt >= trigger_altitude:
print(
f"Time {time_elapsed:.1f}s: Alt ({self.alt:.1f}m) reached trigger ({trigger_altitude:.1f}m) for level off. Initiating Level Off.")
phase = "Level Off"
level_off_initiated = True
target_pitch = 0.0
else:
target_pitch = self.max_pitch if pitch_enabled else 0.0
else: # Level Off
target_pitch = 0.0
level_off_initiated = True
target_speed = self.target_speed_m_s
# Log current state
current_pitch_rad = math.radians(self.pitch)
initial_vs = self.speed * math.sin(current_pitch_rad)
initial_hs = self.speed * math.cos(current_pitch_rad)
log_entry = {
"plane_id": self.flight_id,
"time_s": time_elapsed,
"speed_ms": self.speed,
"alt_m": self.alt,
"roll_deg": self.roll,
"pitch_deg": self.pitch,
"yaw_deg": self.yaw,
"bearing_deg": self.bearing,
"latitude": self.coords[0],
"longitude": self.coords[1],
"distance": self.distance,
"phase": "Initial Climb",
"plane_type": self.plane_type,
}
log_entries.append(log_entry)
# Control adjustments for pitch and speed
pitch_error = target_pitch - self.pitch
delta_pitch = np.clip(pitch_error, -self.pitch_angular_speed * dt, self.pitch_angular_speed * dt)
next_pitch = self.pitch + delta_pitch
next_pitch = np.clip(next_pitch, -self.climb_pitch_target_deg / 2, self.climb_pitch_target_deg)
if self.speed < target_speed:
delta_speed = self.vehicle_acceleration * dt
next_speed = min(
self.speed + delta_speed * math.sin(math.pi * 0.05 + math.pi * (self.speed / target_speed) * 0.95),
target_speed)
else:
next_speed = target_speed
# Simulate physics for next step
next_pitch_rad = math.radians(next_pitch)
vertical_speed = next_speed * math.sin(next_pitch_rad)
horizontal_speed = next_speed * math.cos(next_pitch_rad)
# Update altitude
delta_alt = vertical_speed * dt
next_alt = self.alt + delta_alt
if phase != "Takeoff Roll" and next_alt < 0:
next_alt = 0
# Update coordinates with geopy if available, else fallback
distance_meters = horizontal_speed * dt
if distance_meters > 1e-3:
if 'Point' in globals() and 'geodesic' in globals():
start_point = Point(latitude=self.coords[0], longitude=self.coords[1])
destination = geodesic(meters=distance_meters).destination(point=start_point, bearing=self.bearing)
next_coords = [destination.latitude, destination.longitude]
else:
lat1_rad = math.radians(self.coords[0])
lon1_rad = math.radians(self.coords[1])
bearing_rad = math.radians(self.bearing)
earth_radius_m = 6371000 # Earth radius approx
ang_dist = distance_meters / earth_radius_m
lat2_rad = math.asin(math.sin(lat1_rad) * math.cos(ang_dist) +
math.cos(lat1_rad) * math.sin(ang_dist) * math.cos(bearing_rad))
lon2_rad = lon1_rad + math.atan2(math.sin(bearing_rad) * math.sin(ang_dist) * math.cos(lat1_rad),
math.cos(ang_dist) - math.sin(lat1_rad) * math.sin(lat2_rad))
next_coords = [math.degrees(lat2_rad), math.degrees(lon2_rad)]
else:
next_coords = list(self.coords)
# Update state for next iteration
self.speed = next_speed
self.pitch = next_pitch
self.alt = next_alt
self.coords = next_coords
self.yaw = self.bearing
self.distance += next_speed
time_elapsed += dt
# Check for end of simulation
if level_off_initiated and abs(self.pitch) < pitch_threshold:
alt_error = abs(self.alt - self.target_altitude_m)
print(f"--- Level Off Complete. Pitch ({self.pitch:.2f}°) near zero at time {time_elapsed:.1f}s. "
f"Final Alt: {self.alt:.1f}m (Error: {alt_error:.1f}m) ---")
final_log_entry = {
"plane_id": self.flight_id,
"time_s": time_elapsed,
"speed_ms": self.speed,
"alt_m": self.alt,
"roll_deg": self.roll,
"pitch_deg": self.pitch,
"yaw_deg": self.yaw,
"bearing_deg": self.bearing,
"latitude": self.coords[0],
"longitude": self.coords[1],
"distance": self.distance,
"phase": "Initial Climb",
"plane_type": self.plane_type
}
log_entries.append(final_log_entry)
break
# Safety break for max simulation time
if time_elapsed > 72000:
print("Warning: Simulation exceeded maximum time limit (20hr).")
final_log_entry = {
"plane_id": self.flight_id,
"time_s": time_elapsed,
"speed_ms": self.speed,
"alt_m": self.alt,
"roll_deg": self.roll,
"pitch_deg": self.pitch,
"yaw_deg": self.yaw,
"bearing_deg": self.bearing,
"latitude": self.coords[0],
"longitude": self.coords[1],
"distance": self.distance,
"phase": "Initial Climb",
"plane_type": self.plane_type
}
log_entries.append(final_log_entry)
break
trajectory_log_df = pd.DataFrame(log_entries)
trajectory_log_df = trajectory_log_df.reindex(columns=self.df_col_names)
return trajectory_log_df
[docs]
def estimate_altitude_change_for_pitch_correction(self, dt=1.0):
"""
Estimates total altitude change expected while correcting pitch angle to zero.
This method simulates the pitch angle reduction towards zero, calculating
cumulative altitude gain or loss during the pitch correction maneuver.
Parameters:
dt (float): Time step in seconds for the pitch correction simulation.
Returns:
float: Estimated net altitude change (meters) during pitch correction.
Positive values indicate altitude gain; negative values indicate loss.
Explanation:
- Simulates gradual pitch angle approach to zero at reduced angular speed.
- Altitude changes are integrated from vertical speed component at each step.
- Positive pitch reduces altitude gain; negative pitch reduces altitude.
"""
pitch = self.pitch
pitch_angular_speed = self.pitch_angular_speed / 10
altitude_difference = 0
while True:
pitch_delta = pitch_angular_speed * dt
if pitch > 0:
pitch -= pitch_delta
if pitch < 0:
pitch = 0
altitude_difference += self.speed * math.sin(math.radians(pitch)) * dt
if pitch < 0:
pitch += pitch_delta
if pitch > 0:
pitch = 0
altitude_difference -= self.speed * math.sin(math.radians(pitch)) * dt
if pitch == 0:
break
return altitude_difference
[docs]
def simulate_cruise_to_waypoint(self, dt=1.0):
"""
Simulates cruising flight to the current target waypoint with altitude adjustments.
This function guides the aircraft towards the waypoint, controlling pitch to
approach the target altitude smoothly, accelerating up to target speed, and
adjusting position iteratively until close to the waypoint.
Parameters:
dt (float): Time step in seconds for each simulation update.
Returns:
pd.DataFrame: A DataFrame logging the aircraft state at each step, including
speed, altitude, pitch, roll, yaw, bearing, coordinates, distance traveled,
and the current phase.
Explanation:
- Calculates bearing to waypoint and adjusts heading.
- Determines target pitch based on altitude difference and distance.
- Adjusts pitch up or down to reach target altitude smoothly.
- Updates speed, position, altitude according to physics.
- Uses geodesic calculations to update geographic coordinates.
- Continues until aircraft is within half the target speed distance to waypoint.
"""
self.bearing = self.calculate_bearing(self.coords, self.waypoints[self.current_waypoint_index][:2])
log_entries = []
prev_altitude = -1
prev_delta_altitude = -1
do_log = True
adapt_altitude_to_target = True # Stage 1: adjust altitude to target
distance_to_waypoint = geodesic(tuple(self.coords), self.waypoints[self.current_waypoint_index][:2]).meters
altitude_diff = abs(self.alt - self.waypoints[self.current_waypoint_index][2])
hypotenuse = math.sqrt(altitude_diff ** 2 + distance_to_waypoint ** 2)
min_angle = abs(math.degrees(math.acos(distance_to_waypoint / hypotenuse)))
# Determine target pitch based on min_angle and pitch limits
if min_angle < self.cruising_pitch_max / 5:
target_pitch = min_angle * 4
elif min_angle < self.cruising_pitch_max / 2:
target_pitch = min_angle * 2
elif min_angle < self.cruising_pitch_max:
target_pitch = self.cruising_pitch_max
elif min_angle < self.max_pitch:
target_pitch = min_angle
else:
target_pitch = min_angle + 4
pitch_direction_up = self.waypoints[self.current_waypoint_index][2] > self.alt
while True:
new_bearing = self.calculate_bearing(self.coords, self.waypoints[self.current_waypoint_index][:2])
yaw_change = new_bearing - self.bearing
self.bearing = new_bearing
# Accelerate towards target speed if needed
if self.speed < self.target_speed_m_s:
speed_increment = self.vehicle_acceleration * dt
self.speed += speed_increment
if self.speed > self.target_speed_m_s:
self.speed = self.target_speed_m_s
if adapt_altitude_to_target:
delta_altitude = abs(self.alt - self.waypoints[self.current_waypoint_index][2])
if (self.estimate_altitude_change_for_pitch_correction(dt=dt) >= delta_altitude
or (prev_delta_altitude != -1 and delta_altitude > prev_delta_altitude)):
log_entries.extend(self.perform_pitch_correction_to_level(dt=dt))
do_log = False
adapt_altitude_to_target = False
self.pitch = 0
else:
pitch_delta = self.pitch_angular_speed * 0.1 * dt
if pitch_direction_up:
self.pitch += pitch_delta
if self.pitch > target_pitch:
self.pitch = target_pitch
else:
self.pitch -= pitch_delta
if self.pitch < -target_pitch:
self.pitch = -target_pitch
prev_delta_altitude = delta_altitude
if do_log:
horizontal_distance = self.speed * math.cos(math.radians(self.pitch)) * dt
self.distance += horizontal_distance
destination = Geodistance(meters=horizontal_distance).destination(
point=Point(self.coords[0], self.coords[1]),
bearing=self.bearing
)
self.alt += self.speed * math.sin(math.radians(self.pitch)) * dt
self.coords[0], self.coords[1] = destination.latitude, destination.longitude
self.time += 1
log_entries.append({
"plane_id": self.flight_id,
"time_s": self.time,
"speed_ms": self.speed,
"alt_m": self.alt,
"roll_deg": self.roll,
"pitch_deg": self.pitch,
"yaw_deg": yaw_change,
"bearing_deg": self.bearing,
"latitude": self.coords[0],
"longitude": self.coords[1],
"distance": self.distance,
"phase": f"Cruise to waypoint {self.current_waypoint_index}",
"plane_type": self.plane_type,
})
else:
do_log = True
# Break loop when close enough to waypoint (half target speed)
if geodesic(tuple(self.coords),
self.waypoints[self.current_waypoint_index][:2]).meters < self.target_speed_m_s / 2:
break
self.current_waypoint_index += 1
trajectory_log_df = pd.DataFrame(log_entries)
# Ensure columns match df_col_names
trajectory_log_df = trajectory_log_df.reindex(columns=self.df_col_names)
return trajectory_log_df
[docs]
def estimate_descent_distance(self, dt=1.0, target_angle=-3, descent_altitude_threshold_landing=500,
pitch_angular_speed=1):
"""
Estimates the horizontal distance required for the aircraft to safely descend
from its current altitude to the landing threshold altitude, following a specified
descent pitch angle and speed profile.
Parameters:
dt (float): Simulation time step in seconds.
target_angle (float): Desired descent pitch angle in degrees (negative for descent).
descent_altitude_threshold_landing (float): Altitude threshold in meters
at which landing procedures are initiated.
pitch_angular_speed (float): Angular speed for pitch adjustment during descent.
Returns:
float: Estimated horizontal distance in meters needed to complete the descent.
Explanation:
- Simulates descent trajectory by iteratively adjusting pitch angle toward target.
- Models speed reduction when below landing altitude and near stall speeds.
- Calculates geographic position updates via geodesic calculations.
- Applies a simplified altitude loss acceleration factor for stall conditions.
- Stops simulation when altitude reaches zero or below.
"""
coords = self.coords.copy()
speed = self.speed
altitude = self.alt
distance_traveled = 0
pitch = self.pitch
alt_loss_max_acceleration = -1.5 * speed * math.sin(math.radians(target_angle)) * dt
while True:
new_bearing = self.calculate_bearing(self.coords, self.waypoints[self.current_waypoint_index][:2])
yaw_change = new_bearing - self.bearing
self.bearing = new_bearing
if altitude > descent_altitude_threshold_landing:
if pitch > target_angle:
pitch_delta = dt * self.pitch_angular_speed
pitch -= pitch_delta
pitch = max(pitch, target_angle)
horizontal_distance = speed * math.cos(math.radians(pitch)) * dt
distance_traveled += horizontal_distance
destination = Geodistance(meters=horizontal_distance).destination(
point=Point(coords[0], coords[1]),
bearing=self.bearing
)
altitude += speed * math.sin(math.radians(pitch)) * dt
coords[0], coords[1] = destination.latitude, destination.longitude
else:
if pitch < self.landing_pitch_target_deg and speed < self.stall_speed_threshold_m_s:
pitch_delta = dt * self.pitch_angular_speed * 0.10 * pitch_angular_speed
pitch += pitch_delta
pitch = min(pitch, self.landing_pitch_target_deg)
elif pitch < 0 and speed >= self.stall_speed_threshold_m_s:
pitch_delta = dt * self.pitch_angular_speed * 0.10 * pitch_angular_speed
pitch += pitch_delta
pitch = min(pitch, 0)
if speed > self.landing_speed_m_s:
deceleration = dt * self.vehicle_deceleration
speed_reduction_factor = math.sin(
0.1 * math.pi +
0.9 * math.pi *
(
(self.target_speed_m_s - self.landing_speed_m_s) -
(self.speed - self.landing_speed_m_s)
) / (self.target_speed_m_s - self.landing_speed_m_s)
)
speed -= deceleration * speed_reduction_factor
speed = max(speed, self.landing_speed_m_s)
if speed < self.stall_speed_threshold_m_s:
altitude -= alt_loss_max_acceleration / (speed - self.landing_speed_m_s + 1)
horizontal_distance = speed * math.cos(math.radians(pitch)) * dt
distance_traveled += horizontal_distance
destination = Geodistance(meters=horizontal_distance).destination(
point=Point(coords[0], coords[1]),
bearing=self.bearing
)
altitude += speed * math.sin(math.radians(pitch)) * dt
coords[0], coords[1] = destination.latitude, destination.longitude
if altitude < 0:
altitude = 0
break
return distance_traveled
[docs]
def cruise_to_destination(self, dt=1.0):
"""
Controls the aircraft cruise phase toward the final destination waypoint,
calculating when to initiate descent based on estimated required descent distance.
Parameters:
dt (float): Simulation time step in seconds.
Returns:
pd.DataFrame: Complete flight log from cruise to final descent, combining
cruise and descent logs, including positions, speeds, altitudes, and phases.
Explanation:
- Calculates the distance required to descend safely using estimate_descent_distance().
- Adjusts descent parameters if needed to ensure landing within waypoint.
- Simulates cruise flight until aircraft is within descent initiation distance.
- Then calls perform_descent() to complete the approach and landing.
- Logs all states continuously and concatenates cruise and descent data.
"""
self.bearing = self.calculate_bearing(self.coords, self.waypoints[-1][:2])
log_entries = []
dist_needed_for_landing = self.estimate_descent_distance(
dt=dt,
target_angle=self.descent_pitch_target_deg,
descent_alt_treshold_landing=self.descent_altitude_threshold_landing_m,
pitch_angular_speed=1
)
distance_to_final_waypoint = geodesic(tuple(self.coords), self.waypoints[-1][:2]).meters
initial_descent_pitch_target = self.descent_pitch_target_deg
pitch_angular_speed = 1
if dist_needed_for_landing > distance_to_final_waypoint:
print(
"Landing setup with current plane simulation parameters not possible, attempting more aggressive parameters to land at last waypoint")
# Adjust descent parameters if distance needed to land is too large
while dist_needed_for_landing > distance_to_final_waypoint:
if self.speed < self.target_speed_m_s:
speed_increment = self.vehicle_acceleration * dt
self.speed += speed_increment
if self.speed > self.target_speed_m_s:
self.speed = self.target_speed_m_s
self.descent_pitch_target_deg -= 1
self.descent_altitude_threshold_landing_m += 100
pitch_angular_speed = abs(initial_descent_pitch_target) / abs(self.descent_pitch_target_deg)
dist_needed_for_landing = self.estimate_descent_distance(
dt=dt,
target_angle=self.descent_pitch_target_deg,
descent_alt_treshold_landing=self.descent_altitude_threshold_landing_m,
pitch_angular_speed=pitch_angular_speed
)
# Cruise until within descent distance, then perform descent
while True:
current_distance_to_waypoint = geodesic(tuple(self.coords), self.waypoints[-1][:2]).meters
if current_distance_to_waypoint > dist_needed_for_landing:
new_bearing = self.calculate_bearing(self.coords, self.waypoints[self.current_waypoint_index][:2])
yaw_change = new_bearing - self.bearing
self.bearing = new_bearing
horizontal_speed = self.speed * math.cos(math.radians(self.pitch))
self.distance += horizontal_speed * dt
destination = Geodistance(meters=horizontal_speed * dt).destination(
point=Point(self.coords[0], self.coords[1]),
bearing=self.bearing
)
self.time += 1
self.coords[0], self.coords[1] = destination.latitude, destination.longitude
log_entries.append({
"plane_id": self.flight_id,
"time_s": self.time,
"speed_ms": self.speed,
"alt_m": self.alt,
"roll_deg": self.roll,
"pitch_deg": self.pitch,
"yaw_deg": yaw_change,
"bearing_deg": self.bearing,
"latitude": self.coords[0],
"longitude": self.coords[1],
"distance": self.distance,
"phase": f"Cruise to last waypoint ({self.current_waypoint_index})",
"plane_type": self.plane_type,
})
else:
descent_log_df = self.perform_descent(
dt=dt,
target_angle=self.descent_pitch_target_deg,
descent_alt_treshold_landing=self.descent_altitude_threshold_landing_m,
pitch_angular_speed=pitch_angular_speed
)
break
trajectory_log_df = pd.DataFrame(log_entries)
# Ensure exact columns using reindex before returning
trajectory_log_df = trajectory_log_df.reindex(columns=self.df_col_names)
trajectory_log_df = pd.concat([trajectory_log_df, descent_log_df], ignore_index=True)
return trajectory_log_df
[docs]
def calculate_turn_direction_and_angle(self, target_bearing):
"""
Determines the optimal turn direction (left or right) and the minimal angular
difference needed to reach the target bearing from the current bearing.
Parameters:
target_bearing (float): Target heading bearing in degrees (0-360).
Returns:
tuple(bool, float):
- bool: True if the aircraft should turn right; False if left.
- float: Angle in degrees representing the smallest rotation needed.
Explanation:
- Uses modular arithmetic on bearings (0-360 degrees).
- Chooses turn direction to minimize angular travel.
- Accounts for wrap-around at 0/360 degrees.
"""
bearing_difference = self.bearing - target_bearing
abs_difference = abs(bearing_difference)
if abs_difference <= 180:
turn_right = bearing_difference < 0
rotation_angle = abs(target_bearing - self.bearing)
else:
if bearing_difference > 0:
turn_right = True
rotation_angle = abs(self.bearing - 360 - target_bearing)
else:
turn_right = False
rotation_angle = abs(self.bearing + 360 - target_bearing)
return turn_right, rotation_angle
[docs]
def haversine_distance(self, lat1, lon1, lat2, lon2):
R = 6371000 # Earth radius in meters
phi1 = np.radians(lat1)
phi2 = np.radians(lat2)
dphi = np.radians(lat2 - lat1)
dlambda = np.radians(lon2 - lon1)
a = np.sin(dphi / 2.0) ** 2 + np.cos(phi1) * np.cos(phi2) * np.sin(dlambda / 2.0) ** 2
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
return R * c
[docs]
def estimate_roll_pitch_correction_rotation(self, dt=1.0, current_roll=None):
"""
Estimates the total angular rotation in degrees the aircraft will undergo while
correcting its roll angle back to zero by simulating incremental roll adjustments.
Parameters:
dt (float): Time step in seconds.
current_roll (float or None): Optional starting roll angle in degrees;
if None, uses the aircraft's current roll attribute.
Returns:
float: Absolute total rotation in degrees accumulated during roll correction.
Explanation:
- Simulates roll angle decay towards zero at a constant angular speed.
- Calculates cumulative rotation induced by pitch angular speed and roll.
- Rotation is calculated as the integral of angular increments per dt.
"""
total_rotation = 0
roll = self.roll if current_roll is None else current_roll
while True:
roll_change = self.roll_angular_speed * dt
if roll > 0:
roll -= roll_change
if roll < 0:
roll = 0
elif roll < 0:
roll += roll_change
if roll > 0:
roll = 0
incremental_rotation = (self.pitch_angular_speed * abs(roll) / self.max_roll) * math.sin(math.radians(roll))
total_rotation += incremental_rotation
if roll == 0:
break
return abs(total_rotation)
[docs]
def get_min_waypoints_distance(self, dt=1.0):
"""
Executes the aircraft's turn maneuver towards the next waypoint,
controlling roll angle, adjusting speed for sharper turns, and
updating position and orientation over time. then compute the min distance needed between waypoints.
Parameters:
dt (float): Time step in seconds for each iteration of the turn simulation.
Returns:
pd.DataFrame: A DataFrame log of the aircraft's state at each timestep during
the turn, including position, attitude, speed, and phase details.
"""
def local_calculate_turn_direction_and_angle(bearing, target_bearing):
bearing_difference = bearing - target_bearing
abs_difference = abs(bearing_difference)
if abs_difference <= 180:
turn_right = bearing_difference < 0
rotation_angle = abs(target_bearing - bearing)
else:
if bearing_difference > 0:
turn_right = True
rotation_angle = abs(bearing - 360 - target_bearing)
else:
turn_right = False
rotation_angle = abs(bearing + 360 - target_bearing)
return turn_right, rotation_angle
total_rotation = 0
log_entries = []
speed = self.target_speed_m_s
bearing = 0
yaw = 0
roll = 0
pitch = 0
alt = 10000
coords = [0, 0] # mutable copy
distance = 0
time = 0
rotation_needed = 180
turn_right = False
speed_ratio = rotation_needed / 180
target_speed = self.target_speed_m_s * (1 - ((1 - self.max_speed_ratio_while_turning) * math.sin(speed_ratio)))
while True:
target_bearing = self.calculate_bearing(tuple(coords), [-1, 0, 10000])
_, rotation_needed = local_calculate_turn_direction_and_angle(bearing, target_bearing)
# Reduce speed for sharper turns if needed
if speed > target_speed:
speed_decrement = dt * self.vehicle_deceleration
speed -= speed_decrement
if speed < target_speed:
speed = target_speed
# Check if rotation required is less than estimated correction rotation
if rotation_needed < self.estimate_roll_pitch_correction_rotation(dt=dt, current_roll=roll):
# log_entries.extend(self.perform_roll_pitch_correction(dt=dt))
break
# Adjust roll angle based on turn direction
roll_delta = self.roll_angular_speed * dt
if turn_right:
roll += roll_delta
if roll > self.max_roll:
roll = self.max_roll
else:
roll -= roll_delta
if roll < -self.max_roll:
roll = -self.max_roll
# Calculate incremental rotation based on roll
incremental_rotation = (self.pitch_angular_speed * abs(roll) / self.max_roll) * math.sin(math.radians(roll))
total_rotation += incremental_rotation
bearing += incremental_rotation
# Keep bearing within [0, 360]
if bearing > 360:
bearing -= 360
if bearing < 0:
bearing += 360
# Calculate pitch based on roll
pitch = (self.pitch_angular_speed * abs(roll) / self.max_roll) * math.cos(math.radians(roll))
# Update altitude and distance
alt += min(speed * math.sin(math.radians(pitch)) * dt, 0.5)
distance += speed * math.cos(math.radians(pitch)) * dt
# Calculate new position
displacement = speed * math.cos(math.radians(pitch)) * dt
destination = Geodistance(meters=displacement).destination(
point=Point(coords[0], coords[1]),
bearing=bearing
)
coords[0], coords[1] = destination.latitude, destination.longitude
time += 1
# Log current state
log_entries.append({
"plane_id": self.flight_id,
"time_s": time,
"speed_ms": speed,
"alt_m": alt,
"roll_deg": roll,
"pitch_deg": pitch,
"yaw_deg": yaw,
"bearing_deg": bearing,
"latitude": coords[0],
"longitude": coords[1],
"distance": distance,
"phase": f"Turn to waypoint {self.current_waypoint_index}",
"plane_type": self.plane_type,
})
trajectory_log = pd.DataFrame(log_entries)
trajectory_log = trajectory_log.reindex(columns=self.df_col_names)
def haversine_distance(lat1, lon1, lat2, lon2):
"""
Compute the Haversine distance between two points in decimal degrees.
Returns distance in meters.
"""
R = 6371000 # Earth radius in meters
phi1 = np.radians(lat1)
phi2 = np.radians(lat2)
dphi = np.radians(lat2 - lat1)
dlambda = np.radians(lon2 - lon1)
a = np.sin(dphi / 2.0) ** 2 + np.cos(phi1) * np.cos(phi2) * np.sin(dlambda / 2.0) ** 2
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
return R * c
trajectory_log["distance_from_origin_m"] = haversine_distance(
0, 0,
trajectory_log["latitude"].values,
trajectory_log["longitude"].values
)
return trajectory_log["distance_from_origin_m"].max()
[docs]
def plot(self, df):
"""
Generates 2D plots of altitude, speed, and pitch angle over time during a flight
phase using Plotly, displaying key flight parameters to visualize performance.
Parameters:
df (pd.DataFrame): DataFrame containing logged flight data with columns:
- 'time_s': time in seconds
- 'alt_m': altitude in meters
- 'speed_ms': speed in meters/second
- 'pitch_deg': pitch angle in degrees
- plus other columns for context (not all plotted)
Returns:
None: Displays interactive plots in the output environment.
Explanation:
- Altitude vs Time plot shows altitude changes during the flight phase.
- Speed vs Time plot includes a horizontal line indicating target speed.
- Pitch vs Time plot includes a horizontal line indicating max pitch limit.
"""
print("\nFinal State after Climb:")
print(self)
print("\n Trajectory Log Head:")
print(df.head())
print("\n Trajectory Log Tail:")
print(df.tail())
# Plot the altitude
print("\nPlotting Altitude vs Time...")
fig_alt = px.line(df, x='time_s', y='alt_m', title='Altitude during Climb Phase')
fig_alt.update_layout(xaxis_title="Time (seconds)", yaxis_title="Altitude (meters)")
fig_alt.show()
# Optional: Plot speed
fig_spd = px.line(df, x='time_s', y='speed_ms', title='Speed during Climb Phase')
fig_spd.update_layout(xaxis_title="Time (seconds)", yaxis_title="Speed (m/s)")
# Add target speed line
fig_spd.add_hline(y=self.target_speed_m_s, line_dash="dot", annotation_text="Target Speed",
annotation_position="bottom right")
fig_spd.show()
# Optional: Plot Pitch
fig_pitch = px.line(df, x='time_s', y='pitch_deg', title='Pitch Angle during Climb Phase')
fig_pitch.update_layout(xaxis_title="Time (seconds)", yaxis_title="Pitch (degrees)")
fig_pitch.add_hline(y=self.max_pitch, line_dash="dot", annotation_text="Max Pitch",
annotation_position="bottom right")
fig_pitch.show()
[docs]
def plot_3d_flight_path(self, df):
"""
Visualizes the 3D flight path of the aircraft using Plotly, plotting latitude,
longitude, and altitude with markers colored by altitude and hover information.
Parameters:
df (pd.DataFrame): DataFrame containing flight data with required columns:
'alt_m', 'latitude', 'longitude', 'phase', 'roll_deg', 'pitch_deg',
'bearing_deg', 'speed_ms'.
Returns:
None: Displays an interactive 3D plot of the flight path.
Explanation:
- Uses scatter3d with lines and markers colored by altitude.
- Normalizes axis scales to maintain aspect ratio between lat/lon.
- Provides detailed hover info for each point including flight phase and attitudes.
"""
required_columns = ['alt_m', 'latitude', 'longitude', 'phase', 'roll_deg', 'pitch_deg', 'bearing_deg',
'speed_ms']
if not all(col in df.columns for col in required_columns):
raise ValueError(f"DataFrame must contain {required_columns}")
fig = go.Figure(data=[go.Scatter3d(
x=df['longitude'],
y=df['latitude'],
z=df['alt_m'],
mode='lines+markers',
marker=dict(
size=4,
color=df['alt_m'], # color based on altitude
colorscale='Viridis',
opacity=0.8
),
line=dict(
color='blue',
width=2
),
customdata=df[['phase', 'roll_deg', 'pitch_deg', 'bearing_deg', 'speed_ms']].values,
hovertemplate=(
"Longitude: %{x}<br>" +
"Latitude: %{y}<br>" +
"Altitude: %{z} m<br>" +
"Phase: %{customdata[0]}<br>" +
"Roll: %{customdata[1]}°<br>" +
"Pitch: %{customdata[2]}°<br>" +
"Bearing: %{customdata[3]}°<br>" +
"Speed: %{customdata[4]} m/s<br>" +
"<extra></extra>"
)
)])
# Calcul des plages
delta_x = abs(df['longitude'].max() - df['longitude'].min())
delta_y = abs(df['latitude'].max() - df['latitude'].min())
delta_z = abs(df['alt_m'].max() - df['alt_m'].min())
# Normalisation pour avoir une base XY à échelle 1:1
max_base = max(delta_x, delta_y)
aspect_ratio = dict(
x=delta_x / max_base,
y=delta_y / max_base,
z=0.05 # ou fixe à 0.5 si trop grand
)
fig.update_layout(
title='3D Flight Path',
scene=dict(
xaxis_title='Longitude',
yaxis_title='Latitude',
zaxis_title='Altitude (m)',
aspectmode='manual',
aspectratio=aspect_ratio
),
margin=dict(l=0, r=0, b=0, t=40)
)
fig.show()
[docs]
def calculate_trajectory(self, dt=1, plot=False, plot_3D=False):
"""
Computes the full flight trajectory by simulating takeoff, climb, cruise, turns,
and descent phases, aggregating the results into a comprehensive flight log.
Parameters:
dt (float): Time step in seconds for simulation steps.
plot (bool): If True, generates 2D plots of altitude, speed, and pitch.
plot_3D (bool): If True, generates a 3D interactive plot of the flight path.
Returns:
pd.DataFrame: Combined DataFrame logging aircraft state through all flight phases,
including positions, speeds, attitudes, and flight phases.
Explanation:
- Simulates takeoff and climb if climb enabled.
- Simulates turns and cruise legs between waypoints.
- Optionally simulates descent and landing.
- Supports visual output for deeper insight into trajectory and flight dynamics.
"""
if self.enable_climb:
trajectory_log_df = self.simulate_takeoff_and_climb(dt=dt, pitch_threshold=0.05)
turn_log_df = self.perform_turn_to_next_waypoint(dt=dt)
trajectory_log_df = pd.concat([trajectory_log_df, turn_log_df], ignore_index=True)
else:
trajectory_log_df = pd.DataFrame([{
"plane_id": self.flight_id,
"time_s": self.time,
"speed_ms": self.speed,
"alt_m": self.alt,
"roll_deg": self.roll,
"pitch_deg": self.pitch,
"yaw_deg": self.yaw,
"bearing_deg": self.bearing,
"latitude": self.coords[0],
"longitude": self.coords[1],
"distance": self.distance,
"phase": "initial State",
"plane_type": self.plane_type
}])
trajectory_log_df = trajectory_log_df.reindex(columns=self.df_col_names)
while self.current_waypoint_index < len(self.waypoints) - 1:
print(
f"Calculating for waypoint {self.current_waypoint_index} coords: {self.waypoints[self.current_waypoint_index]}")
cruise_log_df = self.simulate_cruise_to_waypoint(dt=dt)
trajectory_log_df = pd.concat([trajectory_log_df, cruise_log_df], ignore_index=True)
turn_log_df = self.perform_turn_to_next_waypoint(dt=dt)
trajectory_log_df = pd.concat([trajectory_log_df, turn_log_df], ignore_index=True)
if self.enable_descent:
descent_log_df = self.cruise_to_destination(dt=dt)
trajectory_log_df = pd.concat([trajectory_log_df, descent_log_df], ignore_index=True)
else:
cruise_log_df = self.simulate_cruise_to_waypoint(dt=dt)
trajectory_log_df = pd.concat([trajectory_log_df, cruise_log_df], ignore_index=True)
if plot:
self.plot(trajectory_log_df)
if plot_3D:
self.plot_3d_flight_path(trajectory_log_df)
return trajectory_log_df
[docs]
def haversine(lon1, lat1, lon2, lat2):
"""
Calculate the great-circle distance between two points
on the Earth specified by longitude and latitude.
Returns distance in kilometers.
"""
lon1, lat1, lon2, lat2 = map(math.radians, [lon1, lat1, lon2, lat2])
dlon = lon2 - lon1
dlat = lat2 - lat1
a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(
dlon / 2) ** 2
c = 2 * math.asin(math.sqrt(a))
R = 6371.0
return R * c
[docs]
class FlightTrajectoryWorker(PolarsWorker):
"""Class derived from AgiDataWorker"""
pool_vars = {}
[docs]
def start(self):
global global_vars
self.pool_vars["args"] = self.args
self.pool_vars["verbose"] = self.verbose
global_vars = self.pool_vars
data_out = Path(self.args['data_out']).expanduser()
try:
shutil.rmtree(data_out, ignore_errors=False, onerror=self.onerror)
os.makedirs(data_out, exist_ok=True)
except Exception as e:
print(f'Error removing directory: {e}')
if self.verbose > 0:
print(f'from: {__file__}\n', end='')
[docs]
def work_init(self):
"""Initialize work by reading from shared space."""
global global_vars
pass
[docs]
def pool_init(self, worker_vars):
"""Initialize the pool with worker variables.
Args:
worker_vars (dict): Variables specific to the worker.
"""
global global_vars
global_vars = worker_vars
[docs]
def work_pool(self, file):
"""Parse IVQ log files.
Args:
file (str): The log file to parse.
Returns:
pl.DataFrame: Parsed data.
"""
global global_vars
args = self.args
verbose = self.verbose
file = int(file)
data_dir_path = Path(args['data_dir'])
expanded_data_dir_path = data_dir_path.expanduser()
full_waypoints_path = expanded_data_dir_path / args['waypoints']
with open(full_waypoints_path, 'r') as waypoints_list:
list_waypoints = json.load(waypoints_list)
data = list_waypoints['features'][file]['geometry']['coordinates'][0
] if isinstance(list_waypoints['features'][file]['geometry'][
'coordinates'][0][0], list) else list_waypoints['features'][file][
'geometry']['coordinates']
try:
plane = plane_trajectory(flight_id=file, waypoints=data,
yaw_angular_speed=args['yaw_angular_speed'],
roll_angular_speed=args['roll_angular_speed'],
pitch_angular_speed=args['pitch_angular_speed'],
vehicle_acceleration=args['vehicule_acceleration'],
max_speed=args['max_speed'], max_roll=args['max_roll'],
max_pitch=args['max_pitch'], target_climbup_pitch=args[
'target_climbup_pitch'], pitch_enable_speed_ratio=args[
'pitch_enable_speed_ratio'], altitude_loss_speed_treshold=
args['altitude_loss_speed_treshold'], descent_pitch_target=
args['descent_pitch_target'], landing_pitch_target=args[
'landing_pitch_target'], cruising_pitch_max=args[
'cruising_pitch_max'], descent_alt_treshold_landing=args[
'descent_alt_treshold_landing'],
max_speed_ratio_while_turining=args[
'max_speed_ratio_while_turining'], enable_climb=args[
'enable_climb'], enable_descent=args['enable_descent'],
default_alt_value=args['default_alt_value'], plane_type=
args['plane_type'])
df = plane.calculate_trajectory(dt=1)
col_name = df.columns.tolist()
col_name.remove('time_s')
col_name.insert(0, 'time_s')
df = df.reindex(columns=col_name)
if ("sat" in args["plane_type"].lower()):
df["roll_deg"] = 0
df["pitch_deg"] = 0
df["bearing_deg"] = 0
df["yaw_deg"] = 0
df = pl.from_pandas(df)
except ValueError as e:
print(f'Initialization failed: {e}')
return df
[docs]
def work_done(self, worker_df):
"""Concatenate dataframe if any and save the results.
Args:
worker_df (pl.DataFrame): Output dataframe for one plane.
"""
if worker_df.is_empty():
return
try:
id = worker_df['plane_id'][0]
self.data_out = Path(self.args['data_out']).expanduser()
os.makedirs(self.data_out, exist_ok=True)
timestamp = dt.now().strftime('%Y-%m-%d_%H-%M-%S')
filename = f'{self.data_out}/{self.args["plane_type"]}_{id}_{timestamp}.csv'
worker_df.write_csv(str(filename))
except Exception as e:
print(traceback.format_exc())
print(f'Error saving dataframe for plane {id} : {e}')
[docs]
def stop(self):
try:
"""Finalize the worker by listing saved dataframes."""
files = glob.glob(os.path.join(self.data_out, '**'), recursive=True
)
df_files = [f for f in files if re.search('\\.(csv|parquet)$', f)]
n_df = len(df_files)
if self.verbose > 0:
print(f'FlightTrajectoryWorker.worker_end - {n_df} dataframes:')
for f in df_files:
print(Path(f))
if not n_df:
print('No dataframe created')
except Exception as err:
print(f'Error while trying to find dataframes: {err}')
super().stop()