Flight Project
Overview
End-to-end reference project that ingests aeronautical telemetry, cleans it with Polars and pushes curated artefacts back into the AGILab export directory.
Demonstrates how to orchestrate file-based distributions, run inside the cluster pool and keep the web interface responsive while the worker pipeline operates asynchronously.
Bundles web-view lab material (lab_steps.toml) and prompt examples to help you reproduce the workflow showcased in AGILab live demos.
Scientific highlights
The worker derives distances and speeds from time-stamped telemetry. For two samples with latitude/longitude \((\phi_1, \lambda_1)\) and \((\phi_2, \lambda_2)\), a common great-circle approximation is the haversine formula:

\[ d = 2R \arcsin\!\sqrt{\sin^2\!\left(\frac{\Delta\phi}{2}\right) + \cos\phi_1 \cos\phi_2 \sin^2\!\left(\frac{\Delta\lambda}{2}\right)} \]

with Earth radius \(R\) and angular differences \(\Delta\phi = \phi_2 - \phi_1\), \(\Delta\lambda = \lambda_2 - \lambda_1\). Given timestamps \(t_1, t_2\), an instantaneous speed estimate is \(v \approx d / (t_2 - t_1)\).
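The distance and speed estimates above can be sketched in plain Python (the helper names are illustrative, not the worker's actual API; the real pipeline computes these with Polars expressions):

```python
import math

def haversine_m(phi1, lam1, phi2, lam2, radius_m=6_371_000.0):
    """Great-circle distance in metres between two (lat, lon) points in degrees."""
    phi1, lam1, phi2, lam2 = map(math.radians, (phi1, lam1, phi2, lam2))
    dphi, dlam = phi2 - phi1, lam2 - lam1
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * radius_m * math.asin(math.sqrt(a))

def speed_ms(distance_m, t1_s, t2_s):
    """Instantaneous speed estimate v ≈ d / (t2 - t1), in m/s."""
    return distance_m / (t2_s - t1_s)
```

One degree of longitude on the equator comes out at roughly 111.2 km, a quick sanity check for the radius and unit choices.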
Manager (flight.flight)
Wraps the runnable application. Converts user-supplied arguments into a validated FlightArgs model, normalises data URIs, and initialises cluster dispatch by seeding WorkDispatcher.
Provides from_toml/to_toml helpers so snippets on the Orchestrate page can reload configuration and persist overrides.
Handles managed-PC specifics (path remapping, data directory resets) and keeps the dataframe export folder clean between runs.
Args (flight.flight_args)
Pydantic models that capture the full surface area of the project’s configuration, including dataset location, slicing parameters and cluster toggles.
Ships conversion utilities for reading/writing app_settings.toml and merging overrides that are injected by the Orchestrate page.
Worker (flight_worker.flight_worker)
Extends PolarsWorker to preprocess raw telemetry, compute geodesic distances between samples, and partition files across the cluster.
Contains Cython hooks (flight_worker.pyx) and pre/post-install scripts so you can see how compiled extensions are integrated into a distribution.
Demonstrates Windows-friendly path handling and data staging for managed environments.
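The preprocessing idea, attaching each sample's predecessor so distances can be computed row by row, can be shown in pure Python (the real worker does this with Polars shift expressions; the field names "date", "lat", "lon" are assumptions for illustration):

```python
from datetime import datetime

def add_prev_coords(rows):
    """Parse timestamps and attach the previous sample's coordinates to each row."""
    out, prev = [], None
    for row in rows:
        enriched = dict(row)
        enriched["ts"] = datetime.fromisoformat(row["date"])
        enriched["prev_lat"] = prev["lat"] if prev is not None else None
        enriched["prev_lon"] = prev["lon"] if prev is not None else None
        out.append(enriched)
        prev = row
    return out
```

The first row of each file has no predecessor, which is why the worker performs this staging once per file before any distance or speed column is derived.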
Assets & Tests
app_test.py exercises the full install → distribute → run flow.
test/_test_* modules focus on unit-level behaviour for manager, worker and orchestration glue.
Modules and lab_steps.toml contain the web-view lab material used by the Pipeline page.
API Reference
- class flight.flight.Flight(env, args=None, **kwargs)[source]
Bases:
BaseWorker
Flight class provides methods to orchestrate the run.
- as_dict(mode='json')[source]
Return current arguments as a serialisable dictionary.
- Return type:
dict[str,Any]
- build_distribution(workers)[source]
Build the distribution tree: level 1 lists the files per plane, level 2 assigns planes to workers. Level 1 exists so that a job requiring all the output data of one plane never has to wait for another flight_worker, which would collapse overall performance.
Args:
Returns:
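The two-level idea can be sketched as follows (a hypothetical, simplified version: the plane id is assumed to be the file-name prefix before the first underscore, and planes are spread round-robin over workers):

```python
from collections import defaultdict

def build_distribution(files, workers):
    """Level 1: group files by plane. Level 2: assign whole planes to workers."""
    by_plane = defaultdict(list)
    for name in files:
        by_plane[name.split("_")[0]].append(name)   # level 1: files per plane
    dist = [[] for _ in range(workers)]
    for i, plane in enumerate(sorted(by_plane)):
        dist[i % workers].extend(by_plane[plane])   # level 2: planes per worker
    return dist
```

Because a plane's files never straddle two workers, any job that needs all of a plane's output data finds it on a single worker.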
- static extract_plane_from_file_name(file_path)[source]
Extract the airplane id from a log file name.
- Parameters:
file_path
Returns:
- classmethod from_toml(env, settings_path='app_settings.toml', section='args', **overrides)[source]
- Return type:
- get_partition_by_planes(df)[source]
Build the first level of the distribution tree, with planes as atomic partitions.
- Parameters:
df – dataframe containing the output data to partition
Returns:
- ivq_logs = None
- class flight.flight.FlightApp(env, args=None, **kwargs)[source]
Bases:
Flight
Alias keeping legacy imports alive.
Shared validation and persistence helpers for Flight project arguments.
- flight.flight_args.ArgsModel
alias of
FlightArgs
- flight.flight_args.ArgsOverrides
alias of
FlightArgsTD
- class flight.flight_args.FlightArgs(**data)[source]
Bases:
BaseModel
Validated configuration for the Flight worker.
- data_in
- data_out
- data_source
- datemax
- datemin
- files
- model_config = {'extra': 'forbid', 'validate_assignment': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- nfile
- nread
- nskip
- output_format
- reset_target
- sampling_rate
- class flight.flight_args.FlightArgsTD[source]
Bases:
TypedDict
- data_in
- data_out
- data_source
- datemax
- datemin
- files
- nfile
- nread
- nskip
- output_format
- reset_target
- sampling_rate
- flight.flight_args.apply_source_defaults(args, *, host_ip=None)[source]
Ensure source-specific defaults for missing values.
- Return type:
- flight.flight_args.dump_args(args, settings_path, *, section='args', create_missing=True)[source]
- Return type:
None
- flight.flight_args.dump_args_to_toml(args, settings_path, section='args', create_missing=True)[source]
Persist arguments back to the TOML file (overwriting only the section).
- Return type:
None
- flight.flight_args.load_args_from_toml(settings_path, section='args')[source]
Load arguments from a TOML file, applying model defaults when missing.
- Return type:
- flight.flight_args.merge_args(base, overrides=None)[source]
Return a new instance with overrides applied on top of base.
- Return type:
- class flight_worker.flight_worker.FlightWorker[source]
Bases:
PolarsWorker
Class derived from AgiDataWorker.
- calculate_speed(new_column_name, df)[source]
Compute the speed (in metres per second) between consecutive coordinate pairs and add it to the DataFrame under the provided column name. Assumes that the previous coordinate columns are already present.
- Return type:
DataFrame
- pool_init(worker_vars)[source]
Initialize the pool with worker variables.
- Parameters:
worker_vars (dict) – Variables specific to the worker.
- pool_vars = {}
- preprocess_df(df)[source]
Preprocess the DataFrame by parsing the date column and creating previous coordinate columns. This operation is done once per file.
- Return type:
DataFrame