Flight Project

Overview

  • End-to-end reference project that ingests aeronautical telemetry, cleans it with Polars and pushes curated artefacts back into the AGILab export directory.

  • Demonstrates how to orchestrate file-based distributions, run inside the cluster pool and keep the Streamlit user interface responsive while the worker pipeline operates asynchronously.

  • Bundles Streamlit lab material (lab_steps.toml) and prompt examples to help you reproduce the workflow showcased in AGILab live demos.

Scientific highlights

The worker derives distances and speeds from time-stamped telemetry. For two samples with latitude/longitude \((\phi_1, \lambda_1)\) and \((\phi_2, \lambda_2)\), a common great-circle approximation is the haversine formula:

\[d = 2R \arcsin\left(\sqrt{\sin^2\frac{\Delta\phi}{2} + \cos\phi_1\cos\phi_2\sin^2\frac{\Delta\lambda}{2}}\right)\]

with Earth radius \(R\) and angular differences \(\Delta\phi = \phi_2 - \phi_1\), \(\Delta\lambda = \lambda_2 - \lambda_1\). Given timestamps \(t_1, t_2\), an instantaneous speed estimate is \(v \approx d / (t_2 - t_1)\).
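As a concrete sketch (plain Python, independent of the worker's actual implementation), the formula translates directly into code:

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius R, in metres

def haversine_distance(phi1, lam1, phi2, lam2, radius=EARTH_RADIUS_M):
    """Great-circle distance (metres) between two (lat, lon) points in degrees."""
    p1, l1, p2, l2 = map(math.radians, (phi1, lam1, phi2, lam2))
    dphi, dlam = p2 - p1, l2 - l1
    a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlam / 2) ** 2
    return 2 * radius * math.asin(math.sqrt(a))

def speed_estimate(d, t1, t2):
    """Instantaneous speed v ≈ d / (t2 - t1); timestamps in seconds."""
    return d / (t2 - t1)
```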

Manager (flight.flight)

  • Wraps the runnable application. Converts user-supplied arguments into a validated FlightArgs model, normalises data URIs, and initialises cluster dispatch by seeding WorkDispatcher.

  • Provides from_toml / to_toml helpers so snippets on the Execute page can reload configuration and persist overrides.

  • Handles managed-PC specifics (path remapping, data directory resets) and keeps the dataframe export folder clean between runs.

Args (flight.flight_args)

  • Pydantic models that capture the full surface area of the project’s configuration, including dataset location, slicing parameters and cluster toggles.

  • Ships conversion utilities for reading/writing app_settings.toml and merging overrides that are injected by the Execute page.

Worker (flight_worker.flight_worker)

  • Extends PolarsWorker to preprocess raw telemetry, compute geodesic distances between samples, and partition files across the cluster.

  • Contains Cython hooks (flight_worker.pyx) and pre/post-install scripts so you can see how compiled extensions are integrated into a distribution.

  • Demonstrates Windows-friendly path handling and data staging for managed environments.

Assets & Tests

  • app_test.py exercises the full install → distribute → run flow.

  • test/_test_* modules focus on unit-level behaviour for manager, worker and orchestration glue.

  • Modules and lab_steps.toml contain the Streamlit lab material used by the Experiment page.

API Reference

flight package dependencies
class flight.flight.Flight(env, args=None, **kwargs)[source]

Bases: BaseWorker

Flight class provides methods to orchestrate the run.

__init__(env, args=None, **kwargs)[source]
args_dump_mode: ClassVar[str] = 'json'
args_dumper: ClassVar[Callable[..., None] | None] = None
args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
args_loader: ClassVar[Callable[..., Any] | None] = None
args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
as_dict(mode='json')[source]

Return current arguments as a serialisable dictionary.

Return type:

dict[str, Any]

break()

Signal the service loop to exit on this worker.

Return type:

bool

static break_loop()

Signal the service loop to exit on this worker.

Return type:

bool

build_distribution(workers)[source]

Build the two-level distribution: level 1 lists files per plane, level 2 assigns those per-plane groups to workers. Level 1 exists to prevent a job that needs all of one plane's output data from having to wait on another flight_worker, which would collapse overall performance.

Args:

Returns:
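The two-level idea can be sketched as follows (the plane-id-from-file-name convention used here is assumed purely for illustration):

```python
from collections import defaultdict
from itertools import cycle

def build_distribution_sketch(files, workers):
    """Level 1: bucket files per plane; level 2: hand whole buckets to workers,
    so no job ever waits on another worker for one plane's output data."""
    by_plane = defaultdict(list)
    for name in files:
        plane_id = name.split("_")[0]  # assumed naming convention, e.g. "plane1_leg3.csv"
        by_plane[plane_id].append(name)
    assignment = {w: [] for w in workers}
    for worker, bucket in zip(cycle(workers), by_plane.values()):
        assignment[worker].extend(bucket)
    return assignment
```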

default_settings_path: ClassVar[str] = 'app_settings.toml'
default_settings_section: ClassVar[str] = 'args'
env: Optional[AgiEnv] = None
static expand(path, base_directory=None)

Expand a given path to an absolute path.

Parameters:
  • path (str) – The path to expand.

  • base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
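A minimal sketch of the behaviour described above, using only the standard library (not the actual implementation):

```python
import os

def expand_sketch(path, base_directory=None):
    """Expand '~' and environment variables, then resolve against base_directory
    (when given and the path is still relative) to an absolute path."""
    expanded = os.path.expandvars(os.path.expanduser(path))
    if base_directory and not os.path.isabs(expanded):
        expanded = os.path.join(base_directory, expanded)
    return os.path.abspath(expanded)
```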

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

static extract_plane_from_file_name(file_path)[source]

Extract the airplane id from a log file name.

Parameters:

  • file_path – path of the log file

Returns:

classmethod from_toml(env, settings_path='app_settings.toml', section='args', **overrides)[source]
Return type:

Flight

get_data_from_files()[source]

Get output-data slices from files or from ELK/HAWK.

get_data_from_hawk()[source]

Get output-data from ELK/HAWK.

get_partition_by_planes(df)[source]

Build the first level of the distribution tree, with planes as atomic partitions.

Parameters:
  • df – dataframe containing the output-data to partition

Returns:

ivq_logs = None
static loop(*, poll_interval=None)

Run a long-lived service loop on this worker until signalled to stop.

The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.

Return type:

Dict[str, Any]
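The dispatch described above (stop_event-aware loop versus polled loop) can be sketched as:

```python
import inspect
import threading
import time

def run_service_loop(worker_loop, poll_interval=0.05):
    """Sketch of the base behaviour: hand the loop a stop_event when its
    signature accepts one; otherwise poll it until it returns False."""
    stop_event = threading.Event()
    params = inspect.signature(worker_loop).parameters
    if "stop_event" in params or "should_stop" in params:
        worker_loop(stop_event)  # worker honours the event itself
    else:
        while not stop_event.is_set():
            if worker_loop() is False:  # returning False requests termination
                break
            time.sleep(poll_interval)
    return {"stopped": True}
```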

managed_pc_home_suffix: ClassVar[str] = 'MyApp'
managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
static normalize_dataset_path(data_path)

Normalise any dataset directory input so workers rely on consistent paths.

Return type:

str

prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)

Create (and optionally reset) a deterministic output directory.

Return type:

Path

setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
Return type:

Any

setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)

Prepare normalised input/output dataset paths without relying on worker args.

Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.

Return type:

SimpleNamespace
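A sketch of the described behaviour (the real helper additionally applies the normalisation rules of normalize_dataset_path):

```python
import shutil
from pathlib import Path
from types import SimpleNamespace

def setup_data_directories_sketch(source_path, target_path=None,
                                  target_subdir="dataframe", reset_target=False):
    """Resolve input/output dataset paths: normalise the source, derive the
    output directory (target_path or <input>/<target_subdir>), optionally
    clearing and recreating it."""
    input_path = Path(source_path).expanduser().resolve()
    output_path = Path(target_path) if target_path else input_path / target_subdir
    if reset_target and output_path.exists():
        shutil.rmtree(output_path)
    output_path.mkdir(parents=True, exist_ok=True)
    return SimpleNamespace(
        input_path=input_path,
        normalized_input=str(input_path),
        output_path=output_path,
        normalized_output=str(output_path),
    )
```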

static start(worker_inst)

Invoke the concrete worker’s start hook once initialised.

stop()

Returns:

to_toml(settings_path='app_settings.toml', section='args', create_missing=True)[source]
Return type:

None

verbose = 1
class flight.flight.FlightApp(env, args=None, **kwargs)[source]

Bases: Flight

Alias keeping legacy imports alive.

__init__(env, args=None, **kwargs)
args_dump_mode: ClassVar[str] = 'json'
args_dumper: ClassVar[Callable[..., None] | None] = None
args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
args_loader: ClassVar[Callable[..., Any] | None] = None
args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
as_dict(mode='json')

Return current arguments as a serialisable dictionary.

Return type:

dict[str, Any]

break()

Signal the service loop to exit on this worker.

Return type:

bool

static break_loop()

Signal the service loop to exit on this worker.

Return type:

bool

build_distribution(workers)

Build the two-level distribution: level 1 lists files per plane, level 2 assigns those per-plane groups to workers. Level 1 exists to prevent a job that needs all of one plane's output data from having to wait on another flight_worker, which would collapse overall performance.

Args:

Returns:

default_settings_path: ClassVar[str] = 'app_settings.toml'
default_settings_section: ClassVar[str] = 'args'
env: Optional[AgiEnv] = None
static expand(path, base_directory=None)

Expand a given path to an absolute path.

Parameters:
  • path (str) – The path to expand.

  • base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

static extract_plane_from_file_name(file_path)

Extract the airplane id from a log file name.

Parameters:

  • file_path – path of the log file

Returns:

classmethod from_toml(env, settings_path='app_settings.toml', section='args', **overrides)
Return type:

Flight

get_data_from_files()

Get output-data slices from files or from ELK/HAWK.

get_data_from_hawk()

Get output-data from ELK/HAWK.

get_partition_by_planes(df)

Build the first level of the distribution tree, with planes as atomic partitions.

Parameters:
  • df – dataframe containing the output-data to partition

Returns:

ivq_logs = None
static loop(*, poll_interval=None)

Run a long-lived service loop on this worker until signalled to stop.

The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.

Return type:

Dict[str, Any]

managed_pc_home_suffix: ClassVar[str] = 'MyApp'
managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
static normalize_dataset_path(data_path)

Normalise any dataset directory input so workers rely on consistent paths.

Return type:

str

prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)

Create (and optionally reset) a deterministic output directory.

Return type:

Path

setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
Return type:

Any

setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)

Prepare normalised input/output dataset paths without relying on worker args.

Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.

Return type:

SimpleNamespace

static start(worker_inst)

Invoke the concrete worker’s start hook once initialised.

stop()

Returns:

to_toml(settings_path='app_settings.toml', section='args', create_missing=True)
Return type:

None

verbose = 1
flight class diagram

Shared validation and persistence helpers for Flight project arguments.

flight.flight_args.ArgsModel

alias of FlightArgs

flight.flight_args.ArgsOverrides

alias of FlightArgsTD

class flight.flight_args.FlightArgs(**data)[source]

Bases: BaseModel

Validated configuration for the Flight worker.

__init__(**data)

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.
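Combining the fields and defaults listed under model_fields below, a matching [args] section in app_settings.toml could look like the following (data_in and the dates are illustrative, machine-specific values; the rest are the documented defaults):

```toml
[args]
data_in = "~/data/flight"   # illustrative; the real default is machine-specific
data_source = "file"        # or "hawk"
files = "*"
nfile = 1
nread = 0
nskip = 0
datemin = "2020-01-01"      # illustrative dates
datemax = "2021-01-01"
output_format = "parquet"   # or "csv"
reset_target = false
sampling_rate = 1.0
```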

classmethod construct(_fields_set=None, **values)
Return type:

Self

copy(*, include=None, exclude=None, update=None, deep=False)

Returns a copy of the model.

!!! warning “Deprecated”

This method is now deprecated; use model_copy instead.

If you need include or exclude, use:

    data = self.model_dump(include=include, exclude=exclude, round_trip=True)
    data = {**data, **(update or {})}
    copied = self.model_validate(data)

Parameters:
  • include (Set[int] | Set[str] | Mapping[int, Any] | Mapping[str, Any] | None) – Optional set or mapping specifying which fields to include in the copied model.

  • exclude (Set[int] | Set[str] | Mapping[int, Any] | Mapping[str, Any] | None) – Optional set or mapping specifying which fields to exclude in the copied model.

  • update (Optional[Dict[str, Any]]) – Optional dictionary of field-value pairs to override field values in the copied model.

  • deep (bool) – If True, the values of fields that are Pydantic models will be deep-copied.

Return type:

Self

Returns:

A copy of the model with included, excluded and updated fields as specified.

data_in: Path
data_out: Path | None
data_source: Literal['file', 'hawk']
datemax: date
datemin: date
dict(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False)
Return type:

Dict[str, Any]

files: str
classmethod from_orm(obj)
Return type:

Self

json(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False, encoder=PydanticUndefined, models_as_dict=PydanticUndefined, **dumps_kwargs)
Return type:

str

model_computed_fields = {}
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

classmethod model_construct(_fields_set=None, **values)

Creates a new instance of the Model class with validated data.

Creates a new model setting __dict__ and __pydantic_fields_set__ from trusted or pre-validated data. Default values are respected, but no other validation is performed.

!!! note

model_construct() generally respects the model_config.extra setting on the provided model. That is, if model_config.extra == ‘allow’, then all extra passed values are added to the model instance’s __dict__ and __pydantic_extra__ fields. If model_config.extra == ‘ignore’ (the default), then all extra passed values are ignored. Because no validation is performed with a call to model_construct(), having model_config.extra == ‘forbid’ does not result in an error if extra values are passed, but they will be ignored.

Parameters:
  • _fields_set (set[str] | None) – A set of field names that were originally explicitly set during instantiation. If provided, this is directly used for the [model_fields_set][pydantic.BaseModel.model_fields_set] attribute. Otherwise, the field names from the values argument will be used.

  • values (Any) – Trusted or pre-validated data dictionary.

Return type:

Self

Returns:

A new instance of the Model class with validated data.

model_copy(*, update=None, deep=False)
!!! abstract “Usage Documentation”

[model_copy](../concepts/models.md#model-copy)

Returns a copy of the model.

!!! note

The underlying instance’s [__dict__][object.__dict__] attribute is copied. This might have unexpected side effects if you store anything in it, on top of the model fields (e.g. the value of [cached properties][functools.cached_property]).

Parameters:
  • update (Mapping[str, Any] | None) – Values to change/add in the new model. Note: the data is not validated before creating the new model. You should trust this data.

  • deep (bool) – Set to True to make a deep copy of the model.

Return type:

Self

Returns:

New model instance.

model_dump(*, mode='python', include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, exclude_computed_fields=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
!!! abstract “Usage Documentation”

[model_dump](../concepts/serialization.md#python-mode)

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

Parameters:
  • mode (Union[Literal['json', 'python'], str]) – The mode in which to_python should run. If mode is ‘json’, the output will only contain JSON serializable types. If mode is ‘python’, the output may contain non-JSON-serializable Python objects.

  • include (Union[set[int], set[str], Mapping[int, Union[set[int], set[str], Mapping[int, Union[IncEx, bool]], Mapping[str, Union[IncEx, bool]], bool]], Mapping[str, Union[set[int], set[str], Mapping[int, Union[IncEx, bool]], Mapping[str, Union[IncEx, bool]], bool]], None]) – A set of fields to include in the output.

  • exclude (Union[set[int], set[str], Mapping[int, Union[set[int], set[str], Mapping[int, Union[IncEx, bool]], Mapping[str, Union[IncEx, bool]], bool]], Mapping[str, Union[set[int], set[str], Mapping[int, Union[IncEx, bool]], Mapping[str, Union[IncEx, bool]], bool]], None]) – A set of fields to exclude from the output.

  • context (Any | None) – Additional context to pass to the serializer.

  • by_alias (bool | None) – Whether to use the field’s alias in the dictionary key if defined.

  • exclude_unset (bool) – Whether to exclude fields that have not been explicitly set.

  • exclude_defaults (bool) – Whether to exclude fields that are set to their default value.

  • exclude_none (bool) – Whether to exclude fields that have a value of None.

  • exclude_computed_fields (bool) – Whether to exclude computed fields. While this can be useful for round-tripping, it is usually recommended to use the dedicated round_trip parameter instead.

  • round_trip (bool) – If True, dumped values should be valid as input for non-idempotent types such as Json[T].

  • warnings (Union[bool, Literal['none', 'warn', 'error']]) – How to handle serialization errors. False/”none” ignores them, True/”warn” logs errors, “error” raises a [PydanticSerializationError][pydantic_core.PydanticSerializationError].

  • fallback (Optional[Callable[[Any], Any]]) – A function to call when an unknown value is encountered. If not provided, a [PydanticSerializationError][pydantic_core.PydanticSerializationError] error is raised.

  • serialize_as_any (bool) – Whether to serialize fields with duck-typing serialization behavior.

Return type:

dict[str, Any]

Returns:

A dictionary representation of the model.

model_dump_json(*, indent=None, ensure_ascii=False, include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, exclude_computed_fields=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
!!! abstract “Usage Documentation”

[model_dump_json](../concepts/serialization.md#json-mode)

Generates a JSON representation of the model using Pydantic’s to_json method.

Parameters:
  • indent (int | None) – Indentation to use in the JSON output. If None is passed, the output will be compact.

  • ensure_ascii (bool) – If True, the output is guaranteed to have all incoming non-ASCII characters escaped. If False (the default), these characters will be output as-is.

  • include (Union[set[int], set[str], Mapping[int, Union[set[int], set[str], Mapping[int, Union[IncEx, bool]], Mapping[str, Union[IncEx, bool]], bool]], Mapping[str, Union[set[int], set[str], Mapping[int, Union[IncEx, bool]], Mapping[str, Union[IncEx, bool]], bool]], None]) – Field(s) to include in the JSON output.

  • exclude (Union[set[int], set[str], Mapping[int, Union[set[int], set[str], Mapping[int, Union[IncEx, bool]], Mapping[str, Union[IncEx, bool]], bool]], Mapping[str, Union[set[int], set[str], Mapping[int, Union[IncEx, bool]], Mapping[str, Union[IncEx, bool]], bool]], None]) – Field(s) to exclude from the JSON output.

  • context (Any | None) – Additional context to pass to the serializer.

  • by_alias (bool | None) – Whether to serialize using field aliases.

  • exclude_unset (bool) – Whether to exclude fields that have not been explicitly set.

  • exclude_defaults (bool) – Whether to exclude fields that are set to their default value.

  • exclude_none (bool) – Whether to exclude fields that have a value of None.

  • exclude_computed_fields (bool) – Whether to exclude computed fields. While this can be useful for round-tripping, it is usually recommended to use the dedicated round_trip parameter instead.

  • round_trip (bool) – If True, dumped values should be valid as input for non-idempotent types such as Json[T].

  • warnings (Union[bool, Literal['none', 'warn', 'error']]) – How to handle serialization errors. False/”none” ignores them, True/”warn” logs errors, “error” raises a [PydanticSerializationError][pydantic_core.PydanticSerializationError].

  • fallback (Optional[Callable[[Any], Any]]) – A function to call when an unknown value is encountered. If not provided, a [PydanticSerializationError][pydantic_core.PydanticSerializationError] error is raised.

  • serialize_as_any (bool) – Whether to serialize fields with duck-typing serialization behavior.

Return type:

str

Returns:

A JSON string representation of the model.

property model_extra: dict[str, Any] | None

Get extra fields set during validation.

Returns:

A dictionary of extra fields, or None if config.extra is not set to “allow”.

model_fields = {'data_in': FieldInfo(annotation=Path, required=False, default_factory=<lambda>), 'data_out': FieldInfo(annotation=Union[Path, NoneType], required=False, default=None), 'data_source': FieldInfo(annotation=Literal['file', 'hawk'], required=False, default='file'), 'datemax': FieldInfo(annotation=date, required=False, default_factory=<lambda>), 'datemin': FieldInfo(annotation=date, required=False, default_factory=<lambda>), 'files': FieldInfo(annotation=str, required=False, default='*'), 'nfile': FieldInfo(annotation=int, required=False, default=1, metadata=[Ge(ge=0)]), 'nread': FieldInfo(annotation=int, required=False, default=0, metadata=[Ge(ge=0)]), 'nskip': FieldInfo(annotation=int, required=False, default=0, metadata=[Ge(ge=0)]), 'output_format': FieldInfo(annotation=Literal['parquet', 'csv'], required=False, default='parquet'), 'reset_target': FieldInfo(annotation=bool, required=False, default=False), 'sampling_rate': FieldInfo(annotation=float, required=False, default=1.0, metadata=[Ge(ge=0)])}
property model_fields_set: set[str]

Returns the set of fields that have been explicitly set on this model instance.

Returns:

A set of strings representing the fields that have been set, i.e. that were not filled from defaults.

classmethod model_json_schema(by_alias=True, ref_template='#/$defs/{model}', schema_generator=<class 'pydantic.json_schema.GenerateJsonSchema'>, mode='validation', *, union_format='any_of')

Generates a JSON schema for a model class.

Parameters:
  • by_alias (bool) – Whether to use attribute aliases or not.

  • ref_template (str) – The reference template.

  • union_format (Literal['any_of', 'primitive_type_array']) –

    The format to use when combining schemas from unions together. Can be one of:

    • ‘any_of’: Use the anyOf keyword to combine schemas (the default).

    • ‘primitive_type_array’: Use the [type](https://json-schema.org/understanding-json-schema/reference/type) keyword as an array of strings, containing each type of the combination. If any of the schemas is not a primitive type (string, boolean, null, integer or number) or contains constraints/metadata, falls back to any_of.

  • schema_generator (type[GenerateJsonSchema]) – To override the logic used to generate the JSON schema, as a subclass of GenerateJsonSchema with your desired modifications

  • mode (Literal['validation', 'serialization']) – The mode in which to generate the schema.

Return type:

dict[str, Any]

Returns:

The JSON schema for the given model class.

classmethod model_parametrized_name(params)

Compute the class name for parametrizations of generic classes.

This method can be overridden to achieve a custom naming scheme for generic BaseModels.

Parameters:

params (tuple[type[Any], ...]) – Tuple of types of the class. Given a generic class Model with 2 type variables and a concrete model Model[str, int], the value (str, int) would be passed to params.

Return type:

str

Returns:

String representing the new class where params are passed to cls as type variables.

Raises:

TypeError – Raised when trying to generate concrete names for non-generic models.

model_post_init(context, /)

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Return type:

None

classmethod model_rebuild(*, force=False, raise_errors=True, _parent_namespace_depth=2, _types_namespace=None)

Try to rebuild the pydantic-core schema for the model.

This may be necessary when one of the annotations is a ForwardRef which could not be resolved during the initial attempt to build the schema, and automatic rebuilding fails.

Parameters:
  • force (bool) – Whether to force the rebuilding of the model schema, defaults to False.

  • raise_errors (bool) – Whether to raise errors, defaults to True.

  • _parent_namespace_depth (int) – The depth level of the parent namespace, defaults to 2.

  • _types_namespace (Mapping[str, Any] | None) – The types namespace, defaults to None.

Return type:

bool | None

Returns:

Returns None if the schema is already “complete” and rebuilding was not required. If rebuilding _was_ required, returns True if rebuilding was successful, otherwise False.

classmethod model_validate(obj, *, strict=None, extra=None, from_attributes=None, context=None, by_alias=None, by_name=None)

Validate a pydantic model instance.

Parameters:
  • obj (Any) – The object to validate.

  • strict (bool | None) – Whether to enforce types strictly.

  • extra (Optional[Literal['allow', 'ignore', 'forbid']]) – Whether to ignore, allow, or forbid extra data during model validation. See the [extra configuration value][pydantic.ConfigDict.extra] for details.

  • from_attributes (bool | None) – Whether to extract data from object attributes.

  • context (Any | None) – Additional context to pass to the validator.

  • by_alias (bool | None) – Whether to use the field’s alias when validating against the provided input data.

  • by_name (bool | None) – Whether to use the field’s name when validating against the provided input data.

Raises:

ValidationError – If the object could not be validated.

Return type:

Self

Returns:

The validated model instance.

classmethod model_validate_json(json_data, *, strict=None, extra=None, context=None, by_alias=None, by_name=None)
!!! abstract “Usage Documentation”

[JSON Parsing](../concepts/json.md#json-parsing)

Validate the given JSON data against the Pydantic model.

Parameters:
  • json_data (str | bytes | bytearray) – The JSON data to validate.

  • strict (bool | None) – Whether to enforce types strictly.

  • extra (Optional[Literal['allow', 'ignore', 'forbid']]) – Whether to ignore, allow, or forbid extra data during model validation. See the [extra configuration value][pydantic.ConfigDict.extra] for details.

  • context (Any | None) – Extra variables to pass to the validator.

  • by_alias (bool | None) – Whether to use the field’s alias when validating against the provided input data.

  • by_name (bool | None) – Whether to use the field’s name when validating against the provided input data.

Return type:

Self

Returns:

The validated Pydantic model.

Raises:

ValidationError – If json_data is not a JSON string or the object could not be validated.

classmethod model_validate_strings(obj, *, strict=None, extra=None, context=None, by_alias=None, by_name=None)

Validate the given object with string data against the Pydantic model.

Parameters:
  • obj (Any) – The object containing string data to validate.

  • strict (bool | None) – Whether to enforce types strictly.

  • extra (Optional[Literal['allow', 'ignore', 'forbid']]) – Whether to ignore, allow, or forbid extra data during model validation. See the [extra configuration value][pydantic.ConfigDict.extra] for details.

  • context (Any | None) – Extra variables to pass to the validator.

  • by_alias (bool | None) – Whether to use the field’s alias when validating against the provided input data.

  • by_name (bool | None) – Whether to use the field’s name when validating against the provided input data.

Return type:

Self

Returns:

The validated Pydantic model.

nfile: int
nread: int
nskip: int
output_format: Literal['parquet', 'csv']
classmethod parse_file(path, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
Return type:

Self

classmethod parse_obj(obj)
Return type:

Self

classmethod parse_raw(b, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
Return type:

Self

reset_target: bool
sampling_rate: float
classmethod schema(by_alias=True, ref_template='#/$defs/{model}')
Return type:

Dict[str, Any]

classmethod schema_json(*, by_alias=True, ref_template='#/$defs/{model}', **dumps_kwargs)
Return type:

str

to_toml_payload()[source]

Return a TOML-friendly representation (Path/date → str).

Return type:

dict[str, Any]
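The conversion is straightforward to sketch (stringify Path and date values; dropping None entries is an assumption here, since TOML has no null, not a confirmed detail of the real helper):

```python
from datetime import date
from pathlib import Path

def to_toml_payload_sketch(values):
    """Stringify Path/date values and drop None entries (TOML has no null;
    assumed handling for optional fields such as data_out)."""
    def convert(v):
        return str(v) if isinstance(v, (Path, date)) else v
    return {k: convert(v) for k, v in values.items() if v is not None}
```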

classmethod update_forward_refs(**localns)
Return type:

None

classmethod validate(value)
Return type:

Self

class flight.flight_args.FlightArgsTD[source]

Bases: TypedDict

__init__(*args, **kwargs)
clear()

Remove all items from the dict.

copy()

Return a shallow copy of the dict.

data_in: str
data_out: str
data_source: str
datemax: str
datemin: str
files: str
classmethod fromkeys(iterable, value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

items()

Return a set-like object providing a view on the dict’s items.

keys()

Return a set-like object providing a view on the dict’s keys.

nfile: int
nread: int
nskip: int
output_format: str
pop(k[, d]) → v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

reset_target: bool
sampling_rate: float
setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) → None. Update D from mapping/iterable E and F.

If E is present and has a .keys() method, then does: for k in E.keys(): D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]

values()

Return an object providing a view on the dict’s values.

flight.flight_args.apply_source_defaults(args, *, host_ip=None)[source]

Ensure source-specific defaults for missing values.

Return type:

FlightArgs

flight.flight_args.dump_args(args, settings_path, *, section='args', create_missing=True)[source]
Return type:

None

flight.flight_args.dump_args_to_toml(args, settings_path, section='args', create_missing=True)[source]

Persist arguments back to the TOML file (overwriting only the section).

Return type:

None

flight.flight_args.ensure_defaults(args, **kwargs)[source]
Return type:

FlightArgs

flight.flight_args.load_args(settings_path, *, section='args')[source]
Return type:

FlightArgs

flight.flight_args.load_args_from_toml(settings_path, section='args')[source]

Load arguments from a TOML file, applying model defaults when missing.

Return type:

FlightArgs

flight.flight_args.merge_args(base, overrides=None)[source]

Return a new instance with overrides applied on top of base.

Return type:

FlightArgs
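
The "new instance with overrides applied on top of base" contract can be sketched with a frozen dataclass standing in for the Pydantic FlightArgs model (field names here are illustrative):

```python
from dataclasses import dataclass, replace
from typing import Optional

@dataclass(frozen=True)
class Args:  # illustrative stand-in for FlightArgs
    nfile: int = 1
    sampling_rate: float = 1.0
    output_format: str = "parquet"

def merge_args(base: Args, overrides: Optional[dict] = None) -> Args:
    """Return a new instance; `base` is never mutated."""
    return replace(base, **(overrides or {}))

base = Args()
merged = merge_args(base, {"nfile": 8})
print(merged.nfile, base.nfile)  # 8 1
```

Returning a fresh instance (rather than mutating `base`) keeps the Execute page's override flow side-effect free.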

[Figure: flight args class diagram]

[Figure: flight worker package diagram]
class flight_worker.flight_worker.FlightWorker[source]

Bases: PolarsWorker

Class derived from AgiDataWorker

args_dump_mode: ClassVar[str] = 'json'
args_dumper: ClassVar[Callable[..., None] | None] = None
args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
args_loader: ClassVar[Callable[..., Any] | None] = None
args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
as_dict(mode=None)
Return type:

dict[str, Any]

break()

Signal the service loop to exit on this worker.

Return type:

bool

static break_loop()

Signal the service loop to exit on this worker.

Return type:

bool

calculate_speed(new_column_name, df)[source]

Compute the speed (in meters per second) between consecutive coordinate pairs and add it to the DataFrame under the provided column name. Assumes that the previous coordinate columns are already present.

Return type:

DataFrame
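
A self-contained sketch of the underlying computation (pure Python rather than the worker's Polars expressions): haversine distance between consecutive fixes, divided by the timestamp delta.

```python
from math import radians, sin, cos, asin, sqrt

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius

def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two lat/lon points (degrees)."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = radians(lat2 - lat1)
    dlmb = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * asin(sqrt(a))

def speed_mps(lat1, lon1, t1, lat2, lon2, t2):
    """Instantaneous speed estimate v ≈ d / (t2 - t1), timestamps in seconds."""
    return haversine_m(lat1, lon1, lat2, lon2) / (t2 - t1)

# Paris -> London, one hour apart: roughly 343 km, i.e. about 95 m/s.
v = speed_mps(48.8566, 2.3522, 0.0, 51.5074, -0.1278, 3600.0)
```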

default_settings_path: ClassVar[str] = 'app_settings.toml'
default_settings_section: ClassVar[str] = 'args'
env: Optional[AgiEnv] = None
static expand(path, base_directory=None)

Expand a given path to an absolute path.

Parameters:
  • path (str) – The path to expand.

  • base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

classmethod from_toml(env, settings_path=None, section=None, **overrides)
Return type:

BaseWorker

static loop(*, poll_interval=None)

Run a long-lived service loop on this worker until signalled to stop.

The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.

Return type:

Dict[str, Any]
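
The dispatch described above, inspecting the worker method's signature and either delegating the stop event or polling until the method returns False, can be sketched as follows (a simplified stand-in, not the actual BaseWorker code):

```python
import inspect
import threading
import time

def run_service_loop(method, *, poll_interval=0.01):
    """Dispatch on the worker method's signature, as described above."""
    stop_event = threading.Event()
    params = inspect.signature(method).parameters
    kw = next((n for n in ("stop_event", "should_stop") if n in params), None)
    if kw is not None:
        # The worker implementation is responsible for honouring the event.
        method(**{kw: stop_event})
        return
    # Base behaviour: invoke repeatedly; returning False requests termination.
    while not stop_event.is_set():
        if method() is False:
            break
        time.sleep(poll_interval)

calls = []
def tick():
    calls.append(1)
    return len(calls) < 3  # returns False on the third call

run_service_loop(tick)
print(len(calls))  # 3
```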

managed_pc_home_suffix: ClassVar[str] = 'MyApp'
managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
static normalize_dataset_path(data_path)

Normalise any dataset directory input so workers rely on consistent paths.

Return type:

str

pool_init(worker_vars)[source]

Initialize the pool with worker variables.

Parameters:

worker_vars (dict) – Variables specific to the worker.

pool_vars = {}
prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)

Create (and optionally reset) a deterministic output directory.

Return type:

Path

preprocess_df(df)[source]

Preprocess the DataFrame by parsing the date column and creating previous coordinate columns. This operation is done once per file.

Return type:

DataFrame
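
The "previous coordinate" columns are just a one-row shift of the coordinate columns. A pure-Python sketch of the idea (the real worker does this with Polars expressions such as `shift`):

```python
# Build prev_lat / prev_lon as a one-row shift of the coordinate columns.
rows = [
    {"lat": 48.85, "lon": 2.35},
    {"lat": 49.00, "lon": 2.10},
    {"lat": 49.20, "lon": 1.90},
]
shifted = [None] + rows[:-1]
for row, prev in zip(rows, shifted):
    row["prev_lat"] = prev["lat"] if prev else None
    row["prev_lon"] = prev["lon"] if prev else None

print(rows[1]["prev_lat"])  # 48.85
```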

setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
Return type:

Any

setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)

Prepare normalised input/output dataset paths without relying on worker args.

Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.

Return type:

SimpleNamespace

start()[source]

Initialize global variables and setup paths.

stop()[source]

to_toml(settings_path=None, section=None, create_missing=True)
Return type:

None

verbose = 1
work_done(worker_df)[source]

Concatenate dataframe if any and save the results.

Parameters:

worker_df (pl.DataFrame) – Output dataframe for one plane.

work_init()[source]

Initialize work by reading from shared space.

work_pool(file)[source]

Parse IVQ log files.

Parameters:

file (str) – The log file to parse.

Returns:

Parsed data.

Return type:

pl.DataFrame

works(workers_plan, workers_plan_metadata)

Executes worker tasks based on the distribution tree.

Parameters:
  • workers_plan (any) – Distribution tree structure.

  • workers_plan_metadata (any) – Additional information about the workers.

Returns:

Execution time in seconds.

Return type:

float
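
Returning the execution time in seconds can be sketched with `time.perf_counter` (an illustrative stand-in; the real method walks the distribution tree and dispatches work_init/work_pool/work_done):

```python
import time

def works(workers_plan, workers_plan_metadata):
    """Run every task in the plan and return elapsed wall-clock seconds."""
    start = time.perf_counter()
    for task in workers_plan:
        task()  # placeholder for dispatching one unit of work
    return time.perf_counter() - start

elapsed = works([lambda: None, lambda: None], {})
```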

[Figure: flight worker class diagram]