Example1: My-Code-Project
manager: my_code

Package your_code
my_code: module my_code Author: Jean-Pierre Morard Copyright: Thales SIX GTS France SAS
- class my_code.my_code.MyCode(env, **args)[source]
Bases:
AgiManager
Class MyCode provides methods to orchestrate the run
- __init__(env, **args)[source]
Initialize the object with the provided keyword arguments.
- Parameters:
**args (Unpack[MyCodeArgs]) – Keyword arguments to initialize the object.
- Returns:
None
- args = {}
- static convert_functions_to_names(workers_tree)
Converts functions in a nested structure to their names.
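As an illustration of what converting functions to names in a nested structure can look like, here is a minimal sketch. It is not the package's implementation: the accepted container types (lists, tuples, dicts) and the fallback for callables without a `__name__` are assumptions made for the example.

```python
def convert_functions_to_names(tree):
    """Return a copy of `tree` in which every callable is replaced by its name.

    Hypothetical sketch: only lists, tuples and dicts are traversed.
    """
    if callable(tree):
        # Objects such as functools.partial have no __name__; fall back to repr.
        return getattr(tree, "__name__", repr(tree))
    if isinstance(tree, dict):
        return {key: convert_functions_to_names(value) for key, value in tree.items()}
    if isinstance(tree, (list, tuple)):
        # Rebuild the same container type (plain list or tuple).
        return type(tree)(convert_functions_to_names(item) for item in tree)
    # Any other leaf (numbers, strings, ...) is returned unchanged.
    return tree
```

A tree such as `[[(algo_a, 1)], [(algo_b, 2)]]` would come back with the two function objects replaced by the strings `"algo_a"` and `"algo_b"`, which makes the structure printable and serializable.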
- static do_distrib(inst, agi_env, workers)
Build the distribution tree.
- Parameters:
inst – The instance for building the distribution tree.
- Returns:
None
- static onerror(func, path, exc_info)
Error handler for shutil.rmtree.
If the error is due to an access error (read-only file), it attempts to add write permission and then retries.
If the error is for another reason, it re-raises the error.
Usage: shutil.rmtree(path, onerror=onerror)
- Parameters:
func (function) – The function that raised the error.
path (str) – The path name passed to the function.
exc_info (tuple) – The exception information returned by sys.exc_info().
- Returns:
None
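The behavior described above (add write permission and retry on an access error, re-raise otherwise) can be sketched as follows. This is an illustrative handler, not the package's code: detecting the access error with `isinstance(exc, PermissionError)` is an assumption, and `remove_tree` is a hypothetical helper name. Note that Python 3.12 deprecates the `onerror` argument of `shutil.rmtree` in favor of `onexc`.

```python
import os
import shutil
import stat


def onerror(func, path, exc_info):
    """Illustrative rmtree error handler: retry after adding write permission."""
    exc = exc_info[1]  # the exception instance from sys.exc_info()
    if isinstance(exc, PermissionError):
        # Add the owner write bit, then retry the operation that failed.
        os.chmod(path, os.stat(path).st_mode | stat.S_IWUSR)
        func(path)
    else:
        # Any other failure is not handled here.
        raise exc


def remove_tree(path):
    """Hypothetical helper showing the usage pattern from the docstring."""
    shutil.rmtree(path, onerror=onerror)
```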
- verbose = None
- class my_code.my_code.MyCodeArgs(**data)[source]
Bases:
BaseModel
Class MyCodeArgs contains Arguments for MyCode
- __init__(**data)
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- classmethod construct(_fields_set=None, **values)
- Return type:
Self
- copy(*, include=None, exclude=None, update=None, deep=False)
Returns a copy of the model.
- !!! warning “Deprecated”
This method is now deprecated; use model_copy instead.
If you need include or exclude, use:
data = self.model_dump(include=include, exclude=exclude, round_trip=True)
data = {**data, **(update or {})}
copied = self.model_validate(data)
- Parameters:
include – Optional set or mapping specifying which fields to include in the copied model.
exclude – Optional set or mapping specifying which fields to exclude in the copied model.
update – Optional dictionary of field-value pairs to override field values in the copied model.
deep – If True, the values of fields that are Pydantic models will be deep-copied.
- Returns:
A copy of the model with included, excluded and updated fields as specified.
- dict(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False)
- Return type:
Dict[str, Any]
- classmethod from_orm(obj)
- Return type:
Self
- json(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False, encoder=PydanticUndefined, models_as_dict=PydanticUndefined, **dumps_kwargs)
- Return type:
str
- model_computed_fields = {}
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- classmethod model_construct(_fields_set=None, **values)
Creates a new instance of the Model class with validated data.
Creates a new model setting __dict__ and __pydantic_fields_set__ from trusted or pre-validated data. Default values are respected, but no other validation is performed.
- Return type:
Self
- !!! note
model_construct() generally respects the model_config.extra setting on the provided model. That is, if model_config.extra == ‘allow’, then all extra passed values are added to the model instance’s __dict__ and __pydantic_extra__ fields. If model_config.extra == ‘ignore’ (the default), then all extra passed values are ignored. Because no validation is performed with a call to model_construct(), having model_config.extra == ‘forbid’ does not result in an error if extra values are passed, but they will be ignored.
- Parameters:
_fields_set – A set of field names that were originally explicitly set during instantiation. If provided, this is directly used for the [model_fields_set][pydantic.BaseModel.model_fields_set] attribute. Otherwise, the field names from the values argument will be used.
values – Trusted or pre-validated data dictionary.
- Returns:
A new instance of the Model class with validated data.
- model_copy(*, update=None, deep=False)
- Return type:
Self
Returns a copy of the model.
- !!! note
The underlying instance’s [__dict__][object.__dict__] attribute is copied. This might have unexpected side effects if you store anything in it, on top of the model fields (e.g. the value of [cached properties][functools.cached_property]).
- Parameters:
update – Values to change/add in the new model. Note: the data is not validated before creating the new model. You should trust this data.
deep – Set to True to make a deep copy of the model.
- Returns:
New model instance.
- model_dump(*, mode='python', include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
- Return type:
dict[str, Any]
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
- Parameters:
mode – The mode in which to_python should run. If mode is ‘json’, the output will only contain JSON serializable types. If mode is ‘python’, the output may contain non-JSON-serializable Python objects.
include – A set of fields to include in the output.
exclude – A set of fields to exclude from the output.
context – Additional context to pass to the serializer.
by_alias – Whether to use the field’s alias in the dictionary key if defined.
exclude_unset – Whether to exclude fields that have not been explicitly set.
exclude_defaults – Whether to exclude fields that are set to their default value.
exclude_none – Whether to exclude fields that have a value of None.
round_trip – If True, dumped values should be valid as input for non-idempotent types such as Json[T].
warnings – How to handle serialization errors. False/“none” ignores them, True/“warn” logs errors, “error” raises a [PydanticSerializationError][pydantic_core.PydanticSerializationError].
fallback – A function to call when an unknown value is encountered. If not provided, a [PydanticSerializationError][pydantic_core.PydanticSerializationError] error is raised.
serialize_as_any – Whether to serialize fields with duck-typing serialization behavior.
- Returns:
A dictionary representation of the model.
- model_dump_json(*, indent=None, include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
- Return type:
str
Generates a JSON representation of the model using Pydantic’s to_json method.
- Parameters:
indent – Indentation to use in the JSON output. If None is passed, the output will be compact.
include – Field(s) to include in the JSON output.
exclude – Field(s) to exclude from the JSON output.
context – Additional context to pass to the serializer.
by_alias – Whether to serialize using field aliases.
exclude_unset – Whether to exclude fields that have not been explicitly set.
exclude_defaults – Whether to exclude fields that are set to their default value.
exclude_none – Whether to exclude fields that have a value of None.
round_trip – If True, dumped values should be valid as input for non-idempotent types such as Json[T].
warnings – How to handle serialization errors. False/“none” ignores them, True/“warn” logs errors, “error” raises a [PydanticSerializationError][pydantic_core.PydanticSerializationError].
fallback – A function to call when an unknown value is encountered. If not provided, a [PydanticSerializationError][pydantic_core.PydanticSerializationError] error is raised.
serialize_as_any – Whether to serialize fields with duck-typing serialization behavior.
- Returns:
A JSON string representation of the model.
- property model_extra: dict[str, Any] | None
Get extra fields set during validation.
- Returns:
A dictionary of extra fields, or None if config.extra is not set to “allow”.
- model_fields = {'mycode_param1': FieldInfo(annotation=int, required=False, default=<function conint>)}
- property model_fields_set: set[str]
Returns the set of fields that have been explicitly set on this model instance.
- Returns:
A set of strings representing the fields that have been set, i.e. that were not filled from defaults.
- classmethod model_json_schema(by_alias=True, ref_template='#/$defs/{model}', schema_generator=<class 'pydantic.json_schema.GenerateJsonSchema'>, mode='validation')
Generates a JSON schema for a model class.
- Return type:
dict[str, Any]
- Parameters:
by_alias – Whether to use attribute aliases or not.
ref_template – The reference template.
schema_generator – To override the logic used to generate the JSON schema, as a subclass of GenerateJsonSchema with your desired modifications
mode – The mode in which to generate the schema.
- Returns:
The JSON schema for the given model class.
- classmethod model_parametrized_name(params)
Compute the class name for parametrizations of generic classes.
This method can be overridden to achieve a custom naming scheme for generic BaseModels.
- Return type:
str
- Parameters:
params – Tuple of types of the class. Given a generic class Model with 2 type variables and a concrete model Model[str, int], the value (str, int) would be passed to params.
- Returns:
String representing the new class where params are passed to cls as type variables.
- Raises:
TypeError – Raised when trying to generate concrete names for non-generic models.
- model_post_init(context, /)
Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.
- Return type:
None
- classmethod model_rebuild(*, force=False, raise_errors=True, _parent_namespace_depth=2, _types_namespace=None)
Try to rebuild the pydantic-core schema for the model.
This may be necessary when one of the annotations is a ForwardRef which could not be resolved during the initial attempt to build the schema, and automatic rebuilding fails.
- Return type:
bool | None
- Parameters:
force – Whether to force the rebuilding of the model schema, defaults to False.
raise_errors – Whether to raise errors, defaults to True.
_parent_namespace_depth – The depth level of the parent namespace, defaults to 2.
_types_namespace – The types namespace, defaults to None.
- Returns:
Returns None if the schema is already “complete” and rebuilding was not required. If rebuilding _was_ required, returns True if rebuilding was successful, otherwise False.
- classmethod model_validate(obj, *, strict=None, from_attributes=None, context=None, by_alias=None, by_name=None)
Validate a pydantic model instance.
- Return type:
Self
- Parameters:
obj – The object to validate.
strict – Whether to enforce types strictly.
from_attributes – Whether to extract data from object attributes.
context – Additional context to pass to the validator.
by_alias – Whether to use the field’s alias when validating against the provided input data.
by_name – Whether to use the field’s name when validating against the provided input data.
- Raises:
ValidationError – If the object could not be validated.
- Returns:
The validated model instance.
- classmethod model_validate_json(json_data, *, strict=None, context=None, by_alias=None, by_name=None)
- Return type:
Self
Validate the given JSON data against the Pydantic model.
- Parameters:
json_data – The JSON data to validate.
strict – Whether to enforce types strictly.
context – Extra variables to pass to the validator.
by_alias – Whether to use the field’s alias when validating against the provided input data.
by_name – Whether to use the field’s name when validating against the provided input data.
- Returns:
The validated Pydantic model.
- Raises:
ValidationError – If json_data is not a JSON string or the object could not be validated.
- classmethod model_validate_strings(obj, *, strict=None, context=None, by_alias=None, by_name=None)
Validate the given object with string data against the Pydantic model.
- Return type:
Self
- Parameters:
obj – The object containing string data to validate.
strict – Whether to enforce types strictly.
context – Extra variables to pass to the validator.
by_alias – Whether to use the field’s alias when validating against the provided input data.
by_name – Whether to use the field’s name when validating against the provided input data.
- Returns:
The validated Pydantic model.
- mycode_param1: int
- classmethod parse_file(path, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
- Return type:
Self
- classmethod parse_obj(obj)
- Return type:
Self
- classmethod parse_raw(b, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
- Return type:
Self
- classmethod schema(by_alias=True, ref_template='#/$defs/{model}')
- Return type:
Dict[str, Any]
- classmethod schema_json(*, by_alias=True, ref_template='#/$defs/{model}', **dumps_kwargs)
- Return type:
str
- classmethod update_forward_refs(**localns)
- Return type:
None
- classmethod validate(value)
- Return type:
Self

worker: my_code_worker

Module my_code_worker, an extension of your_code
Author: yourself
- class my_code_worker.my_code_worker.MyCodeWorker[source]
Bases:
DagWorker
Class derived from DagWorker.
- algo_A()[source]
Perform algorithm A.
This method belongs to MyCodeWorker class.
- Parameters:
self – Instance of MyCodeWorker.
- Returns:
None
- Prints:
str: Print statement indicating the execution of algorithm A.
- algo_B()[source]
Prints a message indicating the execution of algo_B method in MyCodeWorker class.
- Parameters:
self – Reference to the instance of MyCodeWorker class.
- Returns:
None
- algo_C()[source]
Prints a message indicating that the algo_C method of MyCodeWorker has been called.
- Parameters:
self – The MyCodeWorker instance on which the method is called.
- Returns:
None
- algo_X()[source]
Perform a specific algorithm X.
This method prints a message indicating the execution of algorithm X.
- Parameters:
self – The object instance.
- Returns:
None
- algo_Y()[source]
Perform algorithm Y.
This method is a part of the MyCodeWorker class.
- Parameters:
self – The instance of the MyCodeWorker class.
- Returns:
None
- algo_Z()[source]
Perform a specific algorithm Z.
This function is part of the MyCodeWorker class.
- Returns:
None
- static build(target_worker, dask_home, worker, mode=0, verbose=0)
Function to build target code on a target Worker.
- Parameters:
target_worker (str) – module to build
dask_home (str) – path to dask home
worker – current worker
mode – (Default value = 0)
verbose – (Default value = 0)
- cython_decorators = ['njit']
- dask_home = None
- static do_works(workers_tree, workers_tree_info)
Run the workers.
- Parameters:
chunk – distribution tree
chunks
Returns:
- env = None
- static exec(cmd, path, worker)
Execute a command in a subprocess.
- Parameters:
cmd – The command string.
path – The path from which to launch the command.
worker
Returns:
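For readers unfamiliar with the pattern, running a command string in a subprocess from a given directory can be sketched with the standard library as below. This is an assumption-laden illustration, not the worker's actual `exec`: the helper name `run_in_dir` is hypothetical, the `worker` argument is omitted, and passing the string through the shell is a choice made for the example.

```python
import subprocess


def run_in_dir(cmd, path):
    """Run the command string `cmd` with `path` as the working directory.

    Hypothetical sketch: returns the CompletedProcess so the caller can
    inspect returncode, stdout and stderr.
    """
    return subprocess.run(
        cmd,
        shell=True,           # cmd is a single string, interpreted by the shell
        cwd=path,             # directory from which the command is launched
        capture_output=True,  # collect stdout and stderr instead of printing
        text=True,            # decode the output as str
        check=False,          # leave error handling to the caller
    )
```

Using `shell=True` is convenient for a command given as one string, but it should only be used with trusted input.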
- exec_mono_process(workers_tree, workers_tree_info)
Execute tasks in a single process, respecting dependencies, but only for branches assigned to this worker via round-robin.
- exec_multi_process(workers_tree, workers_tree_info)
Execute tasks in multiple threads, distributing branches to workers in round‑robin, then honoring dependencies per worker.
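The round-robin distribution of branches mentioned in both methods can be pictured with the following sketch. The function names and the representation of branches as a flat list are assumptions for illustration; the real structure of `workers_tree` is not documented here.

```python
def assign_round_robin(branches, n_workers):
    """Distribute branches over workers: branch i goes to worker i % n_workers."""
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    assignment = {worker_id: [] for worker_id in range(n_workers)}
    for index, branch in enumerate(branches):
        assignment[index % n_workers].append(branch)
    return assignment


def branches_for(worker_id, branches, n_workers):
    """Select only the branches that round-robin assigns to `worker_id`."""
    return [b for i, b in enumerate(branches) if i % n_workers == worker_id]
```

With five branches and two workers, worker 0 receives the branches at positions 0, 2 and 4, and worker 1 those at positions 1 and 3. A single-process run would then execute only the result of `branches_for` for its own worker id.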
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None –
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
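The path helpers expand and expand_and_join can be approximated with `os.path` as shown below. This is a sketch under stated assumptions, not the library's implementation: in particular, how `base_directory` is applied to relative paths is a guess made for the example.

```python
import os


def expand(path, base_directory=None):
    """Expand '~' and return an absolute path (illustrative sketch)."""
    path = os.path.expanduser(path)
    if not os.path.isabs(path) and base_directory is not None:
        # Assumption: relative paths are resolved against base_directory.
        path = os.path.join(os.path.expanduser(base_directory), path)
    return os.path.abspath(path)


def expand_and_join(path1, path2):
    """Expand the first path, then join the second path onto it."""
    return os.path.join(expand(path1), path2)
```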
- get_logs_and_result(*args, verbosity=50, **kwargs)
- get_work(work)[source]
- Parameters:
work (str) – The name of the worker function called by AgiWorker.do_work. It is passed as a string rather than as a function so that the manager (e.g. My_code) does not depend on MyCodeWorker.
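Passing the work item as a name rather than a function object means the caller never has to import the worker class. A minimal sketch of name-based dispatch follows; `DemoWorker` and `call_by_name` are hypothetical names for illustration and do not reproduce the real get_work.

```python
class DemoWorker:
    """Hypothetical worker with two algorithms, looked up by name."""

    def algo_A(self):
        return "algo_A done"

    def algo_B(self):
        return "algo_B done"

    def call_by_name(self, work):
        """Resolve the method named `work` on this instance and call it."""
        method = getattr(self, work, None)
        if not callable(method):
            raise AttributeError(f"unknown work item: {work!r}")
        return method()
```

The manager side only needs to hold strings such as `"algo_A"`, so it can build and serialize a distribution tree without depending on the worker module.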
- static get_worker_info(worker_id)
Get information about a worker.
- Parameters:
worker_id
Returns:
- home_dir = None
- is_managed_pc = False
- static join(path1, path2)
Join two file paths.
- Parameters:
path1 (str) – The first file path.
path2 (str) – The second file path.
- Returns:
The combined file path.
- Return type:
str
- Raises:
None –
- logs = None
- mode = None
- static new(app, mode=None, env=None, verbose=0, worker_id=0, worker='localhost', args=None)
Create a new worker instance.
- Parameters:
module – Instantiate and load the target my_code_worker module.
target_worker
target_worker_class
target_package
mode – (Default value = mode)
verbose – (Default value = 0)
worker_id – (Default value = 0)
worker – (Default value = ‘localhost’)
args – (Default value = None)
Returns:
- static onerror(func, path, exc_info)
Error handler for shutil.rmtree. If it’s a permission error, make it writable and retry. Otherwise re-raise.
- static run(workers={'127.0.0.1': 1}, mode=0, env=None, verbose=None, args=None)
- Parameters:
app
workers
mode
verbose
args
- Returns:
- start()[source]
Start the function.
This function prints the file name if the ‘verbose’ attribute is greater than 0.
- Parameters:
self – The current instance of the class.
- Returns:
None
- stop()[source]
Stop the current action.
- Raises:
NotImplementedError – This method needs to be implemented in a subclass.
- t0 = None
- topological_sort(dependency_graph)
Perform a topological sort on the dependency graph. Raises ValueError on cycles.
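A topological sort that raises ValueError on cycles can be sketched with Kahn's algorithm, as below. The input format is an assumption (a dict mapping each task to the tasks it depends on), and this is not necessarily how the worker implements it.

```python
from collections import deque


def topological_sort(dependency_graph):
    """Return the nodes so that every node comes after its dependencies.

    Assumes `dependency_graph` maps node -> iterable of nodes it depends on,
    and that nodes are sortable (used only to make the order deterministic).
    """
    nodes = set(dependency_graph)
    for deps in dependency_graph.values():
        nodes.update(deps)

    # Unmet dependencies per node, and the reverse edges (who waits on whom).
    remaining = {n: set(dependency_graph.get(n, ())) for n in nodes}
    dependents = {n: [] for n in nodes}
    for node, deps in remaining.items():
        for dep in deps:
            dependents[dep].append(node)

    ready = deque(sorted(n for n, deps in remaining.items() if not deps))
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for waiting in sorted(dependents[node]):
            remaining[waiting].discard(node)
            if not remaining[waiting]:
                ready.append(waiting)

    # Nodes left unprocessed can only be part of a cycle.
    if len(order) != len(nodes):
        raise ValueError("dependency graph contains a cycle")
    return order
```

The standard library's `graphlib.TopologicalSorter` offers the same functionality and raises `graphlib.CycleError`, which is a subclass of ValueError.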
- verbose = 1
- worker = None
- worker_id = None
- works(workers_tree, workers_tree_info)
Run the worker tasks.
