
how_to_write_a_flow

Justin Joyce edited this page Jul 15, 2021 · 5 revisions

How to: Write a Flow

What is a Flow?

A flow is a set of instructions which describe how to process data. Flows describe a pipeline of steps that will be called, in turn, to perform activities on data.

Flows are Directed Acyclic Graphs (DAGs).

What is an Operator?

Operators are steps in flows which describe an action to perform on data.

mabel comes with a number of Operators, as well as a set of Operators for writing data to different datastore platforms.

How do I Write an Operator?

There are two ways to write an Operator: using the @operator decorator, or writing a class which inherits from the BaseOperator class.

The @operator decorator has a number of constraints that limit how it can be used: the decorated function should accept one value and return one value.

@operator
def hash_value(value):
    return hash(value)
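As a rough illustration of the one-value-in, one-value-out constraint, the sketch below uses a hypothetical stand-in decorator (operator_sketch, which is not part of mabel and is not how mabel implements @operator). It assumes the decorated function only ever sees the data, while any context is carried alongside untouched.

```python
import functools


def operator_sketch(func):
    """Hypothetical stand-in for @operator, for illustration only."""

    @functools.wraps(func)
    def execute(data, context=None):
        # The wrapped function receives just the data; the context passes through as-is.
        context = {} if context is None else context
        return func(data), context

    return execute


@operator_sketch
def to_upper(value):
    return value.upper()


result, ctx = to_upper("mabel", {"run_id": 1})
```

The decorated function stays simple because it never has to know that a context exists.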

Using BaseOperator gives more options for customization. As well as the payload processing method, execute (the equivalent of the function wrapped by the @operator decorator), the __init__, finalize and read_sensors methods can be overridden. The execute method must be overridden and, unlike a function decorated with @operator, it accepts two values: the data to be processed and a dictionary containing contextual data. The return should be one of:

  • a tuple of (data, context)
  • a list (or generator) of tuples of (data, context)
  • None - which indicates flow termination
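The three return shapes listed above can be sketched as follows. BaseOperatorSketch is a hypothetical stand-in so the example runs without mabel installed; the three subclasses and their names are illustrative assumptions, not Operators that ship with mabel.

```python
class BaseOperatorSketch:
    """Hypothetical stand-in for BaseOperator (not the real class)."""

    def execute(self, data, context):
        raise NotImplementedError


class DoubleOperator(BaseOperatorSketch):
    def execute(self, data, context):
        # Shape 1: a single (data, context) tuple.
        return data * 2, context


class SplitOperator(BaseOperatorSketch):
    def execute(self, data, context):
        # Shape 2: a generator of (data, context) tuples, one per word.
        for word in data.split():
            yield word, context


class DropEmptyOperator(BaseOperatorSketch):
    def execute(self, data, context):
        # Shape 3: None, signalling that the flow stops for this record.
        if not data:
            return None
        return data, context
```

Returning several tuples lets one incoming record fan out into many, while returning None lets an Operator act as a filter.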

How do I Build a Flow?

A flow is built by linking Operators; the order of the Operators describes the journey data will take through the pipeline.

There are two ways to build flows; the easiest is to use the shorthand >> instruction to link Operators together.

OperatorOne() >> OperatorTwo() >> EndOperator()
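To show how a >> shorthand like this can work in Python, here is a minimal sketch built on the __rshift__ special method. The Step class is a hypothetical illustration that only records links between named steps; it is not mabel's implementation.

```python
class Step:
    """Hypothetical minimal step that records links; not mabel's Operator class."""

    def __init__(self, name):
        self.name = name    # the first step in this (partial) flow
        self.edges = []     # (source, target) name pairs collected so far
        self.tail = name    # the most recently linked step

    def __rshift__(self, other):
        # a >> b: keep the links from both sides and add one from a's tail to b.
        flow = Step(self.name)
        flow.edges = self.edges + other.edges + [(self.tail, other.name)]
        flow.tail = other.tail
        return flow


flow = Step("OperatorOne") >> Step("OperatorTwo") >> Step("EndOperator")
```

Because >> is evaluated left to right, each link is added in the order it is written, which is why the order of the Operators describes the path the data takes.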

There are some rules that flows must follow:

  • All flow paths must end with an EndOperator
  • Flows must have more than one step
  • Flows must be 'acyclic', that is, a flow cannot have any loops
  • All steps must be connected into a single flow

Using the >> instruction to build your flow will ensure the last three rules are met; however, you must ensure an EndOperator is added to the end of every path in the flow.

The basis of these rules is that Flows are DAGs, and therefore must be Directed (entry and exit points), Acyclic (no loops), and a Graph (all steps connected).
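The four rules can be checked mechanically. The sketch below is one way to do so, assuming a flow is represented as a list of (source, target) name pairs plus a set of names that are EndOperators; validate_flow is a hypothetical helper, not the validation mabel performs.

```python
def validate_flow(edges, end_nodes):
    """Check (source, target) links against the four rules; return any problems found."""
    nodes = {name for edge in edges for name in edge}
    downstream = {}
    for source, target in edges:
        downstream.setdefault(source, []).append(target)
    problems = []

    # Rule: flows must have more than one step.
    if len(nodes) < 2:
        problems.append("flow needs more than one step")

    # Rule: every path must finish at an EndOperator (a step with no onward link).
    for leaf in sorted(nodes - set(downstream)):
        if leaf not in end_nodes:
            problems.append(f"path ends at {leaf}, which is not an EndOperator")

    # Rule: acyclic. Repeatedly free steps with no unprocessed inbound links
    # (Kahn's algorithm); if any step is never freed, the flow has a loop.
    inbound = {name: 0 for name in nodes}
    for _, target in edges:
        inbound[target] += 1
    ready = [name for name in nodes if inbound[name] == 0]
    processed = 0
    while ready:
        step = ready.pop()
        processed += 1
        for target in downstream.get(step, []):
            inbound[target] -= 1
            if inbound[target] == 0:
                ready.append(target)
    if processed != len(nodes):
        problems.append("flow contains a loop")

    # Rule: all steps connected. Ignore link direction and walk from any step.
    neighbours = {name: set() for name in nodes}
    for source, target in edges:
        neighbours[source].add(target)
        neighbours[target].add(source)
    seen = set()
    pending = [next(iter(nodes))] if nodes else []
    while pending:
        step = pending.pop()
        if step not in seen:
            seen.add(step)
            pending.extend(neighbours[step] - seen)
    if seen != nodes:
        problems.append("not all steps are connected")

    return problems


good = validate_flow(
    [("OperatorOne", "OperatorTwo"), ("OperatorTwo", "EndOperator")],
    {"EndOperator"},
)
looped = validate_flow([("A", "B"), ("B", "A")], {"EndOperator"})
```

Here good is an empty list, while looped reports the loop between A and B.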

How do I Run a Flow?

Flows are run using a flow_runner, which is created using the with statement:

flow = StepOne() >> EndOperator()
with flow as runner:
    runner(data, context)

This creates a flow_runner (called runner) to execute the flow; calling the runner with data and a context runs the flow for that data.
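The with pattern relies on Python's context manager protocol. The sketch below shows the general idea with a hypothetical LinearFlow class whose __enter__ hands back a callable runner; it assumes a simple straight-line flow of plain functions and is not mabel's flow_runner.

```python
class LinearFlow:
    """Hypothetical stand-in for a flow: an ordered list of (data, context) callables."""

    def __init__(self, steps):
        self.steps = steps

    def __enter__(self):
        # Hand back a callable that pushes one (data, context) pair through every step.
        def runner(data, context=None):
            context = {} if context is None else context
            for step in self.steps:
                outcome = step(data, context)
                if outcome is None:  # None signals that the flow stops here
                    return None
                data, context = outcome
            return data, context

        return runner

    def __exit__(self, exc_type, exc_value, traceback):
        return False  # do not swallow exceptions raised inside the with block


def strip_text(data, context):
    return data.strip(), context


def upper_text(data, context):
    return data.upper(), context


flow = LinearFlow([strip_text, upper_text])
with flow as runner:
    result = runner("  mabel ", {"run": 1})
```

Creating the runner inside a with block gives the flow a defined point at which to set up before the first record and tidy up after the last.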

Advanced Topics

Flow Branching

Flows can branch so that at a point in the flow the data will follow multiple paths. The >> instruction can direct data into a list of operators like this:

OperatorOne() >> [BranchOne() >> EndOperator(), BranchTwo() >> EndOperator()]

This creates a flow that looks like this:

OperatorOne()
  |--- BranchOne()
  |     \--- EndOperator()
  \--- BranchTwo()
        \--- EndOperator()

In this example, OperatorOne is called and its output is passed to BranchOne, which then passes its result to the first EndOperator. The same data that was passed to BranchOne is also passed to BranchTwo, which then passes its result to the second EndOperator (all paths must conclude with an EndOperator).
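The way data fans out across branches can be sketched with a small recursive walk. The run_branches function, the dictionary-based graph and the lambda operators are all illustrative assumptions (the two EndOperators are named EndOne and EndTwo here only so they can be told apart); this is not how mabel executes a flow.

```python
def run_branches(node, data, graph, operators, trail=()):
    """Pass data through node, then hand the result to every downstream node."""
    result = operators[node](data)
    trail = trail + (node,)
    downstream = graph.get(node, [])
    if not downstream:
        # End of a path: report the route taken and the final value.
        return [(trail, result)]
    outputs = []
    for next_node in downstream:
        # Every branch receives the same result from this node.
        outputs.extend(run_branches(next_node, result, graph, operators, trail))
    return outputs


graph = {
    "OperatorOne": ["BranchOne", "BranchTwo"],
    "BranchOne": ["EndOne"],
    "BranchTwo": ["EndTwo"],
}
operators = {
    "OperatorOne": lambda value: value + 1,
    "BranchOne": lambda value: value * 10,
    "BranchTwo": lambda value: value * 100,
    "EndOne": lambda value: value,
    "EndTwo": lambda value: value,
}
outputs = run_branches("OperatorOne", 1, graph, operators)
```

Both branches start from the value produced by OperatorOne (2), so the two paths finish with 20 and 200 respectively.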

Building Flows Manually

The >> instruction suits most situations and helps to ensure flows are built correctly but, if required, flows can be created or amended manually.

Three functions exist in mabel.flows.Flow to create or update flows - there are no functions available to edit or remove existing Operators or Links.

  • add_operator(name, operator) adds the given operator to the flow, registered under the given name.
  • link_operators(source_operator, target_operator) creates a link between two operators.
  • merge(assimilatee) adds the operators and links from one flow to another.

Manually creating or updating flows may result in invalid flows; some checks are made when the flow_runner is created to ensure the flow is valid.
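The sketch below mirrors the three function names listed above on a hypothetical FlowSketch class, to show how manual building fits together. It assumes operators are linked by name and stores everything in plain dictionaries and sets; the real mabel.flows.Flow may behave differently.

```python
class FlowSketch:
    """Hypothetical stand-in mirroring the three functions described above."""

    def __init__(self):
        self.operators = {}  # name -> operator
        self.links = set()   # (source name, target name) pairs

    def add_operator(self, name, operator):
        self.operators[name] = operator

    def link_operators(self, source_operator, target_operator):
        # Assumption: operators are referred to by the names given to add_operator.
        self.links.add((source_operator, target_operator))

    def merge(self, assimilatee):
        # Copy the other flow's operators and links into this one.
        self.operators.update(assimilatee.operators)
        self.links |= assimilatee.links


flow = FlowSketch()
flow.add_operator("OperatorOne", "placeholder operator")
flow.add_operator("EndOperator", "placeholder end")
flow.link_operators("OperatorOne", "EndOperator")

extra = FlowSketch()
extra.add_operator("OperatorTwo", "placeholder operator")
extra.add_operator("EndTwo", "placeholder end")
extra.link_operators("OperatorTwo", "EndTwo")

flow.merge(extra)
# After the merge the two parts are still disconnected, which breaks the
# "single flow" rule, so one more link is needed to join them.
flow.link_operators("OperatorOne", "OperatorTwo")
```

The extra link at the end is an example of the care manual building needs: merging alone copies operators and links but does not connect the two flows.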
