BizMicroservices

BizMicroService is the orchestration of TechMicroServices. This orchestration is defined with the Airflow plateform.

Notice : We recomand to create first a directory service as described in the directory Samples.

DAG

The definition of the DAG, the BizMicroService, is done by thru biz decorator, which is simply a renaming of the dag decorator of Airflow.

Notice : It seems stupid to just rename a decorator, but we have in mind to use this decorator in future for creating relation dependencies between microservices.

from coworks.biz import biz

DEFAULT_ARGS = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'email': "gdoumenc@neorezo.io",
    'email_on_failure': True,
    'email_on_retry': False,
}


@biz(
    default_args=DEFAULT_ARGS,
    tags=['coworks', 'sample'],
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
    render_template_as_native_obj=True,
)
def my_first_biz(data):
    ...

Operators

TechMicroServiceOperator

This first operator is for calling a TechMicroService.

create_invoice = TechMicroServiceOperator(
    task_id='create_invoice',
    cws_name='neorezo-billing_invoice-eshop',
    method='POST',
    entry="/invoice",
    json="{{ dag_run.conf['data'] }}",
)

The arguments are :

  • cws_name : allows to call the microservice by its name thanks to the directory service,

  • method and entry : route to the service,

  • data or json :service parameters for GET and POST method respectively.

Other main arguments are needed to be understood :

  • directory_conn_id: the airflow connection id used to call the directory microservice. By default ‘coworks_directory’.

  • asynchronous: Asynchronous status. By default ‘False’.

cws_name, entry, data, json, asynchronous arguments are templated.

If you don’t want to use the directory microservice:

create_invoice = TechMicroServiceOperator(
    task_id='create_invoice',
    api_id='xxxxx',
    stage='v1',
    token='yyyy',
    method='POST',
    entry="/invoice",
    json="{{ dag_run.conf['data'] }}",
)

BranchTechMicroServiceOperator

This branching operator allows to test microservice status code or result content

check_invoice = BranchTechMicroServiceOperator(
    task_id='check_invoice',
    cws_task_id='neorezo-billing_invoice-eshop',
    on_success = "sent_to_customer"
    on_failure = "mail_error"
)

The arguments are :

  • cws_task_id : calling task id used to retrieve XCOM values,

  • on_success : branch task id on success,

  • on_failure :branch task id on failure.

Sensors

This sensor is defined to wait until an asynchronous call is finished.

await_invoice = AsyncTechMicroServiceSensor(
    task_id='await_invoice',
    cws_task_id='neorezo-billing_invoice-eshop',
)

This sensor will await the microservice billing_invoice-eshop will terminate its asynchronous execution.

The arguments are :

  • cws_task_id : the microserrvice call task awaited,

Other main arguments are needed to be understood :

  • aws_conn_id : the airflow connection id used to observe S3 result. By default ‘aws_s3’.

Asynchronous task

The sequence of a calling task, a waiting task and a reading result task for an asynchronous call is done by:

invoice = TechMicroServiceAsyncGroup(
    'invoice',
    cws_name='neorezo-billing_invoice-eshop',
    method='POST',
    entry="/invoice",
    json="{{ dag_run.conf['data'] }}",
)

The result is then accessible in invoice.output in python code, or thru the invoice.read task id:

invoice >> send_mail(invoice.output)

or:

ti.xcom_pull(task_ids='invoice.read')