Flight Project

Overview

  • End-to-end reference project that ingests aeronautical telemetry, cleans it with Polars and pushes curated artefacts back into the AGILab export directory.

  • Demonstrates how to orchestrate file-based distributions, run inside the cluster pool and keep the web interface responsive while the worker pipeline operates asynchronously.

  • Bundles web-view lab material (lab_steps.toml) and prompt examples to help you reproduce the workflow showcased in AGILab live demos.

Scientific highlights

The worker derives distances and speeds from time-stamped telemetry. For two samples with latitude/longitude \((\phi_1, \lambda_1)\) and \((\phi_2, \lambda_2)\), a common great-circle approximation is the haversine formula:

\[d = 2R \arcsin\left(\sqrt{\sin^2\frac{\Delta\phi}{2} + \cos\phi_1\cos\phi_2\sin^2\frac{\Delta\lambda}{2}}\right)\]

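with Earth radius \(R\), latitudes and longitudes expressed in radians, and angular differences \(\Delta\phi = \phi_2 - \phi_1\), \(\Delta\lambda = \lambda_2 - \lambda_1\). Given timestamps \(t_1 < t_2\), the average speed over the interval is \(v \approx d / (t_2 - t_1)\), which approximates the instantaneous speed when samples are closely spaced.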
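The haversine formula and the speed estimate can be sketched in plain Python. This is a minimal illustration, not necessarily what the worker implements: the function names, the mean Earth radius of 6 371 000 m and the choice to return None for non-increasing timestamps are all assumptions made for this example.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius in metres (assumed value)


def haversine_m(lat1, lon1, lat2, lon2, radius=EARTH_RADIUS_M):
    """Great-circle distance in metres between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    # Clamp to guard against rounding pushing the value slightly above 1.
    return 2 * radius * math.asin(math.sqrt(min(1.0, a)))


def speed_m_per_s(d_m, t1_s, t2_s):
    """Average speed over [t1, t2]; None when timestamps do not increase."""
    dt = t2_s - t1_s
    return d_m / dt if dt > 0 else None
```

Returning None for a zero or negative time step keeps duplicated or out-of-order samples from producing infinite or negative speeds; a real pipeline might prefer to drop or flag such rows instead.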

Manager (flight.flight)

  • Wraps the runnable application. Converts user-supplied arguments into a validated FlightArgs model, normalises data URIs, and initialises cluster dispatch by seeding WorkDispatcher.

  • Provides from_toml / to_toml helpers so snippets on the Orchestrate page can reload configuration and persist overrides.

  • Handles managed-PC specifics (path remapping, data directory resets) and keeps the dataframe export folder clean between runs.

Args (flight.flight_args)

  • Pydantic models that capture the full surface area of the project’s configuration, including dataset location, slicing parameters and cluster toggles.

  • Ships conversion utilities for reading/writing app_settings.toml and merging overrides that are injected by the Orchestrate page.

Worker (flight_worker.flight_worker)

  • Extends PolarsWorker to preprocess raw telemetry, compute geodesic distances between samples, and partition files across the cluster.

  • Contains Cython hooks (flight_worker.pyx) and pre/post-install scripts so you can see how compiled extensions are integrated into a distribution.

  • Demonstrates Windows-friendly path handling and data staging for managed environments.

Assets & Tests

  • app_test.py exercises the full install → distribute → run flow.

  • test/_test_* modules focus on unit-level behaviour for manager, worker and orchestration glue.

  • Modules and lab_steps.toml contain the web-view lab material used by the Pipeline page.

API Reference

flight package dependencies
class flight.flight.Flight(env, args=None, **kwargs)[source]

Bases: BaseWorker

Flight class provides methods to orchestrate the run.

__init__(env, args=None, **kwargs)[source]
as_dict(mode='json')[source]

Return current arguments as a serialisable dictionary.

Return type:

dict[str, Any]

build_distribution(workers)[source]

Build the distribution: the list of files per plane (level 1) and per worker (level 2). Level 1 was designed so that a job requiring all the output data of a plane does not have to wait for another flight_worker, which would degrade overall performance.

Args:

Returns:
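The two-level idea can be illustrated with a small standard-library sketch. It is not the actual build_distribution code: the helper names, the plane_of callback and the greedy balancing by file count are assumptions chosen for this example.

```python
from collections import defaultdict


def group_files_by_plane(files, plane_of):
    """Level 1: map each plane id to the list of its files."""
    by_plane = defaultdict(list)
    for f in files:
        by_plane[plane_of(f)].append(f)
    return dict(by_plane)


def assign_planes_to_workers(by_plane, n_workers):
    """Level 2: give the largest remaining plane to the least-loaded worker."""
    plan = [[] for _ in range(n_workers)]
    load = [0] * n_workers
    # Largest planes first; ties broken by plane id for a stable result.
    ordered = sorted(by_plane.items(), key=lambda kv: (-len(kv[1]), kv[0]))
    for plane, plane_files in ordered:
        w = load.index(min(load))
        plan[w].append((plane, plane_files))
        load[w] += len(plane_files)
    return plan
```

Because a plane is never split across workers, any step that needs all of a plane's data finds it on a single worker.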

static extract_plane_from_file_name(file_path)[source]

Provide the airplane ID from the log file name.

Parameters:

file_path

Returns:

classmethod from_toml(env, settings_path='app_settings.toml', section='args', **overrides)[source]
Return type:

Flight

get_data_from_files()[source]

Get output-data slices from files or from ELK/HAWK.

get_data_from_hawk()[source]

Get output data from ELK/HAWK.

get_partition_by_planes(df)[source]

Build the first level of the distribution tree, with planes as atomic partitions.

Parameters:
  • df – dataframe containing the output data to partition

Returns:

ivq_logs = None
to_toml(settings_path='app_settings.toml', section='args', create_missing=True)[source]
Return type:

None

class flight.flight.FlightApp(env, args=None, **kwargs)[source]

Bases: Flight

Alias keeping legacy imports alive.

flight class diagram

Shared validation and persistence helpers for Flight project arguments.

flight.flight_args.ArgsModel

alias of FlightArgs

flight.flight_args.ArgsOverrides

alias of FlightArgsTD

class flight.flight_args.FlightArgs(**data)[source]

Bases: BaseModel

Validated configuration for the Flight worker.

data_in
data_out
data_source
datemax
datemin
files
model_config = {'extra': 'forbid', 'validate_assignment': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

nfile
nread
nskip
output_format
reset_target
sampling_rate
to_toml_payload()[source]

Return a TOML-friendly representation (Path/date → str).

Return type:

dict[str, Any]
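As a rough illustration of the Path/date → str conversion, the sketch below works on a plain dictionary. The function name toml_friendly is hypothetical and the real method operates on the FlightArgs model, so treat this only as a picture of the idea.

```python
from datetime import date
from pathlib import Path


def toml_friendly(values):
    """Return a copy of `values` with Path and date entries as strings."""
    out = {}
    for key, value in values.items():
        if isinstance(value, Path):
            out[key] = value.as_posix()
        elif isinstance(value, date):
            # date.isoformat() gives YYYY-MM-DD.
            out[key] = value.isoformat()
        else:
            out[key] = value
    return out
```

For example, a dictionary holding data_in as a Path and datemin as a date comes back with both rendered as strings, while integers such as nfile pass through unchanged.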

class flight.flight_args.FlightArgsTD[source]

Bases: TypedDict

data_in
data_out
data_source
datemax
datemin
files
nfile
nread
nskip
output_format
reset_target
sampling_rate
flight.flight_args.apply_source_defaults(args, *, host_ip=None)[source]

Ensure source-specific defaults for missing values.

Return type:

FlightArgs

flight.flight_args.dump_args(args, settings_path, *, section='args', create_missing=True)[source]
Return type:

None

flight.flight_args.dump_args_to_toml(args, settings_path, section='args', create_missing=True)[source]

Persist arguments back to the TOML file (overwriting only the section).

Return type:

None

flight.flight_args.ensure_defaults(args, **kwargs)[source]
Return type:

FlightArgs

flight.flight_args.load_args(settings_path, *, section='args')[source]
Return type:

FlightArgs

flight.flight_args.load_args_from_toml(settings_path, section='args')[source]

Load arguments from a TOML file, applying model defaults when missing.

Return type:

FlightArgs

flight.flight_args.merge_args(base, overrides=None)[source]

Return a new instance with overrides applied on top of base.

Return type:

FlightArgs
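The "new instance, base untouched" contract can be shown with a frozen dataclass. ArgsSketch is a hypothetical stand-in that borrows two field names from FlightArgs; its default values are assumptions, and the real function works on the Pydantic model rather than a dataclass.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ArgsSketch:
    """Hypothetical stand-in for FlightArgs with two of its fields."""
    nfile: int = 1
    nskip: int = 0


def merge(base, overrides=None):
    """Return a new ArgsSketch with overrides applied; `base` is unchanged."""
    return replace(base, **(overrides or {}))
```

Calling merge(ArgsSketch(), {"nfile": 4}) returns a new object with nfile set to 4, while the original instance keeps its own values.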

flight args class diagram
flight worker package diagram
class flight_worker.flight_worker.FlightWorker[source]

Bases: PolarsWorker

Worker class derived from PolarsWorker.

calculate_speed(new_column_name, df)[source]

Compute the speed between consecutive coordinate pairs and add it to the DataFrame under the provided column name. The unit is stated as meters, so the value is presumably the distance covered per sample step. Assumes that the previous coordinate columns are already present.

Return type:

DataFrame

pool_init(worker_vars)[source]

Initialize the pool with worker variables.

Parameters:

worker_vars (dict) – Variables specific to the worker.

pool_vars = {}
preprocess_df(df)[source]

Preprocess the DataFrame by parsing the date column and creating previous coordinate columns. This operation is done once per file.

Return type:

DataFrame

start()[source]

Initialize global variables and setup paths.

stop()[source]

Returns:

work_done(worker_df)[source]

Concatenate the dataframes, if any, and save the results.

Parameters:

worker_df (pl.DataFrame) – Output dataframe for one plane.

work_init()[source]

Initialize work by reading from shared space.

work_pool(file)[source]

Parse IVQ log files.

Parameters:

file (str) – The log file to parse.

Returns:

Parsed data.

Return type:

pl.DataFrame

flight worker class diagram