Flight Project
Overview
End-to-end reference project that ingests file-based aeronautical telemetry, cleans it with Polars and pushes curated artefacts back into the AGILab export directory.
Demonstrates how to orchestrate file-based distributions, run inside the cluster pool and keep the web interface responsive while the worker pipeline operates asynchronously.
Bundles web-view lab material (lab_stages.toml) and prompt examples to help you reproduce the workflow showcased in AGILab live demos.
Scientific highlights
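The worker derives segment distances from time-stamped telemetry. For two samples with latitude/longitude \((\phi_1, \lambda_1)\) and \((\phi_2, \lambda_2)\), a common great-circle approximation is the haversine formula:
\[
d = 2R \arcsin\left(\sqrt{\sin^2\left(\frac{\Delta\phi}{2}\right) + \cos\phi_1 \cos\phi_2 \, \sin^2\left(\frac{\Delta\lambda}{2}\right)}\right)
\]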
with Earth radius \(R\) and angular differences
\(\Delta\phi = \phi_2 - \phi_1\), \(\Delta\lambda = \lambda_2 - \lambda_1\).
The public demo stores this per-sample segment distance in the historical
speed column so existing analysis pages and reducer contracts remain
compatible.
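As a minimal scalar sketch of the haversine distance described above (not the worker's actual code), the following assumes coordinates in decimal degrees and a mean Earth radius of 6,371,000 m. The function name `haversine_m` and the sample points are illustrative only.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # assumed mean Earth radius in metres


def haversine_m(lat1, lon1, lat2, lon2, radius=EARTH_RADIUS_M):
    """Great-circle distance between two points given in decimal degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    # Clamp to 1.0 to guard against floating-point overshoot near antipodal points.
    return 2 * radius * math.asin(math.sqrt(min(1.0, a)))


# Per-sample segment distances: each sample is paired with the one before it.
samples = [(48.0, 2.0), (48.0, 2.0), (48.5, 2.5)]
segments = [haversine_m(*p, *q) for p, q in zip(samples, samples[1:])]
```

Two identical consecutive samples yield a segment distance of zero, so a stationary aircraft contributes nothing to the accumulated track length.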
Public scope
flight_project is intentionally file-only. The wider AGILab connector
catalog supports SQL, object-storage, and OpenSearch-compatible connector
definitions, but this built-in app rejects search-index data sources instead of
exposing a partially implemented path.
Manager (flight.flight)
Wraps the runnable application. Converts user-supplied arguments into a validated FlightArgs model, normalises data URIs, and initialises cluster dispatch by seeding WorkDispatcher.
Provides from_toml/to_toml helpers so snippets on the Orchestrate page can reload configuration and persist overrides.
Handles managed-PC specifics (path remapping, data directory resets) and keeps the dataframe export folder clean between runs.
Args (flight.flight_args)
Pydantic models that capture the supported public configuration, including dataset location, slicing parameters and cluster toggles.
Ships conversion utilities for reading/writing app_settings.toml and merging overrides that are injected by the Orchestrate page.
Worker (flight_worker.flight_worker)
Extends PolarsWorker to preprocess raw telemetry, compute great-circle segment distances between samples, and partition files across the cluster.
Can be compiled by the AGILab dispatcher when Cython is enabled; generated .pyx/.c files are build artefacts, not source-of-truth files.
Demonstrates Windows-friendly path handling and data staging for managed environments.
Assets & Tests
test/test_flight_project_runtime_args.py covers argument validation, file inventory building, worker defaults, and reduce-artifact emission.
Cluster validation and UI tests cover the default view_maps route, first-proof flow, and Release Decision reduce-artifact discovery.
API Reference
- class flight.flight.Flight(env, args=None, **kwargs)[source]
Bases:
BaseWorker
Flight class provides methods to orchestrate the run.
- as_dict(mode='json')[source]
Return current arguments as a serialisable dictionary.
- Return type:
dict[str,Any]
- build_distribution(workers)[source]
Build worker -> aircraft -> file batches for file-based telemetry.
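One way to picture a worker -> aircraft -> file batch tree is sketched below. It is an illustration under stated assumptions, not the project's algorithm: the file-naming convention (`<plane_id>_<suffix>.csv`), the helper names `plane_of` and `build_distribution_sketch`, the integer `n_workers` argument, and the greedy least-loaded assignment by file count are all hypothetical.

```python
from collections import defaultdict


def plane_of(file_name):
    # Hypothetical convention: the plane id is the text before the first underscore.
    return file_name.split("_", 1)[0]


def build_distribution_sketch(files, n_workers):
    """Return one {plane_id: [files]} mapping per worker."""
    by_plane = defaultdict(list)
    for name in sorted(files):
        by_plane[plane_of(name)].append(name)

    plan = [{} for _ in range(n_workers)]
    load = [0] * n_workers
    # Place the largest batches first, each on the currently least-loaded worker,
    # so that a plane's files are never split across workers.
    for plane, batch in sorted(by_plane.items(), key=lambda kv: (-len(kv[1]), kv[0])):
        worker = load.index(min(load))
        plan[worker][plane] = batch
        load[worker] += len(batch)
    return plan


files = ["A1_001.csv", "A1_002.csv", "B2_001.csv", "C3_001.csv", "A1_003.csv"]
plan = build_distribution_sketch(files, 2)
```

In this sketch the plane is the atomic unit of work: a worker receives whole aircraft batches, which keeps consecutive samples of one aircraft together.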
- static extract_plane_from_file_name(file_path)[source]
Provide the airplane id from the log file name.
- Parameters:
file_path
Returns:
- classmethod from_toml(env, settings_path='app_settings.toml', section='args', **overrides)[source]
- Return type:
- get_partition_by_planes(df)[source]
Build the first level of the distribution tree, with planes as atomic partitions.
- Parameters:
df – dataframe containing the output data to partition
Returns:
- ivq_logs = None
- class flight.flight.FlightApp(env, args=None, **kwargs)[source]
Bases:
Flight
Alias keeping legacy imports alive.
Shared validation and persistence helpers for Flight project arguments.
- flight.flight_args.ArgsModel
alias of
FlightArgs
- flight.flight_args.ArgsOverrides
alias of
FlightArgsTD
- class flight.flight_args.FlightArgs(**data)[source]
Bases:
BaseModel
Validated configuration for the Flight worker.
- data_in
- data_out
- data_source
- datemax
- datemin
- files
- model_config = {'extra': 'forbid', 'validate_assignment': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- nfile
- nread
- nskip
- output_format
- reset_target
- sampling_rate
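The effect of the `extra='forbid'` and `validate_assignment=True` settings shown in `model_config` can be illustrated with a small stand-in model. This sketch assumes Pydantic v2; `DemoArgs`, its two field types, and its default values are hypothetical and are not taken from `FlightArgs`.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class DemoArgs(BaseModel):
    # Same two settings as the documented model_config; the fields are illustrative.
    model_config = ConfigDict(extra="forbid", validate_assignment=True)

    nfile: int = 1
    output_format: str = "parquet"


args = DemoArgs(nfile=2)
errors = []

try:
    DemoArgs(nfile=2, unknown_option=True)  # rejected because extra keys are forbidden
except ValidationError as exc:
    errors.append(exc.errors()[0]["type"])

try:
    args.nfile = "not a number"  # rejected because assignments are re-validated
except ValidationError as exc:
    errors.append(exc.errors()[0]["type"])
```

With both settings enabled, a misspelled option fails loudly at construction time and a bad value cannot be slipped in later through attribute assignment.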
- class flight.flight_args.FlightArgsTD[source]
Bases:
TypedDict
- data_in
- data_out
- data_source
- datemax
- datemin
- files
- nfile
- nread
- nskip
- output_format
- reset_target
- sampling_rate
- flight.flight_args.apply_source_defaults(args, *, host_ip=None)[source]
Ensure source-specific defaults for missing values.
- Return type:
- flight.flight_args.dump_args(args, settings_path, *, section='args', create_missing=True)[source]
- Return type:
None
- flight.flight_args.dump_args_to_toml(args, settings_path, section='args', create_missing=True)[source]
Persist arguments back to the TOML file (overwriting only the section).
- Return type:
None
- flight.flight_args.load_args_from_toml(settings_path, section='args')[source]
Load arguments from a TOML file, applying model defaults when missing.
- Return type:
- flight.flight_args.merge_args(base, overrides=None)[source]
Return a new instance with overrides applied on top of base.
- Return type:
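The "new instance with overrides on top of base" contract can be sketched with a frozen dataclass standing in for the real model. `SketchArgs` and `merge_sketch` are hypothetical names, and the real function operates on `FlightArgs`, which may apply additional validation.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SketchArgs:
    # Illustrative fields only; not the real FlightArgs definition.
    nfile: int = 1
    output_format: str = "parquet"


def merge_sketch(base, overrides=None):
    """Return a copy of base with overrides applied; base itself is left untouched."""
    return replace(base, **(overrides or {}))


base = SketchArgs()
merged = merge_sketch(base, {"nfile": 8})
unchanged = merge_sketch(base)
```

Returning a new object rather than mutating `base` means the persisted configuration and the per-run overrides can be compared or discarded independently.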
- class flight_worker.flight_worker.FlightWorker[source]
Bases:
PolarsWorker
Class derived from AgiDataWorker
- calculate_speed(new_column_name, df)[source]
Compute the segment distance in meters between consecutive coordinate pairs and add it under the legacy speed column name used by this demo. Assumes that the previous coordinate columns are already present.
- Return type:
DataFrame
- pool_init(worker_vars)[source]
Initialize the pool with worker variables.
- Parameters:
worker_vars (dict) – Variables specific to the worker.
- pool_vars = {}
- preprocess_df(df)[source]
Preprocess the DataFrame by parsing the date column and creating previous coordinate columns. This operation is done once per file.
- Return type:
DataFrame