Flight Project
Overview
End-to-end reference project that ingests aeronautical telemetry, cleans it with Polars and pushes curated artefacts back into the AGILab export directory.
Demonstrates how to orchestrate file-based distributions, run inside the cluster pool and keep the Streamlit user interface responsive while the worker pipeline operates asynchronously.
Bundles Streamlit lab material (lab_steps.toml) and prompt examples to help you reproduce the workflow showcased in AGILab live demos.
Scientific highlights
The worker derives distances and speeds from time-stamped telemetry. For two samples with latitude/longitude \((\phi_1, \lambda_1)\) and \((\phi_2, \lambda_2)\), a common great-circle approximation is the haversine formula:
\[ d = 2R \arcsin\left(\sqrt{\sin^2\left(\frac{\Delta\phi}{2}\right) + \cos\phi_1 \cos\phi_2 \sin^2\left(\frac{\Delta\lambda}{2}\right)}\right) \]
with Earth radius \(R\) and angular differences \(\Delta\phi = \phi_2 - \phi_1\), \(\Delta\lambda = \lambda_2 - \lambda_1\). Given timestamps \(t_1, t_2\), the average speed between the two samples can be estimated as \(v \approx d / (t_2 - t_1)\).
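The two formulas above translate directly into code. The sketch below assumes coordinates in decimal degrees, timestamps in seconds and a mean Earth radius of 6 371 km; the names haversine_m, speed_mps and EARTH_RADIUS_M are illustrative and not part of the Flight worker, whose actual constant and column handling may differ.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # assumed mean Earth radius, in metres


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float,
                radius: float = EARTH_RADIUS_M) -> float:
    """Great-circle distance in metres between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1                  # Δφ = φ2 - φ1
    dlam = math.radians(lon2 - lon1)    # Δλ = λ2 - λ1
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    # Clamp to 1.0 to guard against tiny floating-point overshoot before asin.
    return 2 * radius * math.asin(math.sqrt(min(1.0, a)))


def speed_mps(lat1: float, lon1: float, t1: float,
              lat2: float, lon2: float, t2: float) -> float:
    """Average speed in m/s between two timestamped samples (t1, t2 in seconds)."""
    dt = t2 - t1
    if dt <= 0:
        raise ValueError("timestamps must be strictly increasing")
    return haversine_m(lat1, lon1, lat2, lon2) / dt
```

For example, haversine_m(0, 0, 0, 1), one degree of longitude along the equator, comes out at roughly 111 km.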
Manager (flight.flight)
Wraps the runnable application. Converts user-supplied arguments into a validated FlightArgs model, normalises data URIs, and initialises cluster dispatch by seeding WorkDispatcher.
Provides from_toml/to_toml helpers so snippets on the Execute page can reload configuration and persist overrides.
Handles managed-PC specifics (path remapping, data directory resets) and keeps the dataframe export folder clean between runs.
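Keeping the export folder clean between runs amounts to deleting and recreating a dedicated subdirectory. A minimal sketch, assuming a simplified, hypothetical signature (the real prepare_output_dir also takes an attribute argument and may behave differently):

```python
from __future__ import annotations

import shutil
from pathlib import Path


def prepare_output_dir(root: str | Path, subdir: str = "dataframe", clean: bool = True) -> Path:
    """Return root/subdir, creating it and optionally wiping earlier contents."""
    out = Path(root).expanduser() / subdir
    if clean and out.exists():
        shutil.rmtree(out)  # remove artefacts left behind by a previous run
    out.mkdir(parents=True, exist_ok=True)
    return out
```

Calling it with clean=False keeps earlier exports, which can be handy when several runs append to the same folder.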
Args (flight.flight_args)
Pydantic models that capture the full surface area of the project’s configuration, including dataset location, slicing parameters and cluster toggles.
Ships conversion utilities for reading/writing app_settings.toml and merging overrides that are injected by the Execute page.
Worker (flight_worker.flight_worker)
Extends PolarsWorker to preprocess raw telemetry, compute geodesic distances between samples, and partition files across the cluster.
Contains Cython hooks (flight_worker.pyx) and pre/post-install scripts so you can see how compiled extensions are integrated into a distribution.
Demonstrates Windows-friendly path handling and data staging for managed environments.
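The two-level partitioning described for build_distribution (files grouped per plane first, then planes assigned to workers) can be sketched as follows. Keeping all of a plane's files on one worker comes from the docstring; the largest-plane-first balancing onto the least-loaded worker, the hypothetical plan_distribution name and the p<ID>_... file naming used in the test are assumptions for illustration only.

```python
from __future__ import annotations

import heapq
from collections import defaultdict
from typing import Callable, Iterable


def plan_distribution(files: Iterable[str], n_workers: int,
                      plane_of: Callable[[str], str]) -> list[dict[str, list[str]]]:
    """Return, per worker, a mapping plane -> files assigned to that worker."""
    if n_workers < 1:
        raise ValueError("n_workers must be >= 1")
    # Level 1: planes are atomic partitions, so one plane's files stay together.
    by_plane: dict[str, list[str]] = defaultdict(list)
    for f in files:
        by_plane[plane_of(f)].append(f)
    # Level 2: assign the largest planes first, each to the currently lightest worker.
    plan: list[dict[str, list[str]]] = [{} for _ in range(n_workers)]
    heap = [(0, w) for w in range(n_workers)]  # (files assigned so far, worker index)
    heapq.heapify(heap)
    for plane, plane_files in sorted(by_plane.items(), key=lambda kv: (-len(kv[1]), kv[0])):
        load, w = heapq.heappop(heap)
        plan[w][plane] = sorted(plane_files)
        heapq.heappush(heap, (load + len(plane_files), w))
    return plan
```

Because a plane is never split, no worker ever waits on another for part of a plane's data, which is the property the docstring emphasises.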
Assets & Tests
app_test.py exercises the full install → distribute → run flow.
test/_test_* modules focus on unit-level behaviour for the manager, worker and orchestration glue.
Modules and lab_steps.toml contain the Streamlit lab material used by the Experiment page.
API Reference
- class flight.flight.Flight(env, args=None, **kwargs)[source]
Bases: BaseWorker
The Flight class provides methods to orchestrate the run.
- args_dump_mode: ClassVar[str] = 'json'
- args_dumper: ClassVar[Callable[..., None] | None] = None
- args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
- args_loader: ClassVar[Callable[..., Any] | None] = None
- args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
- as_dict(mode='json')[source]
Return current arguments as a serialisable dictionary.
- Return type:
dict[str,Any]
- break()
Signal the service loop to exit on this worker.
- Return type:
bool
- static break_loop()
Signal the service loop to exit on this worker.
- Return type:
bool
- build_distribution(workers)[source]
Build the distribution tree: the list of files per plane (level 1) and per worker (level 2). Level 1 is designed so that a job requiring all the output data of a plane never has to wait for another flight_worker, which would collapse overall performance.
Args:
Returns:
- default_settings_path: ClassVar[str] = 'app_settings.toml'
- default_settings_section: ClassVar[str] = 'args'
- env: Optional[AgiEnv] = None
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None –
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
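A rough equivalent of expand and expand_and_join built on os.path, assuming relative paths are anchored at base_directory or, failing that, the current working directory. This is a sketch of the documented behaviour, not the AGILab implementation; note that os.path applies the separator rules of the platform it runs on.

```python
from __future__ import annotations

import os
from typing import Optional


def expand(path: str, base_directory: Optional[str] = None) -> str:
    """Return an absolute version of path, expanding '~' first."""
    expanded = os.path.expanduser(path)
    if not os.path.isabs(expanded):
        # Assumption: relative paths are anchored at base_directory, else the CWD.
        base = os.path.expanduser(base_directory) if base_directory else os.getcwd()
        expanded = os.path.join(base, expanded)
    return os.path.abspath(expanded)  # also collapses '..' and duplicate separators


def expand_and_join(path1: str, path2: str) -> str:
    """Expand path1, then join path2 onto it."""
    return os.path.join(expand(path1), path2)
```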
- static extract_plane_from_file_name(file_path)[source]
Provide the airplane ID from the log file name.
- Parameters:
file_path
Returns:
- classmethod from_toml(env, settings_path='app_settings.toml', section='args', **overrides)[source]
- Return type:
- get_partition_by_planes(df)[source]
Build the first level of the distribution tree, with planes as atomic partitions.
- Parameters:
df – DataFrame containing the output data to partition.
Returns:
- ivq_logs = None
- static loop(*, poll_interval=None)
Run a long-lived service loop on this worker until signalled to stop.
The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise, the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.
- Return type:
Dict[str,Any]
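The dispatch rule described for loop can be sketched with inspect.signature and a threading.Event. run_service_loop is a hypothetical helper, not the BaseWorker method: it looks for a stop_event or should_stop parameter and hands the event over if one is found; otherwise it calls the method repeatedly, stopping when the method returns False or the event is set.

```python
import inspect
import threading
from typing import Callable


def run_service_loop(worker_loop: Callable[..., object],
                     stop_event: threading.Event,
                     poll_interval: float = 0.0) -> int:
    """Drive worker_loop and return how many times it was invoked."""
    params = inspect.signature(worker_loop).parameters
    for name in ("stop_event", "should_stop"):
        if name in params:
            # Event-aware worker: it owns the loop and must honour the event itself.
            worker_loop(**{name: stop_event})
            return 1
    calls = 0
    while not stop_event.is_set():
        calls += 1
        if worker_loop() is False:
            break  # an explicit False requests termination
        stop_event.wait(poll_interval)  # interruptible sleep between calls
    return calls
```

Using stop_event.wait rather than time.sleep lets a stop request interrupt the pause between calls immediately.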
- managed_pc_home_suffix: ClassVar[str] = 'MyApp'
- managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
- static normalize_dataset_path(data_path)
Normalise any dataset directory input so workers rely on consistent paths.
- Return type:
str
- prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)
Create (and optionally reset) a deterministic output directory.
- Return type:
Path
- setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
- Return type:
Any
- setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)
Prepare normalised input/output dataset paths without relying on worker args.
Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.
- Return type:
SimpleNamespace
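A sketch of the namespace setup_data_directories is documented to return. Where the output directory goes when target_path is omitted is not specified, so placing it next to the input directory is an assumption, as are creating the directory unconditionally and using as_posix() for the normalised strings.

```python
from __future__ import annotations

import shutil
from pathlib import Path
from types import SimpleNamespace


def setup_data_directories(*, source_path, target_path=None,
                           target_subdir: str = "dataframe",
                           reset_target: bool = False) -> SimpleNamespace:
    input_path = Path(source_path).expanduser().resolve()
    # Assumption: without an explicit target, outputs go next to the input directory.
    target_root = Path(target_path).expanduser().resolve() if target_path else input_path.parent
    output_path = target_root / target_subdir
    if reset_target and output_path.exists():
        shutil.rmtree(output_path)  # clear results from a previous run
    output_path.mkdir(parents=True, exist_ok=True)
    return SimpleNamespace(
        input_path=input_path,
        normalized_input=input_path.as_posix(),  # forward slashes on every platform
        output_path=output_path,
        normalized_output=output_path.as_posix(),
    )
```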
- static start(worker_inst)
Invoke the concrete worker’s start hook once initialised.
- stop()
Returns:
- to_toml(settings_path='app_settings.toml', section='args', create_missing=True)[source]
- Return type:
None
- verbose = 1
- class flight.flight.FlightApp(env, args=None, **kwargs)[source]
Bases: Flight
Alias keeping legacy imports alive.
- __init__(env, args=None, **kwargs)
- args_dump_mode: ClassVar[str] = 'json'
- args_dumper: ClassVar[Callable[..., None] | None] = None
- args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
- args_loader: ClassVar[Callable[..., Any] | None] = None
- args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
- as_dict(mode='json')
Return current arguments as a serialisable dictionary.
- Return type:
dict[str,Any]
- break()
Signal the service loop to exit on this worker.
- Return type:
bool
- static break_loop()
Signal the service loop to exit on this worker.
- Return type:
bool
- build_distribution(workers)
Build the distribution tree: the list of files per plane (level 1) and per worker (level 2). Level 1 is designed so that a job requiring all the output data of a plane never has to wait for another flight_worker, which would collapse overall performance.
Args:
Returns:
- default_settings_path: ClassVar[str] = 'app_settings.toml'
- default_settings_section: ClassVar[str] = 'args'
- env: Optional[AgiEnv] = None
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None –
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- static extract_plane_from_file_name(file_path)
Provide the airplane ID from the log file name.
- Parameters:
file_path
Returns:
- classmethod from_toml(env, settings_path='app_settings.toml', section='args', **overrides)
- Return type:
- get_data_from_files()
Get output-data slices from files or from ELK/HAWK.
- get_data_from_hawk()
Get output data from ELK/HAWK.
- get_partition_by_planes(df)
Build the first level of the distribution tree, with planes as atomic partitions.
- Parameters:
df – DataFrame containing the output data to partition.
Returns:
- ivq_logs = None
- static loop(*, poll_interval=None)
Run a long-lived service loop on this worker until signalled to stop.
The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise, the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.
- Return type:
Dict[str,Any]
- managed_pc_home_suffix: ClassVar[str] = 'MyApp'
- managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
- static normalize_dataset_path(data_path)
Normalise any dataset directory input so workers rely on consistent paths.
- Return type:
str
- prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)
Create (and optionally reset) a deterministic output directory.
- Return type:
Path
- setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
- Return type:
Any
- setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)
Prepare normalised input/output dataset paths without relying on worker args.
Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.
- Return type:
SimpleNamespace
- static start(worker_inst)
Invoke the concrete worker’s start hook once initialised.
- stop()
Returns:
- to_toml(settings_path='app_settings.toml', section='args', create_missing=True)
- Return type:
None
- verbose = 1
Shared validation and persistence helpers for Flight project arguments.
- flight.flight_args.ArgsModel
alias of FlightArgs
- flight.flight_args.ArgsOverrides
alias of FlightArgsTD
- class flight.flight_args.FlightArgs(**data)[source]
Bases: BaseModel
Validated configuration for the Flight worker.
- __init__(**data)
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- classmethod construct(_fields_set=None, **values)
- Return type:
Self
- copy(*, include=None, exclude=None, update=None, deep=False)
Returns a copy of the model.
- Deprecated:
This method is now deprecated; use model_copy instead.
If you need include or exclude, use:
```python
data = self.model_dump(include=include, exclude=exclude, round_trip=True)
data = {**data, **(update or {})}
copied = self.model_validate(data)
```
- Parameters:
include (Set[int] | Set[str] | Mapping[int, Any] | Mapping[str, Any] | None) – Optional set or mapping specifying which fields to include in the copied model.
exclude (Set[int] | Set[str] | Mapping[int, Any] | Mapping[str, Any] | None) – Optional set or mapping specifying which fields to exclude in the copied model.
update (Optional[Dict[str, Any]]) – Optional dictionary of field-value pairs to override field values in the copied model.
deep (bool) – If True, the values of fields that are Pydantic models will be deep-copied.
- Return type:
Self
- Returns:
A copy of the model with included, excluded and updated fields as specified.
- data_in: Path
- data_out: Path | None
- data_source: Literal['file', 'hawk']
- datemax: date
- datemin: date
- dict(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False)
- Return type:
Dict[str,Any]
- files: str
- classmethod from_orm(obj)
- Return type:
Self
- json(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False, encoder=PydanticUndefined, models_as_dict=PydanticUndefined, **dumps_kwargs)
- Return type:
str
- model_computed_fields = {}
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}
Configuration for the model, should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).
- classmethod model_construct(_fields_set=None, **values)
Creates a new instance of the Model class, setting __dict__ and __pydantic_fields_set__ from trusted or pre-validated data. Default values are respected, but no other validation is performed.
- Note:
model_construct() generally respects the model_config.extra setting on the provided model. That is, if model_config.extra == ‘allow’, then all extra passed values are added to the model instance’s __dict__ and __pydantic_extra__ fields. If model_config.extra == ‘ignore’ (the default), then all extra passed values are ignored. Because no validation is performed with a call to model_construct(), having model_config.extra == ‘forbid’ does not result in an error if extra values are passed, but they will be ignored.
- Parameters:
_fields_set (set[str] | None) – A set of field names that were originally explicitly set during instantiation. If provided, this is directly used for the model_fields_set attribute. Otherwise, the field names from the values argument will be used.
values (Any) – Trusted or pre-validated data dictionary.
- Return type:
Self
- Returns:
A new instance of the Model class built from the provided, unvalidated data.
- model_copy(*, update=None, deep=False)
- Usage documentation: model_copy (../concepts/models.md#model-copy)
Returns a copy of the model.
- Note:
The underlying instance’s __dict__ attribute is copied. This might have unexpected side effects if you store anything in it, on top of the model fields (e.g. the value of cached properties, see functools.cached_property).
- Parameters:
update (Mapping[str, Any] | None) – Values to change/add in the new model. Note: the data is not validated before creating the new model. You should trust this data.
deep (bool) – Set to True to make a deep copy of the model.
- Return type:
Self
- Returns:
New model instance.
- model_dump(*, mode='python', include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, exclude_computed_fields=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
- Usage documentation: model_dump (../concepts/serialization.md#python-mode)
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
- Parameters:
mode (Union[Literal['json', 'python'], str]) – The mode in which to_python should run. If mode is ‘json’, the output will only contain JSON serializable types. If mode is ‘python’, the output may contain non-JSON-serializable Python objects.
include (IncEx | None, where IncEx is set[int] | set[str] | Mapping[int, IncEx | bool] | Mapping[str, IncEx | bool]) – A set of fields to include in the output.
exclude (IncEx | None) – A set of fields to exclude from the output.
context (Any | None) – Additional context to pass to the serializer.
by_alias (bool | None) – Whether to use the field’s alias in the dictionary key if defined.
exclude_unset (bool) – Whether to exclude fields that have not been explicitly set.
exclude_defaults (bool) – Whether to exclude fields that are set to their default value.
exclude_none (bool) – Whether to exclude fields that have a value of None.
exclude_computed_fields (bool) – Whether to exclude computed fields. While this can be useful for round-tripping, it is usually recommended to use the dedicated round_trip parameter instead.
round_trip (bool) – If True, dumped values should be valid as input for non-idempotent types such as Json[T].
warnings (Union[bool, Literal['none', 'warn', 'error']]) – How to handle serialization errors. False/“none” ignores them, True/“warn” logs errors, “error” raises a PydanticSerializationError (pydantic_core.PydanticSerializationError).
fallback (Optional[Callable[[Any], Any]]) – A function to call when an unknown value is encountered. If not provided, a PydanticSerializationError is raised.
serialize_as_any (bool) – Whether to serialize fields with duck-typing serialization behavior.
- Return type:
dict[str, Any]
- Returns:
A dictionary representation of the model.
- model_dump_json(*, indent=None, ensure_ascii=False, include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, exclude_computed_fields=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
- Usage documentation: model_dump_json (../concepts/serialization.md#json-mode)
Generates a JSON representation of the model using Pydantic’s to_json method.
- Parameters:
indent (int | None) – Indentation to use in the JSON output. If None is passed, the output will be compact.
ensure_ascii (bool) – If True, the output is guaranteed to have all incoming non-ASCII characters escaped. If False (the default), these characters will be output as-is.
include (IncEx | None, where IncEx is set[int] | set[str] | Mapping[int, IncEx | bool] | Mapping[str, IncEx | bool]) – Field(s) to include in the JSON output.
exclude (IncEx | None) – Field(s) to exclude from the JSON output.
context (Any | None) – Additional context to pass to the serializer.
by_alias (bool | None) – Whether to serialize using field aliases.
exclude_unset (bool) – Whether to exclude fields that have not been explicitly set.
exclude_defaults (bool) – Whether to exclude fields that are set to their default value.
exclude_none (bool) – Whether to exclude fields that have a value of None.
exclude_computed_fields (bool) – Whether to exclude computed fields. While this can be useful for round-tripping, it is usually recommended to use the dedicated round_trip parameter instead.
round_trip (bool) – If True, dumped values should be valid as input for non-idempotent types such as Json[T].
warnings (Union[bool, Literal['none', 'warn', 'error']]) – How to handle serialization errors. False/“none” ignores them, True/“warn” logs errors, “error” raises a PydanticSerializationError (pydantic_core.PydanticSerializationError).
fallback (Optional[Callable[[Any], Any]]) – A function to call when an unknown value is encountered. If not provided, a PydanticSerializationError is raised.
serialize_as_any (bool) – Whether to serialize fields with duck-typing serialization behavior.
- Return type:
str
- Returns:
A JSON string representation of the model.
- property model_extra: dict[str, Any] | None
Get extra fields set during validation.
- Returns:
A dictionary of extra fields, or None if config.extra is not set to “allow”.
- model_fields = {'data_in': FieldInfo(annotation=Path, required=False, default_factory=<lambda>), 'data_out': FieldInfo(annotation=Union[Path, NoneType], required=False, default=None), 'data_source': FieldInfo(annotation=Literal['file', 'hawk'], required=False, default='file'), 'datemax': FieldInfo(annotation=date, required=False, default_factory=<lambda>), 'datemin': FieldInfo(annotation=date, required=False, default_factory=<lambda>), 'files': FieldInfo(annotation=str, required=False, default='*'), 'nfile': FieldInfo(annotation=int, required=False, default=1, metadata=[Ge(ge=0)]), 'nread': FieldInfo(annotation=int, required=False, default=0, metadata=[Ge(ge=0)]), 'nskip': FieldInfo(annotation=int, required=False, default=0, metadata=[Ge(ge=0)]), 'output_format': FieldInfo(annotation=Literal['parquet', 'csv'], required=False, default='parquet'), 'reset_target': FieldInfo(annotation=bool, required=False, default=False), 'sampling_rate': FieldInfo(annotation=float, required=False, default=1.0, metadata=[Ge(ge=0)])}
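For readers without Pydantic at hand, the field listing above can be mirrored with a standard-library dataclass. FlightArgsSketch is hypothetical: it reproduces the declared defaults and the ge=0 and Literal constraints, makes data_in, datemin and datemax required because their real defaults come from unspecified factories, and, unlike the model's validate_assignment=True, does not re-validate on attribute assignment. Unknown keyword arguments are rejected by the dataclass constructor with a TypeError, loosely matching extra='forbid'.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Optional

_NON_NEGATIVE = ("nfile", "nread", "nskip", "sampling_rate")  # fields with ge=0


@dataclass
class FlightArgsSketch:
    # Required here only because the real defaults come from unspecified factories.
    data_in: Path
    datemin: date
    datemax: date
    data_out: Optional[Path] = None
    data_source: str = "file"        # Literal['file', 'hawk'] in FlightArgs
    files: str = "*"
    nfile: int = 1
    nread: int = 0
    nskip: int = 0
    output_format: str = "parquet"   # Literal['parquet', 'csv'] in FlightArgs
    reset_target: bool = False
    sampling_rate: float = 1.0

    def __post_init__(self) -> None:
        if self.data_source not in ("file", "hawk"):
            raise ValueError(f"data_source must be 'file' or 'hawk', got {self.data_source!r}")
        if self.output_format not in ("parquet", "csv"):
            raise ValueError(f"output_format must be 'parquet' or 'csv', got {self.output_format!r}")
        for name in _NON_NEGATIVE:
            if getattr(self, name) < 0:
                raise ValueError(f"{name} must be >= 0")
```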
- property model_fields_set: set[str]
Returns the set of fields that have been explicitly set on this model instance.
- Returns:
A set of strings representing the fields that have been set, i.e. that were not filled from defaults.
- classmethod model_json_schema(by_alias=True, ref_template='#/$defs/{model}', schema_generator=<class 'pydantic.json_schema.GenerateJsonSchema'>, mode='validation', *, union_format='any_of')
Generates a JSON schema for a model class.
- Parameters:
by_alias (bool) – Whether to use attribute aliases or not.
ref_template (str) – The reference template.
union_format (Literal['any_of', 'primitive_type_array']) – The format to use when combining schemas from unions together. Can be one of:
‘any_of’: Use the anyOf keyword (https://json-schema.org/understanding-json-schema/reference/combining#anyOf) to combine schemas (the default).
‘primitive_type_array’: Use the type keyword (https://json-schema.org/understanding-json-schema/reference/type) as an array of strings, containing each type of the combination. If any of the schemas is not a primitive type (string, boolean, null, integer or number) or contains constraints/metadata, falls back to any_of.
schema_generator (type[GenerateJsonSchema]) – To override the logic used to generate the JSON schema, as a subclass of GenerateJsonSchema with your desired modifications.
mode (Literal['validation', 'serialization']) – The mode in which to generate the schema.
- Return type:
dict[str, Any]
- Returns:
The JSON schema for the given model class.
- classmethod model_parametrized_name(params)
Compute the class name for parametrizations of generic classes.
This method can be overridden to achieve a custom naming scheme for generic BaseModels.
- Parameters:
params (tuple[type[Any], ...]) – Tuple of types of the class. Given a generic class Model with 2 type variables and a concrete model Model[str, int], the value (str, int) would be passed to params.
- Return type:
str
- Returns:
String representing the new class where params are passed to cls as type variables.
- Raises:
TypeError – Raised when trying to generate concrete names for non-generic models.
- model_post_init(context, /)
Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.
- Return type:
None
- classmethod model_rebuild(*, force=False, raise_errors=True, _parent_namespace_depth=2, _types_namespace=None)
Try to rebuild the pydantic-core schema for the model.
This may be necessary when one of the annotations is a ForwardRef which could not be resolved during the initial attempt to build the schema, and automatic rebuilding fails.
- Parameters:
force (bool) – Whether to force the rebuilding of the model schema, defaults to False.
raise_errors (bool) – Whether to raise errors, defaults to True.
_parent_namespace_depth (int) – The depth level of the parent namespace, defaults to 2.
_types_namespace (Mapping[str, Any] | None) – The types namespace, defaults to None.
- Return type:
bool | None
- Returns:
Returns None if the schema is already “complete” and rebuilding was not required. If rebuilding was required, returns True if rebuilding was successful, otherwise False.
- classmethod model_validate(obj, *, strict=None, extra=None, from_attributes=None, context=None, by_alias=None, by_name=None)
Validate a pydantic model instance.
- Parameters:
obj (Any) – The object to validate.
strict (bool | None) – Whether to enforce types strictly.
extra (Optional[Literal['allow', 'ignore', 'forbid']]) – Whether to ignore, allow, or forbid extra data during model validation. See the extra configuration value (pydantic.ConfigDict.extra) for details.
from_attributes (bool | None) – Whether to extract data from object attributes.
context (Any | None) – Additional context to pass to the validator.
by_alias (bool | None) – Whether to use the field’s alias when validating against the provided input data.
by_name (bool | None) – Whether to use the field’s name when validating against the provided input data.
- Raises:
ValidationError – If the object could not be validated.
- Return type:
Self
- Returns:
The validated model instance.
- classmethod model_validate_json(json_data, *, strict=None, extra=None, context=None, by_alias=None, by_name=None)
- Usage documentation: JSON Parsing (../concepts/json.md#json-parsing)
Validate the given JSON data against the Pydantic model.
- Parameters:
json_data (str | bytes | bytearray) – The JSON data to validate.
strict (bool | None) – Whether to enforce types strictly.
extra (Optional[Literal['allow', 'ignore', 'forbid']]) – Whether to ignore, allow, or forbid extra data during model validation. See the extra configuration value (pydantic.ConfigDict.extra) for details.
context (Any | None) – Extra variables to pass to the validator.
by_alias (bool | None) – Whether to use the field’s alias when validating against the provided input data.
by_name (bool | None) – Whether to use the field’s name when validating against the provided input data.
- Return type:
Self
- Returns:
The validated Pydantic model.
- Raises:
ValidationError – If json_data is not a JSON string or the object could not be validated.
- classmethod model_validate_strings(obj, *, strict=None, extra=None, context=None, by_alias=None, by_name=None)
Validate the given object with string data against the Pydantic model.
- Parameters:
obj (Any) – The object containing string data to validate.
strict (bool | None) – Whether to enforce types strictly.
extra (Optional[Literal['allow', 'ignore', 'forbid']]) – Whether to ignore, allow, or forbid extra data during model validation. See the extra configuration value (pydantic.ConfigDict.extra) for details.
context (Any | None) – Extra variables to pass to the validator.
by_alias (bool | None) – Whether to use the field’s alias when validating against the provided input data.
by_name (bool | None) – Whether to use the field’s name when validating against the provided input data.
- Return type:
Self
- Returns:
The validated Pydantic model.
- nfile: int
- nread: int
- nskip: int
- output_format: Literal['parquet', 'csv']
- classmethod parse_file(path, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
- Return type:
Self
- classmethod parse_obj(obj)
- Return type:
Self
- classmethod parse_raw(b, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
- Return type:
Self
- reset_target: bool
- sampling_rate: float
- classmethod schema(by_alias=True, ref_template='#/$defs/{model}')
- Return type:
Dict[str,Any]
- classmethod schema_json(*, by_alias=True, ref_template='#/$defs/{model}', **dumps_kwargs)
- Return type:
str
- to_toml_payload()[source]
Return a TOML-friendly representation (Path/date → str).
- Return type:
dict[str,Any]
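to_toml_payload converts Path and date values to strings because TOML has no path type. A standalone sketch of that conversion; the function here takes a plain dict instead of a model, and dropping None values (TOML has no null) is an assumption about how unset optional fields such as data_out are handled.

```python
from datetime import date
from pathlib import Path
from typing import Any, Mapping


def to_toml_payload(values: Mapping[str, Any]) -> dict[str, Any]:
    """Return a TOML-friendly copy of values (Path/date converted to str)."""
    payload: dict[str, Any] = {}
    for key, value in values.items():
        if value is None:
            continue  # assumption: TOML has no null, so unset fields are omitted
        if isinstance(value, (Path, date)):  # date also covers datetime
            value = str(value)  # Path -> OS path string, date -> ISO 'YYYY-MM-DD'
        payload[key] = value
    return payload
```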
- classmethod update_forward_refs(**localns)
- Return type:
None
- classmethod validate(value)
- Return type:
Self
- class flight.flight_args.FlightArgsTD[source]
Bases: TypedDict
- __init__(*args, **kwargs)
- clear()
Remove all items from the dict.
- copy()
Return a shallow copy of the dict.
- data_in: str
- data_out: str
- data_source: str
- datemax: str
- datemin: str
- files: str
- classmethod fromkeys(iterable, value=None, /)
Create a new dictionary with keys from iterable and values set to value.
- get(key, default=None, /)
Return the value for key if key is in the dictionary, else default.
- items()
Return a set-like object providing a view on the dict’s items.
- keys()
Return a set-like object providing a view on the dict’s keys.
- nfile: int
- nread: int
- nskip: int
- output_format: str
- pop(k[, d])
Remove the specified key and return the corresponding value. If the key is not found, return the default if given; otherwise, raise a KeyError.
- popitem()
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
- reset_target: bool
- sampling_rate: float
- setdefault(key, default=None, /)
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
- update([E, ]**F)
Update the dict from mapping/iterable E and F; returns None. If E is present and has a .keys() method, then does: for k in E.keys(): D[k] = E[k]. If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v. In either case, this is followed by: for k in F: D[k] = F[k].
- values()
Return an object providing a view on the dict’s values.
- flight.flight_args.apply_source_defaults(args, *, host_ip=None)[source]
Ensure source-specific defaults for missing values.
- Return type:
- flight.flight_args.dump_args(args, settings_path, *, section='args', create_missing=True)[source]
- Return type:
None
- flight.flight_args.dump_args_to_toml(args, settings_path, section='args', create_missing=True)[source]
Persist arguments back to the TOML file (overwriting only the section).
- Return type:
None
- flight.flight_args.load_args_from_toml(settings_path, section='args')[source]
Load arguments from a TOML file, applying model defaults when missing.
- Return type:
- flight.flight_args.merge_args(base, overrides=None)[source]
Return a new instance with overrides applied on top of base.
- Return type:
- class flight_worker.flight_worker.FlightWorker[source]
Bases: PolarsWorker
Class derived from AgiDataWorker.
- args_dump_mode: ClassVar[str] = 'json'
- args_dumper: ClassVar[Callable[..., None] | None] = None
- args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
- args_loader: ClassVar[Callable[..., Any] | None] = None
- args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
- as_dict(mode=None)
- Return type:
dict[str,Any]
- break()
Signal the service loop to exit on this worker.
- Return type:
bool
- static break_loop()
Signal the service loop to exit on this worker.
- Return type:
bool
- calculate_speed(new_column_name, df)[source]
Compute the speed (in meters) between consecutive coordinate pairs and add it to the DataFrame under the provided column name. Assumes that the previous coordinate columns are already present.
- Return type:
DataFrame
- default_settings_path: ClassVar[str] = 'app_settings.toml'
- default_settings_section: ClassVar[str] = 'args'
- env: Optional[AgiEnv] = None
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None –
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- classmethod from_toml(env, settings_path=None, section=None, **overrides)
- Return type:
BaseWorker
- static loop(*, poll_interval=None)
Run a long-lived service loop on this worker until signalled to stop.
The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise, the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.
- Return type:
Dict[str,Any]
- managed_pc_home_suffix: ClassVar[str] = 'MyApp'
- managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
- static normalize_dataset_path(data_path)
Normalise any dataset directory input so workers rely on consistent paths.
- Return type:
str
- pool_init(worker_vars)[source]
Initialize the pool with worker variables.
- Parameters:
worker_vars (dict) – Variables specific to the worker.
- pool_vars = {}
- prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)
Create (and optionally reset) a deterministic output directory.
- Return type:
Path
- preprocess_df(df)[source]
Preprocess the DataFrame by parsing the date column and creating previous coordinate columns. This operation is done once per file.
- Return type:
DataFrame
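The per-file preprocessing step, parsing timestamps and attaching the previous sample's coordinates to each row, is sketched here with plain dictionaries. The column names date, lat and long and the timestamp format are assumptions, and preprocess_rows is a hypothetical name; in Polars the same step would typically use Expr.str.strptime and Expr.shift(1).

```python
from __future__ import annotations

from datetime import datetime
from typing import Any, Iterable


def preprocess_rows(rows: Iterable[dict[str, Any]],
                    date_format: str = "%Y-%m-%d %H:%M:%S") -> list[dict[str, Any]]:
    """Parse the date column and add prev_lat/prev_long columns (shift by one row)."""
    out: list[dict[str, Any]] = []
    prev = None
    for row in rows:
        new = dict(row)  # leave the caller's rows untouched
        new["date"] = datetime.strptime(row["date"], date_format)
        # The first sample has no predecessor, so its previous columns stay None.
        new["prev_lat"] = prev["lat"] if prev is not None else None
        new["prev_long"] = prev["long"] if prev is not None else None
        out.append(new)
        prev = row
    return out
```

With the previous coordinates on the same row, a distance or speed column can then be computed row by row without looking back across the data.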
- setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
- Return type:
Any
- setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)
Prepare normalised input/output dataset paths without relying on worker args.
Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.
- Return type:
SimpleNamespace
- to_toml(settings_path=None, section=None, create_missing=True)
- Return type:
None
- verbose = 1
- work_done(worker_df)[source]
Concatenate the dataframes, if any, and save the results.
- Parameters:
worker_df (pl.DataFrame) – Output dataframe for one plane.
- work_pool(file)[source]
Parse IVQ log files.
- Parameters:
file (str) – The log file to parse.
- Returns:
Parsed data.
- Return type:
pl.DataFrame
- works(workers_plan, workers_plan_metadata)
Executes worker tasks based on the distribution tree.
- Parameters:
workers_plan (any) – Distribution tree structure.
workers_plan_metadata (any) – Additional information about the workers.
- Returns:
Execution time in seconds.
- Return type:
float
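works is documented to execute the distribution tree and return the elapsed time in seconds. A sketch of that control flow, assuming the worker's share of the plan is a sequence of per-plane file lists; run_worker_plan is hypothetical, and passing the list of parsed parts to work_done stands in for the concatenation the real method performs.

```python
from __future__ import annotations

import time
from typing import Any, Callable, Iterable, Sequence


def run_worker_plan(workers_plan: Iterable[Sequence[str]],
                    work_pool: Callable[[str], Any],
                    work_done: Callable[[list[Any]], None]) -> float:
    """Parse every file of each plane, hand the plane's parts to work_done, return seconds."""
    start = time.perf_counter()
    for plane_files in workers_plan:
        parsed = [work_pool(f) for f in plane_files]  # one parsed frame per log file
        work_done(parsed)  # stands in for concatenating and saving one plane
    return time.perf_counter() - start
```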