Flight-Telemetry Project
Overview
End-to-end reference project that ingests file-based aeronautical telemetry, cleans it with Polars and pushes curated artefacts back into the AGILab export directory.
Demonstrates how to orchestrate file-based distributions, run inside the cluster pool and keep the web interface responsive while the worker pipeline operates asynchronously.
Demonstrates the real-world worker-only Cython pattern: Polars ingestion, output writing, pages, and reducer contracts stay in Python, while the per-row haversine distance kernel can run as a typed compiled worker hot loop.
Bundles web-view lab material (lab_stages.toml) and prompt examples to help you reproduce the workflow showcased in AGILab live demos.
Scientific highlights
The worker derives segment distances from time-stamped telemetry. For two samples with latitude/longitude \((\phi_1, \lambda_1)\) and \((\phi_2, \lambda_2)\), a common great-circle approximation is the haversine formula:
\[ d = 2R \arcsin\left(\sqrt{\sin^2\left(\frac{\Delta\phi}{2}\right) + \cos\phi_1 \cos\phi_2 \sin^2\left(\frac{\Delta\lambda}{2}\right)}\right) \]
with Earth radius \(R\) and angular differences
\(\Delta\phi = \phi_2 - \phi_1\), \(\Delta\lambda = \lambda_2 - \lambda_1\).
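As a minimal sketch, the haversine distance described above can be written in plain Python. The function name, the degree-based inputs, and the mean Earth radius of 6,371,000 m are assumptions made for illustration; the project's actual kernel may use different names, units, or a different radius.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # assumed mean Earth radius in metres


def haversine_m(lat1_deg, lon1_deg, lat2_deg, lon2_deg, radius_m=EARTH_RADIUS_M):
    """Great-circle distance in metres between two points given in degrees."""
    phi1 = math.radians(lat1_deg)
    phi2 = math.radians(lat2_deg)
    dphi = phi2 - phi1
    dlam = math.radians(lon2_deg - lon1_deg)
    a = (
        math.sin(dphi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    )
    # Clamp to 1.0 so rounding near antipodal points cannot break asin.
    return 2 * radius_m * math.asin(math.sqrt(min(1.0, a)))
```

Applying this per row, with the previous sample's coordinates as the first point, gives one segment distance per sample. A loop of this shape is the kind of typed scalar arithmetic that suits compilation.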
The public demo stores this per-sample segment distance in the historical
speed column so existing analysis pages and reducer contracts remain
compatible.
The worker also records speed_kernel_runtime, speed_dtype_contract, and
speed_kernel_checksum_m in the reducer summary. That makes Cython mode
auditable without moving dataframe I/O, map/network analysis, or artifact
generation out of normal Python.
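The source does not say how these summary fields are computed. The sketch below shows one plausible auditing scheme and assumes the checksum is simply the sum of segment distances in metres, so that a pure-Python run and a compiled run can be compared. The helper names and the example field values are hypothetical.

```python
import math


def kernel_checksum_m(distances_m):
    """Sum segment distances in metres, skipping missing values (e.g. the first sample)."""
    return math.fsum(d for d in distances_m if d is not None)


def build_summary(distances_m, runtime, dtype_contract="float64"):
    """Assemble a reducer-style summary; the value formats are illustrative guesses."""
    return {
        "speed_kernel_runtime": runtime,          # hypothetical values: "python" or "cython"
        "speed_dtype_contract": dtype_contract,   # hypothetical dtype label
        "speed_kernel_checksum_m": kernel_checksum_m(distances_m),
    }


def checksums_agree(summary_a, summary_b, rel_tol=1e-9):
    """True when two runs produced the same total distance within a relative tolerance."""
    return math.isclose(
        summary_a["speed_kernel_checksum_m"],
        summary_b["speed_kernel_checksum_m"],
        rel_tol=rel_tol,
    )
```

Under this assumption, a mismatch between the interpreted and compiled summaries would flag a kernel or dtype regression without touching the dataframe I/O path.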
Public scope
flight_telemetry_project is intentionally file-only. The wider AGILab connector
catalog supports SQL, object-storage, and OpenSearch-compatible connector
definitions, but this built-in app rejects search-index data sources instead of
exposing a partially implemented path.
Manager (flight_telemetry.flight_telemetry)
Wraps the runnable application. Converts user-supplied arguments into a validated FlightArgs model, normalises data URIs, and initialises cluster dispatch by seeding WorkDispatcher.
Provides from_toml/to_toml helpers so snippets on the Orchestrate page can reload configuration and persist overrides.
Handles managed-PC specifics (path remapping, data directory resets) and keeps the dataframe export folder clean between runs.
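To illustrate the worker -> aircraft -> file batching that the manager builds for file-based telemetry, here is a rough sketch. It is not the AGILab implementation. The file-naming rule (first run of digits in the file stem is the aircraft id), the greedy balancing strategy, and both function names are assumptions.

```python
import re
from collections import defaultdict
from pathlib import PurePath


def plane_id_from_name(file_path):
    """Hypothetical rule: the first run of digits in the file stem is the aircraft id."""
    match = re.search(r"\d+", PurePath(file_path).stem)
    if match is None:
        raise ValueError(f"no aircraft id found in {file_path!r}")
    return int(match.group())


def build_batches(files, n_workers):
    """Return one {plane_id: [files]} mapping per worker, keeping each aircraft whole."""
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    by_plane = defaultdict(list)
    for name in sorted(files):
        by_plane[plane_id_from_name(name)].append(name)

    batches = [dict() for _ in range(n_workers)]
    loads = [0] * n_workers
    # Greedy balancing: largest aircraft first, each to the least-loaded worker.
    ordered = sorted(by_plane.items(), key=lambda kv: (-len(kv[1]), kv[0]))
    for plane, plane_files in ordered:
        target = loads.index(min(loads))
        batches[target][plane] = plane_files
        loads[target] += len(plane_files)
    return batches
```

Treating the aircraft as the atomic unit means all samples for one plane stay on a single worker, so consecutive-sample distances never straddle a worker boundary.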
Args (flight_telemetry.flight_args)
Pydantic models that capture the supported public configuration, including dataset location, slicing parameters and cluster toggles.
Ships conversion utilities for reading/writing app_settings.toml and merging overrides that are injected by the Orchestrate page.
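The override-merging behaviour can be sketched with the standard library alone. The sketch uses a frozen dataclass as a stand-in for the Pydantic model, so it stays self-contained. The class name, the chosen subset of fields, and their types and defaults are illustrative assumptions, not the real FlightArgs definition.

```python
from dataclasses import dataclass, fields, replace


@dataclass(frozen=True)
class DemoArgs:
    """Stand-in for FlightArgs; field types and defaults are illustrative guesses."""

    data_in: str = "flight/dataset"
    nfile: int = 1
    output_format: str = "parquet"

    def __post_init__(self):
        # Runs on every construction, including the copy made by replace().
        if self.nfile < 0:
            raise ValueError("nfile must be non-negative")


def merge_demo_args(base, overrides=None):
    """Return a new DemoArgs with overrides applied on top of base."""
    overrides = dict(overrides or {})
    allowed = {f.name for f in fields(base)}
    unknown = sorted(set(overrides) - allowed)
    if unknown:
        # Rejecting unknown keys plays the role of a forbid-extra-fields policy.
        raise KeyError(f"unknown argument(s): {unknown}")
    return replace(base, **overrides)
```

Returning a new instance rather than mutating `base` keeps the persisted defaults intact while UI overrides are applied for a single run.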
Worker (flight_telemetry_worker.flight_telemetry_worker)
Extends PolarsWorker to preprocess raw telemetry, compute great-circle segment distances between samples, and partition files across the cluster.
Can be compiled by the AGILab dispatcher when Cython is enabled; generated .pyx/.c files are build artefacts, not source-of-truth files.
Keeps Cython scoped to the worker hot loop. The app manager, UI forms, analysis pages, and reducer schema remain regular Python.
Demonstrates Windows-friendly path handling and data staging for managed environments.
Assets & Tests
test/test_flight_telemetry_project_runtime_args.py covers argument validation, file inventory building, worker defaults, and reduce-artifact emission.
Cluster validation and UI tests cover the default view_maps route, first-proof flow, and Release Decision reduce-artifact discovery.
API Reference
- class flight_telemetry.flight_telemetry.FlightTelemetry(env, args=None, **kwargs)[source]
Bases:
BaseWorker
The FlightTelemetry class provides methods to orchestrate the run.
- as_dict(mode='json')[source]
Return current arguments as a serialisable dictionary.
- Return type:
dict[str,Any]
- build_distribution(workers)[source]
Build worker -> aircraft -> file batches for file-based telemetry.
- static extract_plane_from_file_name(file_path)[source]
Extract the airplane ID from a log file name.
- Parameters:
file_path
Returns:
- classmethod from_toml(env, settings_path='app_settings.toml', section='args', **overrides)[source]
- Return type:
- get_partition_by_planes(df)[source]
Build the first level of the distribution tree, with planes as atomic partitions.
- Parameters:
df – dataframe containing the output data to partition
Returns:
- ivq_logs = None
- class flight_telemetry.flight_telemetry.FlightTelemetryApp(env, args=None, **kwargs)[source]
Bases:
FlightTelemetry
Named AGILAB manager class for the flight-telemetry runtime.
Shared validation and persistence helpers for flight-telemetry project arguments.
- flight_telemetry.flight_args.ArgsModel
alias of
FlightArgs
- flight_telemetry.flight_args.ArgsOverrides
alias of
FlightArgsTD
- class flight_telemetry.flight_args.FlightArgs(**data)[source]
Bases:
BaseModel
Validated configuration for the flight telemetry worker.
- data_in
- data_out
- data_source
- datemax
- datemin
- files
- model_config = {'extra': 'forbid', 'validate_assignment': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- nfile
- nread
- nskip
- output_format
- reset_target
- sampling_rate
- class flight_telemetry.flight_args.FlightArgsTD[source]
Bases:
TypedDict
- data_in
- data_out
- data_source
- datemax
- datemin
- files
- nfile
- nread
- nskip
- output_format
- reset_target
- sampling_rate
- flight_telemetry.flight_args.apply_source_defaults(args, *, host_ip=None)[source]
Ensure source-specific defaults for missing values.
- Return type:
- flight_telemetry.flight_args.dump_args(args, settings_path, *, section='args', create_missing=True)[source]
- Return type:
None
- flight_telemetry.flight_args.dump_args_to_toml(args, settings_path, section='args', create_missing=True)[source]
Persist arguments back to the TOML file (overwriting only the section).
- Return type:
None
- flight_telemetry.flight_args.load_args_from_toml(settings_path, section='args')[source]
Load arguments from a TOML file, applying model defaults when missing.
- Return type:
- flight_telemetry.flight_args.merge_args(base, overrides=None)[source]
Return a new instance with overrides applied on top of base.
- Return type:
- class flight_telemetry_worker.flight_telemetry_worker.FlightTelemetryWorker[source]
Bases:
PolarsWorker
Class derived from AgiDataWorker
- calculate_speed(new_column_name, df)[source]
Compute the segment distance in meters between consecutive coordinate pairs and add it under the legacy speed column name used by this demo. Assumes that the previous coordinate columns are already present.
- Return type:
DataFrame
- pool_init(worker_vars)[source]
Initialize the pool with worker variables.
- Parameters:
worker_vars (dict) – Variables specific to the worker.
- pool_vars = {}
Shared state handed to pool_init in every pool child. Apps usually set self.pool_vars = {"args": self.args} in start(). For process-based worker families (pandas/fireducks) the worker instance and pool_vars must be picklable.
- preprocess_df(df)[source]
Preprocess the DataFrame by parsing the date column and creating previous coordinate columns. This operation is done once per file.
- Return type:
DataFrame