Ensure you have completed all the steps in the following documents before continuing:
You should still be in the Dataflow Editor interface from the previous chapter.
Data > downstream > ParStream-Import
ParStream-Import and select Add Database
historian
localhost
9042
HIGH
true
Invoke

Data > downstream > dataflow
We are creating these next dataflows on the remote broker, which is where the ParStream database resides and where the ParStream connection is configured above. This makes sense given the main purpose of the remote broker is to aggregate the data from local brokers.
dataflow and select Create Dataflow
save-all-control-data and click Invoke
... > dataflow > save-all-control-data
save-all-control-data dataflow in the right-hand editor workspace by clicking on its dataflow icon
Blocks section, drag Table Operations > JSON Parser to the editor workspace Dataflow section
jsonParser to parsed-control-data
... > downstream > DQL
DQL and drag Query to the editor workspace Dataflow section

query to get-all-control-data
get-all-control-data and edit the following values in the right-hand Properties section:
30
list brokers | sublist /data/control/? | subscribe value.timestamp as timestamp, value
This is Distributed Query Language (DQL), which provides a powerful, flexible way to extract published data throughout all the brokers within an EFM deployment. This particular DQL query gets any
controldata published by any broker the remote broker knows about. In this lab environment, the only other broker is the local broker you configured previously.
Invoke button associated with the invoke field
Table button associated with the output field
value column to the parsed-control-data input field

If you don’t see any data in the table, actuate the Sense HAT joystick to have the
publish-local-controldataflow on the local broker send data.
... > downstream > ParStream-Import > historian > Tables > control_data
If you don’t see the
control_datatable listed, right-click onhistorianand selectIntrospectand then check again.
control_data and drag Insert Row to the editor workspace Dataflow section

insertRow to save-control-data
save-control-data and edit the following values in the right-hand Properties section:
true
get-all-control-data and click the Table button associated with the output field in the Properties section
timestamp column to the save-crash-data ts field

parsed-control-data and click the Table button associated with the output field in the Properties section
save-control-data
id value to the save-control-data station field
direction value to the save-control-data direction field
Since we are filtering on our local broker to only publish
released(i.e. completely pressed) joystick control actions we don’t need to store this column in thecontrol_dataParStream table.

You will fully confirm whether this dataflow is working correctly when you configure a dashboard in the next chapter. For now you should see the
save-control-datafields update when you actuate the Sense HAT and see a value ofINSERTEDfor theStatusfield in itsPropertiessection.
This dataflow saves all control data into the ParStream database to enable a use case where all data is considered operationally important. The next dataflow will show how to perform a moving average for cases where data trends are more important than the individual data points.
save-all-control-data dataflow by clicking the x icon next to its name above the editor workspace
Data > downstream > dataflow
dataflow and select Create Dataflow
save-sampled-sensor-data and click Invoke
... > dataflow > save-sampled-sensor-data
save-sampled-sensor-data dataflow in the right-hand editor workspace by clicking on its dataflow icon
Blocks section, drag Table Operations > JSON Parser to the editor workspace Dataflow section
jsonParser to parsed-sensor-data
... > downstream > DQL
DQL and drag Query to the editor workspace Dataflow section
query to get-all-sensor-data
get-all-sensor-data and edit the following values in the right-hand Properties section:
30
list brokers | sublist /data/sensor/? | subscribe
This DQL query gets any
sensordata published by any broker the remote broker knows about.
Invoke button associated with the invoke field
Table button associated with the output field
value column to the parsed-sensor-data input field

You should see this table data update every 3 seconds.
Blocks section, drag Table Operations > Realtime Recorder to the editor workspace Dataflow section
A
Realtime Recorderblock builds a table of data from individual data points over time.
realtimeRecorder to buffered-sensor-data
buffered-sensor-data block (3) times to add 3 more name and value fields
name 0 field in the Properties section
Pinned box
value 0 field in the Properties section
Pinned box
Properties section:
60
id
temp_c
press_hpa
humidity
parsed-sensor-data and click the Table button associated with the output field in the Properties section
buffered-sensor-data
id value to the buffered-sensor-data value 0 field
temp_c value to the buffered-sensor-data value 1 field
press_hpa value to the buffered-sensor-data value 2 field
humidity value to the buffered-sensor-data value 3 field

If you select the
buffered-sensor-datablock and click theTablebutton associated with theoutputfield in itsPropertiessection you will see that it is generating a table with 60 seconds of rolling sensor data. This table is the data source for the 60-second moving average that will ultimately end up saved into ParStream. This is an example of how EFM can intelligently sample data to drive insight and efficiency or otherwise enforce a customer’s desired operational policies.
Blocks section, drag Tables Operations > Aggregation to the editor workspace Dataflow section
An
Aggregationblock aggregates columns of data into individual data points based on the specified operation (e.g. average, median, maximum value, last value, etc.)
tableAggregation to last-ts
last-ts and edit the following values in the right-hand Properties section:
timestamp
Last
output field to the last-ts input field
Blocks, drag Date Time Operations > Date Math to the editor workspace Dataflow section
A
Date Mathblock allows for adding and subtracting time periods from dates, however in this case it is used to normalize the timestamp into UTC, which automatically happens just by passing the timestamp “through” the block.
dateMath to ts-to-utc
ts-to-utc and unpin the following fields by right-clicking on the blue dot and unchecking the Pinned box:
This will make the block much smaller and easier to position in the editor workspace by only exposing the needed
inputandoutputfields.
output field to the ts-to-utc input field

Blocks section, drag Table Operations > Aggregation to the editor workspace Dataflow section
tableAggregation1 to avg-temp_c
avg-temp_c and edit the following values in the right-hand Properties section:
temp_c
Average
output field to the avg-temp_c input field
Blocks section, drag Table Operations > Aggregation to the editor workspace Dataflow section
tableAggregation2 to avg-press_hpa
avg-press_hpa and edit the following values in the right-hand Properties section:
press_hpa
Average
output field to the avg-press_hpa input field
Blocks section, drag Table Operations > Aggregation to the editor workspace Dataflow section
tableAggregation3 to avg-humidity
avg-humidity and edit the following values in the right-hand Properties section:
humidity
Average
output field to the avg-humidity input field

... > downstream > ParStream-Import > historian > Tables > sensor_data
sensor_data and drag Insert Row to the editor workspace Dataflow section
insertRow to save-sensor-data
save-sensor-data and edit the following values in the right-hand Properties section:
60
output field to the save-sensor-data ts field
value 0 field to the save-sensor-data station field
output field to the save-sensor-data temp_c field
output field the save-sensor-data press_hpa field
output field to the save-sensor-data humidity field

As before, you will fully confirm whether this dataflow is working correctly when you configure the dashboard in the next chapter. For now you should see the averaged
save-sensor-datafields update every 3 seconds (the ParStream INSERT itself only happens every 60 seconds per theintervalfield) and see a value ofINSERTEDfor theStatusfield in itsPropertiessection.
save-sampled-sensor-data dataflow by clicking the x icon next to its name above the editor workspace