A mechanism to orchestrate the life cycle of the Semantic Data Aggregator (SDA) from an external application or system. The NGSI-LD API acts as the interface that translates orders from an external orchestrator component into requests to the SDA, and the NGSI-LD data model is extended for that purpose.
This approach orchestrates the state transitions of the NGSI-LD entities that represent the different stages in the data pipelines and model the actions of the Data Aggregator agents.
To manage the life cycle of the data aggregator, we define the Agent NGSI-LD entity. It is the parent entity for the agent entities at every life cycle stage of the data pipeline: the MetricSource, TelemetrySource, MetricTarget, MetricProcessor and StreamApplication entities.
Users or applications can declaratively express the desired state for each agent of the Data Aggregator (collector, aggregator and dispatcher).
The Agent parent entity has the following properties:
- action: property value set by users to change the agent state.
- state: property value updated by the Weaver during the action triggered by users, to indicate the agent state.
- stateInfo: optional cross-domain property that describes errors when the processed action fails, or gives feedback information about the agent state.
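As an illustration of the three properties, the following minimal sketch builds an Agent-type entity as a plain NGSI-LD payload. The helper names, the example identifier and the assumption that stateInfo carries a plain string are hypothetical and not taken from the SDA code base; only the property names come from the list above.

```python
import json


def build_agent_entity(entity_id, entity_type, action):
    """Return a minimal NGSI-LD entity carrying the user-set action property."""
    return {
        "id": entity_id,
        "type": entity_type,
        # Set by the user to request a state change.
        "action": {"type": "Property", "value": action},
    }


def with_state(entity, state, state_info=None):
    """Return a copy of the entity with state (and optionally stateInfo) added.

    In the SDA these two properties are written by the Weaver, not by the user.
    """
    updated = dict(entity)
    updated["state"] = {"type": "Property", "value": state}
    if state_info is not None:
        # Assumption: stateInfo is modelled as a Property with a string value.
        updated["stateInfo"] = {"type": "Property", "value": state_info}
    return updated


# Hypothetical identifier, used only for this example.
entity = build_agent_entity("urn:ngsi-ld:MetricSource:example", "MetricSource", "START")
print(json.dumps(with_state(entity, "BUILDING"), indent=2))
```

Keeping the user-written part (action) separate from the Weaver-written part (state, stateInfo) mirrors the division of responsibilities described above.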
The information model for the Agent parent entity is depicted below.
Also, a short version of the state transition diagram for these Agent-type entities is shown, with the possible values that the state property of Agent entities can take. A transition between states is triggered by the operation that the value of the action property determines (the transition action).
Furthermore, besides the desired states for these agent entities (shown in the previous diagram), there are intermediate transition states that determine whether or not the state of the entity can change. These intermediate transition states are BUILDING and FAILED. The following is part of the state transition diagram, showing as an example the state changes made when an Agent entity starts and ends.
When the processing activities that trigger a new state change raise no error (for example, from the RUNNING state to the CLEANED state), the BUILDING state serves as the intermediate transition state. If an error occurs while carrying out the change, the entity goes from the BUILDING state to the FAILED state, with the cause of the error given in the stateInfo property, and finally returns to its original state.
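The role of the intermediate states can be sketched as a small function. This is a simplified model under stated assumptions, not the Weaver implementation: the `operation` callable is a hypothetical stand-in for the real work (for example, a NiFi or Flink call), and the action-to-state table only covers the START, STOP and END actions of an already existing entity.

```python
# Desired state reached by each action, following the description of Agent entities.
TARGET_STATE = {"START": "RUNNING", "STOP": "STOPPED", "END": "CLEANED"}


def apply_action(current_state, action, operation):
    """Run `operation` and return (visited_states, final_state, state_info).

    `operation` is a hypothetical callable that raises an exception on failure.
    """
    visited = ["BUILDING"]  # always entered first
    try:
        operation()
    except Exception as error:
        # BUILDING -> FAILED, then back to the original state.
        visited += ["FAILED", current_state]
        return visited, current_state, str(error)
    final_state = TARGET_STATE[action]
    visited.append(final_state)
    return visited, final_state, None


def fail():
    """Simulated failing operation, used only for this example."""
    raise RuntimeError("processor not found")


print(apply_action("RUNNING", "END", lambda: None))
print(apply_action("RUNNING", "STOP", fail))
```

In the failing case the error message is what would end up in the stateInfo property.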
- Collection and Dispatch Agents Orchestration
- Aggregation Agents Orchestration
- Endpoint and Data Source context information entities Orchestration
- Final thoughts on SDA orchestration
Collector and Dispatcher are two of the types of agents that SDA orchestrates. It manages the life cycle of these agents, which work as NiFi processing flows, using metadata from the information models.
The following subsections use sequence diagrams to show the state transition management of the NGSI-LD entities that represent the metadata for the collection agents.
The following sequence diagram shows the steps that the SDA framework follows to instantiate a collection agent. This diagram corresponds to the START(0) transition action of the state transition diagram.
The steps followed by SDA for the collection agent instantiation are as follows:
- First of all, the user has to model and create a new NGSI-LD collection agent entity (e.g., a MetricSource or TelemetrySource entity) in Scorpio Broker to describe the collection agent metadata. In the entity the user has to define the action property (inherited from the Agent parent entity), initialized to the START value.
- The entity creation triggers a notification to the Weaver component. The Weaver then updates the entity with the state property initialized to the BUILDING value while the collection agent initialization is being processed.
- Then, the Weaver manages the configuration and instantiation of the collection agent with a NiFi processing flow.
- When NiFi successfully instantiates the processor, the Weaver updates the state property of the collection agent entity to the RUNNING value and the stateInfo cross-domain property with feedback information about the successful operation.
- Finally, the user is notified that the collection agent has been successfully instantiated.
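The first step above amounts to an NGSI-LD entity creation request. A minimal sketch follows, assuming the broker listens on `http://localhost:9090` and that the default core context is acceptable; the entity identifier is hypothetical and the real MetricSource model carries more attributes than shown here. The request is only built, not sent, so the sketch runs without a broker.

```python
import json
import urllib.request

BROKER_URL = "http://localhost:9090"  # assumed broker address


def create_entity_request(entity, broker_url=BROKER_URL):
    """Build (but do not send) the POST request that creates an NGSI-LD entity."""
    return urllib.request.Request(
        url=f"{broker_url}/ngsi-ld/v1/entities/",
        data=json.dumps(entity).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical, deliberately incomplete collection agent entity.
metric_source = {
    "id": "urn:ngsi-ld:MetricSource:example",
    "type": "MetricSource",
    "action": {"type": "Property", "value": "START"},
}

request = create_entity_request(metric_source)
print(request.get_method(), request.full_url)
# urllib.request.urlopen(request) would send it to a running broker.
```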
The previous sequence diagram shows an example of Endpoint error detection. After the collection agent entity is created, and before the NiFi processing flow for the collection agent is configured and instantiated, the Weaver must check whether there is an Endpoint entity that defines the data source endpoint from which the information is extracted. If this Endpoint entity does not exist, the collection agent configuration and the processor execution will fail. In that case, the Weaver will update the state property of the collection agent entity to the FAILED value and the stateInfo cross-domain property with feedback information about the failed operation. Finally, the user is notified that the collection agent could not be instantiated.
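The check described above could look roughly like the sketch below. It rests on an assumption that is not stated in the text: that the collection agent entity points to its Endpoint entity through a relationship, here given the hypothetical name `hasEndpoint`. The lookup table stands in for a query to the broker.

```python
def check_endpoint(agent_entity, entities_by_id):
    """Return the (state, stateInfo) pair to write back after the Endpoint check.

    `hasEndpoint` is a hypothetical relationship name; `entities_by_id` is a
    plain dict standing in for a broker lookup.
    """
    endpoint_id = agent_entity.get("hasEndpoint", {}).get("object")
    if endpoint_id is None or endpoint_id not in entities_by_id:
        return "FAILED", f"Endpoint entity {endpoint_id!r} not found"
    # The entity stays in BUILDING while instantiation continues.
    return "BUILDING", "Endpoint entity found"


agent = {
    "id": "urn:ngsi-ld:MetricSource:example",
    "type": "MetricSource",
    "action": {"type": "Property", "value": "START"},
    "hasEndpoint": {"type": "Relationship", "object": "urn:ngsi-ld:Endpoint:example"},
}
known = {"urn:ngsi-ld:Endpoint:example": {"type": "Endpoint"}}

print(check_endpoint(agent, known))
print(check_endpoint(agent, {}))
```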
The previous sequence diagram shows the steps that the framework follows to upgrade a collection agent. This diagram corresponds to the START(1) or the START(3) transition action of the state transition diagram. The steps are the following:
- First of all, the user has to update the previously created NGSI-LD collection agent entity (e.g., a MetricSource or TelemetrySource entity) in Scorpio Broker. To do this, the action property must be updated to the START value.
- The entity update triggers a notification to the Weaver component. The Weaver then updates the entity with the state property set to the BUILDING value while the collection agent upgrade is being processed.
- Then, the Weaver manages the configuration and upgrade of the collection agent with a NiFi processing flow.
- When NiFi successfully upgrades the processor, the Weaver updates the state property of the collection agent entity to the RUNNING value and the stateInfo cross-domain property with feedback information about the successful operation.
- Finally, the user is notified that the collection agent has been successfully updated.
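Updating the action property of an existing entity maps onto an NGSI-LD attribute update. The sketch below builds such a request without sending it; the broker address and the entity identifier are assumptions made for the example.

```python
import json
import urllib.parse
import urllib.request


def update_action_request(entity_id, action, broker_url="http://localhost:9090"):
    """Build (but do not send) a PATCH request that sets the action property."""
    fragment = {"action": {"type": "Property", "value": action}}
    # Keep the colons of the URN readable in the path.
    quoted_id = urllib.parse.quote(entity_id, safe=":")
    return urllib.request.Request(
        url=f"{broker_url}/ngsi-ld/v1/entities/{quoted_id}/attrs",
        data=json.dumps(fragment).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PATCH",
    )


# Hypothetical identifier of a previously created collection agent entity.
request = update_action_request("urn:ngsi-ld:MetricSource:example", "START")
print(request.get_method(), request.full_url)
```

The same request shape, with STOP or END as the value, would serve the stop and deletion flows.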
The following sequence diagram shows the steps that the SDA framework follows to deploy a collection agent without starting it (deploy stopped agent). This diagram corresponds to the STOP(0) transition action of the state transition diagram.
The steps followed by SDA for the collection agent deployment are as follows:
- First of all, the user has to model and create a new NGSI-LD collection agent entity (e.g., a MetricSource or TelemetrySource entity) in Scorpio Broker to describe the collection agent metadata. In the entity the user has to define the action property (inherited from the Agent parent entity), initialized to the STOP value.
- The entity creation triggers a notification to the Weaver component. The Weaver then updates the entity with the state property initialized to the BUILDING value while the collection agent deployment is being processed.
- Then, the Weaver manages the configuration and creation of the collection agent with a NiFi processing flow. The NiFi flow will be deployed but will not be started.
- When NiFi successfully deploys the processor, the Weaver updates the state property of the agent entity to the STOPPED value and the stateInfo cross-domain property with feedback information about the successful operation.
- Finally, the user is notified that the collection agent has been successfully deployed and stopped.
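The notification that reaches the Weaver on entity creation presupposes an NGSI-LD subscription on the agent entities. A minimal sketch of such a subscription payload is shown below; the subscription identifier and the Weaver callback URI are hypothetical, and the actual subscription used by the SDA may watch other attributes as well.

```python
def build_action_subscription(entity_types, weaver_uri):
    """Return an NGSI-LD subscription that notifies on changes to `action`."""
    return {
        "id": "urn:ngsi-ld:Subscription:weaver-actions",  # hypothetical id
        "type": "Subscription",
        "entities": [{"type": entity_type} for entity_type in entity_types],
        "watchedAttributes": ["action"],
        "notification": {
            "endpoint": {"uri": weaver_uri, "accept": "application/json"},
        },
    }


subscription = build_action_subscription(
    ["MetricSource", "TelemetrySource"],
    "http://weaver.example:8080/notify",  # hypothetical callback address
)
print(subscription["entities"])
```

Such a payload would be sent with a POST to the `/ngsi-ld/v1/subscriptions/` resource of the broker.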
The previous sequence diagram shows the steps that the framework follows to stop a collection agent. This diagram corresponds to the STOP(2) transition action of the state transition diagram. The steps are the following:
- First of all, the user has to update the previously created NGSI-LD collection agent entity (e.g., a MetricSource or TelemetrySource entity) in Scorpio Broker. To do this, the action property must be updated to the STOP value.
- The entity update triggers a notification to the Weaver component. The Weaver then updates the entity with the state property set to the BUILDING value while the collection agent stop is being processed.
- Then, the Weaver manages the stop of the collection agent with a NiFi processing flow.
- When NiFi successfully stops the processor, the Weaver updates the state property of the agent entity to the STOPPED value and the stateInfo cross-domain property with feedback information about the successful operation.
- Finally, the user is notified that the collection agent has been successfully stopped.
The following sequence diagram shows the steps that the SDA framework follows to delete a collection agent. This diagram corresponds to the END(4) or END(5) transition action of the state transition diagram.
The steps followed by SDA for the collection agent deletion are as follows:
- First of all, the user has to update the previously created NGSI-LD collection agent entity (e.g., a MetricSource or TelemetrySource entity) in Scorpio Broker. To do this, the action property must be updated to the END value.
- The entity update triggers a notification to the Weaver component. The Weaver then updates the entity with the state property set to the BUILDING value while the collection agent deletion is being processed.
- Then, the Weaver manages the deletion of the collection agent with a NiFi processing flow.
- When NiFi successfully deletes the processor, the Weaver updates the state property of the agent entity to the CLEANED value and the stateInfo cross-domain property with feedback information about the successful operation.
- Finally, the user is notified that the collection agent has been successfully deleted.
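In all of the collection agent flows above, the Weaver starts from a notification and reads the requested action from it. The following sketch extracts the (entity, action) pairs from an NGSI-LD notification payload; the sample notification, including its identifiers and timestamp, is made up for the example and the real Weaver logic is not shown in this document.

```python
KNOWN_ACTIONS = ("START", "STOP", "END")


def requested_actions(notification):
    """Return (entity_id, action) pairs found in an NGSI-LD notification."""
    pairs = []
    for entity in notification.get("data", []):
        action = entity.get("action", {}).get("value")
        if action in KNOWN_ACTIONS:
            pairs.append((entity["id"], action))
    return pairs


# Hypothetical notification body, shaped like an NGSI-LD Notification.
notification = {
    "id": "urn:ngsi-ld:Notification:example",
    "type": "Notification",
    "subscriptionId": "urn:ngsi-ld:Subscription:weaver-actions",
    "notifiedAt": "2024-01-01T00:00:00Z",
    "data": [
        {
            "id": "urn:ngsi-ld:MetricSource:example",
            "type": "MetricSource",
            "action": {"type": "Property", "value": "STOP"},
        },
        {
            "id": "urn:ngsi-ld:TelemetrySource:example",
            "type": "TelemetrySource",
            "action": {"type": "Property", "value": "END"},
        },
    ],
}

for entity_id, action in requested_actions(notification):
    print(entity_id, action)
```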
Aggregator is the other type of agent that SDA orchestrates. It manages the life cycle of these agents, which work as stream processing applications running on the Flink engine, using metadata from the information models.
The aggregation agents are in charge of managing both the upload of stream processing applications (defined as JAR files) and the execution of their instances (defined as Job instances) within the Flink engine.
For more information on how SDA internally manages the uploading and execution of stream processing applications, see Stream Processing Applications Management.
The following subsections use sequence diagrams to show the state transition management of the NGSI-LD entities (i.e., StreamApplication and MetricProcessor entities) that represent the metadata for the aggregation agents.
To orchestrate the upload of stream application JARs to the Flink engine, and to let SDA manage the life cycle of the NGSI-LD StreamApplication entities that contain the context information necessary for this operation, a new UPLOADED value is defined for the state property of the Agent parent entity.
SDA is responsible for orchestrating the creation and deletion of these StreamApplication entities and for determining the availability of the associated JAR files, as well as for uploading them to the stream processing engine (Flink).
The state transition diagram for these StreamApplication entities is depicted next.
The UPLOADED state indicates that the JAR file of the stream processing application was available and has been uploaded to the Flink engine. In case of failure (for example, the JAR file was not found), the state property of the StreamApplication entity will be changed to FAILED and the cause of the failure will be reported in the stateInfo cross-domain property.
The following sequence diagram shows the steps that the SDA framework follows to upload a stream processing application JAR to the aggregation agent (Flink engine).
The steps followed by SDA for the stream application JAR upload are as follows:
- First of all, the user has to model and create a new NGSI-LD StreamApplication entity in Scorpio Broker to describe the stream processing application metadata. In the entity the user has to define the action property (inherited from the Agent parent entity), initialized to the START value.
- The entity creation triggers a notification to the Weaver component. The Weaver then updates the entity with the state property initialized to the BUILDING value while the upload of the stream application is being processed.
- Then, the Weaver manages the upload of the application JAR to the Flink engine.
- When the application has been successfully uploaded to the Flink engine, the Weaver updates the state property of the StreamApplication entity to the UPLOADED value and the stateInfo cross-domain property with feedback information about the successful operation.
- Finally, the user is notified that the stream processing application has been successfully uploaded.
The previous sequence diagram shows the steps that the framework follows to delete a stream processing application JAR from the aggregation agent. The steps are the following:
- First of all, the user has to update the previously created NGSI-LD StreamApplication entity in Scorpio Broker. To do this, the action property must be updated to the END value.
- The entity update triggers a notification to the Weaver component. The Weaver then updates the entity with the state property set to the BUILDING value while the stream application deletion is being processed.
- Then, the Weaver manages the deletion of the stream application JAR from the Flink engine.
- When the application has been successfully deleted from the Flink engine, the Weaver updates the state property of the agent entity to the CLEANED value and the stateInfo cross-domain property with feedback information about the successful operation.
- Finally, the user is notified that the stream processing application has been successfully deleted.
The previous sequence diagram shows the steps that the SDA framework follows to submit a Job instance to the aggregation agent. This diagram corresponds to the START(0) transition action of the state transition diagram.
The steps followed by SDA for submitting the Job instance are as follows:
- First of all, the user has to model and create a new NGSI-LD MetricProcessor entity in Scorpio Broker to describe the stream processing Job metadata. In the entity the user has to define the action property (inherited from the Agent parent entity), initialized to the START value.
- The entity creation triggers a notification to the Weaver component. The Weaver then updates the entity with the state property initialized to the BUILDING value while the submission of the Job instance is being processed.
- Then, the Weaver manages the submission of the Job instance to the Flink engine.
- When the Job has been successfully submitted to the Flink engine, the Weaver updates the state property of the agent entity to the RUNNING value and the stateInfo cross-domain property with feedback information about the successful operation.
- Finally, the user is notified that the Job instance has been successfully submitted and is running.
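How the Weaver talks to Flink is not detailed here. Assuming it goes through the Flink REST API, the submission step could be sketched as a request to the `/jars/:jarid/run` resource of a previously uploaded JAR. The Flink address, the JAR identifier, the entry class and the arguments below are all hypothetical, and the request is only built, not sent.

```python
import json
import urllib.request


def run_jar_request(jar_id, entry_class, program_args,
                    flink_url="http://localhost:8081"):
    """Build (but do not send) a Flink REST request that runs an uploaded JAR."""
    body = {"entryClass": entry_class, "programArgs": program_args}
    return urllib.request.Request(
        url=f"{flink_url}/jars/{jar_id}/run",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical values, used only for this example.
request = run_jar_request(
    jar_id="example_stream-app.jar",
    entry_class="org.example.StreamJob",
    program_args="input-topic output-topic",
)
print(request.get_method(), request.full_url)
```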
The previous sequence diagram shows an example of StreamApplication error detection. After the MetricProcessor entity is created, and before the Job instance is configured and submitted to the aggregation agent (Flink engine), the Weaver must check whether there is a StreamApplication entity that identifies the JAR of the stream application to run on the Flink engine. If this StreamApplication entity does not exist, the aggregation agent configuration and the Job execution will fail. In that case, the Weaver will update the state property of the agent entity to the FAILED value and the stateInfo cross-domain property with feedback information about the failed operation. Finally, the user is notified that the Job instance could not be submitted and executed in the Flink engine.
The previous sequence diagram shows the steps that the framework follows to upgrade a Job instance in the aggregation agent. This diagram corresponds to the START(1) or START(3) transition action of the state transition diagram. The steps are the following:
- First of all, the user has to update the previously created NGSI-LD MetricProcessor entity in Scorpio Broker. To do this, the action property must be updated to the START value.
- The entity update triggers a notification to the Weaver component. The Weaver then updates the entity with the state property set to the BUILDING value while the Job instance upgrade is being processed.
- Then, the Weaver manages the upgrade of the Job instance in the Flink engine.
- When the Job has been successfully updated in the Flink engine, the Weaver updates the state property of the agent entity to the RUNNING value and the stateInfo cross-domain property with feedback information about the successful operation.
- Finally, the user is notified that the Job instance has been successfully updated and is running.
The previous sequence diagram shows the steps that the SDA framework follows to define a Job for the aggregation agent without submitting it. This diagram corresponds to the STOP(0) transition action of the state transition diagram.
The steps followed by SDA for the Job instance definition are as follows:
- First of all, the user has to model and create a new NGSI-LD MetricProcessor entity in Scorpio Broker to describe the stream processing Job metadata. In the entity the user has to define the action property (inherited from the Agent parent entity), initialized to the STOP value.
- The entity creation triggers a notification to the Weaver component. The Weaver then updates the entity with the state property initialized to the BUILDING value while the definition of the Job instance is being processed.
- Then, the Weaver manages the Job instance definition. That is, the Job instance will be defined but will not be submitted to the Flink engine.
- When the Job has been successfully defined, the Weaver updates the state property of the agent entity to the STOPPED value and the stateInfo cross-domain property with feedback information about the successful operation.
- Finally, the user is notified that the Job instance has been successfully defined and is stopped.
The previous sequence diagram shows the steps that the framework follows to stop a Job instance in the aggregation agent. This diagram corresponds to the STOP(2) transition action of the state transition diagram. The steps are the following:
- First of all, the user has to update the previously created NGSI-LD MetricProcessor entity in Scorpio Broker. To do this, the action property must be updated to the STOP value.
- The entity update triggers a notification to the Weaver component. The Weaver then updates the entity with the state property set to the BUILDING value while the Job instance stop is being processed.
- Then, the Weaver manages the stop of the Job instance in the Flink engine.
- When the Job has been successfully stopped in the Flink engine, the Weaver updates the state property of the agent entity to the STOPPED value and the stateInfo cross-domain property with feedback information about the successful operation.
- Finally, the user is notified that the Job instance has been successfully cancelled and stopped.
The previous sequence diagram shows the steps that the SDA framework follows to delete a Job instance from the aggregation agent. This diagram corresponds to the END(4) or END(5) transition action of the state transition diagram.
The steps followed by SDA for the Job instance deletion are as follows:
- First of all, the user has to update the previously created NGSI-LD MetricProcessor entity in Scorpio Broker. To do this, the action property must be updated to the END value.
- The entity update triggers a notification to the Weaver component. The Weaver then updates the entity with the state property set to the BUILDING value while the Job instance deletion is being processed.
- Then, the Weaver manages the deletion of the Job instance from the Flink engine.
- When the Job has been successfully deleted from the Flink engine, the Weaver updates the state property of the agent entity to the CLEANED value and the stateInfo cross-domain property with feedback information about the successful operation.
- Finally, the user is notified that the Job instance has been successfully cancelled and deleted.
Besides orchestrating, through state changes, the agents that manage the collection and delivery of data (with NiFi processing flows) and the aggregation of data (with Flink stream processing applications), SDA must manage the life cycle of the NGSI-LD entities that define the context information for the input data sources and their endpoint services.
So far there are two types of data source entities defined:
- Prometheus: Prometheus-based data source context information entities for metric collection.
- Device: Telemetry-based data source context information entities for telemetry data collection.
In addition, Endpoint type entities define the connection point with the data sources.
SDA is responsible for orchestrating the creation and deletion of these data source and endpoint entities and for determining their availability.
A short version of the state transition diagram for these data source and endpoint entities is depicted next.
Two states are defined to indicate whether the entities are enabled or not (ENABLED and DISABLED). The START action creates and enables an entity, while the END action deletes and disables it.
When data source entities are created (e.g., Prometheus or Device entities), SDA determines whether they are reachable through their associated endpoint service (the related Endpoint entity). If the data source is available and accessible from its endpoint, it will be enabled (the state property will change to ENABLED). Otherwise, the state property of the entity will be changed to FAILED and the cause of the failure will be reported in the stateInfo cross-domain property.
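The availability check can be sketched as a probe followed by a state decision. How SDA actually tests reachability is not specified here, so the TCP probe below is only one possible assumption, and the host names are hypothetical. The probe is injectable so that the decision logic can be exercised without network access.

```python
import socket


def tcp_probe(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def resolve_data_source_state(host, port, probe=tcp_probe):
    """Return the (state, stateInfo) pair for a newly created data source."""
    if probe(host, port):
        return "ENABLED", f"{host}:{port} is reachable"
    return "FAILED", f"{host}:{port} is not reachable"


# Fake probes keep the example independent of any running service.
print(resolve_data_source_state("prometheus.example", 9090, probe=lambda h, p: True))
print(resolve_data_source_state("prometheus.example", 9090, probe=lambda h, p: False))
```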
Finally, the full version of the state transition diagram is presented for each associated entity domain, together with the transition actions that the Weaver carries out to manage the state changes.
The NGSI-LD API Orchestrator Postman collection has a set of requests that can be used to model a full NGSI-LD data pipeline for data aggregation and to orchestrate the life cycle of the entities involved in it, based on the NGSI-LD API.