
Pipeline

The pipeline is a tool for chaining services that take a tkeir_doc as input through their REST APIs. The pipeline itself is exposed as a REST service.

Pipeline API

Pipeline configuration

Example of Configuration:

pipeline.json
{
    "logger": {
        "logging-level": "{{ project.loglevel }}"
    },
    "pipeline": {
        "settings":{
            "strategy":"serial",
            "max-time-loop":-1,
            "max-time-per-task":300,
            "zip-results":true
        },  
        "tasks":[{
                    "task":"converter",
                    "previous-task":"input",
                    "save-output":false,
                    "clean-input-folder-after-analysis":true,
                    "resources-base-path":"{{ project.path }}/configs/",
                    "configuration": "converter.json",                    
                    "input-dir":"{{ project.data }}/raw-inputs",
                    "output-dir":"{{ project.data }}/test-inputs",
                    "network":{
                        "use-ssl":false,
                        "no-ssl-verify":true
                    }
                   },
                   {
                    "task":"tokenizer",
                    "previous-task":"converter",
                    "save-output":false,
                    "clean-input-folder-after-analysis":true,
                    "resources-base-path":"{{ project.path }}/configs/",
                    "configuration": "tokenizer.json",
                    "input-dir":"{{ project.data }}/raw-inputs",
                    "output-dir":"{{ project.data }}/test-outputs-tokenizer",
                    "network":{
                        "use-ssl":false,
                        "no-ssl-verify":true
                    }
                   },
                   {                    
                    "task":"morphosyntax",
                    "previous-task":"tokenizer",
                    "save-output":false,
                    "clean-input-folder-after-analysis":true,
                    "resources-base-path":"{{ project.path }}/configs/",
                    "configuration": "mstagger.json",
                    "input-dir":"{{ project.data }}/raw-inputs",
                    "output-dir":"{{ project.data }}/test-outputs-ms",
                    "network":{
                        "use-ssl":false,
                        "no-ssl-verify":true
                    }
                   },
                   {
                    "task":"ner",
                    "previous-task":"morphosyntax",
                    "save-output":false,
                    "clean-input-folder-after-analysis":true,
                    "resources-base-path":"{{ project.path }}/configs/",
                    "configuration": "nertagger.json",
                    "input-dir":"{{ project.data }}/raw-inputs",
                    "output-dir":"{{ project.data }}/test-outputs-ner",
                    "network":{
                        "use-ssl":false,
                        "no-ssl-verify":true
                    }
                   },
                   {
                    "task":"syntax",
                    "previous-task":"ner",
                    "save-output":false,
                    "clean-input-folder-after-analysis":true,                    
                    "resources-base-path":"{{ project.path }}/configs/",
                    "input-dir":"{{ project.data }}/raw-inputs",
                    "output-dir":"{{ project.data }}/test-outputs-syntax",
                    "configuration": "syntactic-tagger.json",
                    "network":{
                        "use-ssl":false,
                        "no-ssl-verify":true
                    }
                   },
                   {
                    "task":"keywords",
                    "previous-task":"syntax",
                    "save-output":false,
                    "clean-input-folder-after-analysis":true,
                    "resources-base-path":"{{ project.path }}/configs/",
                    "input-dir":"{{ project.data }}/raw-inputs",
                    "output-dir":"{{ project.data }}/test-outputs-kw",
                    "configuration": "keywords.json",
                    "network":{
                        "use-ssl":false,
                        "no-ssl-verify":true
                    }
                   },
                   {
                    "task":"zeroshotclassifier",
                    "previous-task":"keywords",
                    "save-output":false,
                    "clean-input-folder-after-analysis":true,
                    "resources-base-path":"{{ project.path }}/configs/",
                    "input-dir":"{{ project.data }}/raw-inputs",
                    "output-dir":"{{ project.data }}/output-zsc",
                    "configuration": "zeroshotclassifier.json",
                    "network":{
                        "use-ssl":false,
                        "no-ssl-verify":true
                    }
                   },
                   {
                    "task":"sentiment",
                    "previous-task":"zeroshotclassifier",
                    "save-output":false,
                    "clean-input-folder-after-analysis":true,
                    "resources-base-path":"{{ project.path }}/configs/",
                    "input-dir":"{{ project.data }}/raw-inputs",
                    "output-dir":"{{ project.data }}/output-sentiment",
                    "configuration": "sentiment.json",
                    "network":{
                        "use-ssl":false,
                        "no-ssl-verify":true
                    }
                   },
                   {
                    "task":"summarizer",
                    "previous-task":"sentiment",
                    "save-output":false,
                    "clean-input-folder-after-analysis":false,
                    "resources-base-path":"{{ project.path }}/configs/",
                    "input-dir":"{{ project.data }}/raw-inputs",
                    "output-dir":"{{ project.data }}/output-summarizer",
                    "configuration": "summarizer.json",
                    "network":{
                        "use-ssl":false,
                        "no-ssl-verify":true
                    }
                   },

                   {
                    "task":"clusterinfer",
                    "previous-task":"keywords",
                    "save-output":true,
                    "clean-input-folder-after-analysis":true,
                    "resources-base-path":"{{ project.path }}/configs/",
                    "input-dir":"{{ project.data }}/raw-inputs",
                    "output-dir":"{{ project.data }}/test-outputs-ci",
                    "configuration": "relations.json",
                    "network":{
                        "use-ssl":false,
                        "no-ssl-verify":true
                    }
                   },
                   {
                    "task":"index",
                    "previous-task":"clusterinfer",
                    "save-output":true,
                    "clean-input-folder-after-analysis":false,
                    "resources-base-path":"{{ project.path }}/configs/",
                    "input-dir":"{{ project.data }}/raw-inputs",
                    "output-dir":"{{ project.data }}/test-outputs-index",
                    "configuration": "indexing.json",
                    "network":{
                        "use-ssl":false,
                        "no-ssl-verify":true
                    }
                   }
                ],
        "network": {
            "host":"0.0.0.0",
            "port":10006,
            "associate-environment": {
                "host":"PIPELINE_HOST",
                "port":"PIPELINE_PORT"
            }
        },
        "runtime":{
            "request-max-size":100000000,
            "request-buffer-queue-size":100,
            "keep-alive":true,
            "keep-alive-timeout":5,
            "graceful-shutown-timeout":15.0,
            "request-timeout":60,
            "response-timeout":60,
            "workers":1
        }
    }
}

The pipeline configuration is an aggregation of a network configuration, a settings configuration, a runtime configuration, a list of tasks (in the pipeline field), and a logger configuration (at the top level).

The pipeline settings define the strategy used to run the tasks:

  • pipeline/settings/strategy : one of serial (the tasks are run in the order of the given list), monolithic (the tasks have already been run), or service (the tasks are run through a service)
  • pipeline/settings/max-time-loop : maximum time to run the service

The pipeline is a chained list of tasks:

  • pipeline/tasks/[task name]/task : name of the task
  • pipeline/tasks/[task name]/previous-task : name of the previous task
  • pipeline/tasks/[task name]/save-output : whether the task output is saved
  • pipeline/tasks/[task name]/clean-input-folder-after-analysis : whether the input folder is cleaned (data not stored) after the task runs
  • pipeline/tasks/[task name]/resources-base-path : path to the resources/configuration files of the task
  • pipeline/tasks/[task name]/configuration : configuration file of the task
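Since each task names its predecessor, the tasks form a chain starting from the special name "input". A minimal sketch (not part of tkeir) of a check that every previous-task refers to "input" or to an earlier task in the list:

```python
def validate_chain(tasks):
    """Check that each task's "previous-task" is "input" or a task declared earlier."""
    seen = {"input"}
    for t in tasks:
        prev = t["previous-task"]
        if prev not in seen:
            raise ValueError(f"task {t['task']!r} references unknown previous task {prev!r}")
        seen.add(t["task"])
    return True

tasks = [
    {"task": "converter", "previous-task": "input"},
    {"task": "tokenizer", "previous-task": "converter"},
]
print(validate_chain(tasks))  # True
```

Note that a chain can branch: in the example above, both summarizer and clusterinfer consume the output of an earlier task.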

Configure pipeline logger

The logger is configured at the top level of the JSON, in the logger field.

Example of Configuration:

logger configuration
{
    "logger": {
        "logging-level": "debug"
    }    
}

The logger field is:

  • logging-level

It can be set to the following values:

  • debug for the debug level and developer information
  • info for informational messages
  • warning to display only warnings and errors
  • error to display only errors
  • critical to display only critical errors
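These level names match Python's standard logging levels. A hypothetical helper (not tkeir's actual code) mapping the "logging-level" string from the configuration to a logging constant could look like:

```python
import logging

# Assumed mapping from configuration strings to Python logging levels.
LEVELS = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
}

def configure_logger(config):
    """Apply the "logging-level" value from a logger configuration dict."""
    level = LEVELS[config["logger"]["logging-level"]]
    logging.basicConfig(level=level)
    return level

print(configure_logger({"logger": {"logging-level": "debug"}}))  # 10 (logging.DEBUG)
```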

Configure pipeline Network

Example of Configuration:

network configuration
{
    "network": {
        "host":"0.0.0.0",
        "port":8080,
        "associate-environment": {
            "host":"HOST_ENVNAME",
            "port":"PORT_ENVNAME"
        },
        "ssl":
        {
            "certificate":"path/to/certificate",
            "key":"path/to/key"
        }
    }
}

The network fields:

  • host : hostname

  • port : port of the service

  • associate-environment : environment variables that override the default host and port. This field is not mandatory.

    • "host" : environment variable associated with "host"
    • "port" : environment variable associated with "port"
  • ssl : SSL configuration. IN PRODUCTION IT IS MANDATORY TO USE A CERTIFICATE AND KEY THAT ARE *NOT* SELF-SIGNED.

    • certificate : certificate file

    • key : key file
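The associate-environment entries let deployment environments (e.g. containers) override the host and port without editing the JSON file. A sketch of the assumed override behavior:

```python
import os

def resolve_network(net):
    """Return (host, port), letting the environment variables named in
    "associate-environment" override the defaults when they are set."""
    host = net["host"]
    port = net["port"]
    env = net.get("associate-environment", {})
    if env.get("host") and os.environ.get(env["host"]):
        host = os.environ[env["host"]]
    if env.get("port") and os.environ.get(env["port"]):
        port = int(os.environ[env["port"]])
    return host, port

os.environ["HOST_ENVNAME"] = "127.0.0.1"
net = {"host": "0.0.0.0", "port": 8080,
       "associate-environment": {"host": "HOST_ENVNAME", "port": "PORT_ENVNAME"}}
print(resolve_network(net))  # ('127.0.0.1', 8080): host overridden, port default kept
```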

Configure pipeline runtime

Example of Configuration:

runtime configuration
{
    "runtime":{
        "request-max-size":100000000,
        "request-buffer-queue-size":100,
        "keep-alive":true,
        "keep-alive-timeout":5,
        "graceful-shutown-timeout":15.0,
        "request-timeout":60,
        "response-timeout":60,
        "workers":1
    }    
}

The runtime fields:

  • request-max-size : how big a request may be (bytes)

  • request-buffer-queue-size : request streaming buffer queue size

  • request-timeout : how long a request can take to arrive (sec)

  • response-timeout : how long a response can take to process (sec)

  • keep-alive : whether to keep connections alive

  • keep-alive-timeout : how long to hold a TCP connection open (sec)

  • graceful-shutdown-timeout : how long to wait before force-closing non-idle connections (sec)

  • workers : number of workers for the service on a node

  • associate-environment : environment variables that override the default values of the previous fields. This field is not mandatory. Each of the following fields can be overwritten with an environment variable:

    • request-max-size
    • request-buffer-queue-size
    • request-timeout
    • response-timeout
    • keep-alive
    • keep-alive-timeout
    • graceful-shutdown-timeout
    • workers
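The same override pattern as for the network block can be sketched generically for runtime fields. This is an illustration of the assumed behavior, not tkeir's actual code; note the cast that preserves the type of the JSON default:

```python
import os

def resolve_runtime(runtime):
    """Override runtime defaults with the environment variables named in
    "associate-environment", casting each value to the default's type."""
    resolved = dict(runtime)
    env_map = runtime.get("associate-environment", {})
    for field, env_name in env_map.items():
        value = os.environ.get(env_name)
        if value is not None:
            resolved[field] = type(runtime[field])(value)
    return resolved

os.environ["PIPELINE_WORKERS"] = "4"
runtime = {"workers": 1, "request-timeout": 60,
           "associate-environment": {"workers": "PIPELINE_WORKERS"}}
print(resolve_runtime(runtime)["workers"])  # 4
```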

Pipeline service

To run the service, simply type from the tkeir directory:

python3 thot/pipeline_svc.py --config=<path to pipeline configuration file>

or, if you installed the tkeir wheel:

tkeir-pipeline-svc --config=<path to pipeline configuration file>

A lightweight client can be run with the command:

python3 thot/pipeline_client.py --config=<path to pipeline configuration file> --input=<input directory> --output=<output directory> --loop-time=<time between two get loops> --scheme [http|https] --nsv (do not verify SSL)

or, if you installed the tkeir wheel:

tkeir-pipeline-client --config=<path to pipeline configuration file> --input=<input directory> --output=<output directory> --loop-time=<time between two get loops> --scheme [http|https] --nsv (do not verify SSL)

Pipeline as batch processing

You can also run the pipeline with a batch function:

python3 thot/batch_ingester.py -c <PATH TO YOUR CONFIGURATION FOLDER>/pipeline.json -i <PATH TO DATA FOLDER>/data/tkeir -o <PATH TO DATA FOLDER>/data/tkeir-out

or, if you installed the tkeir wheel:

tkeir-batch-ingester -c <PATH TO YOUR CONFIGURATION FOLDER>/pipeline.json -i <PATH TO DATA FOLDER>/data/tkeir -o <PATH TO DATA FOLDER>/data/tkeir-out