IoT Stream Analytics practices

Stream Analytics is an essential part of a typical IoT solution. When a huge number of devices produce telemetry data, some component has to analyze it. In a typical IoT solution, the reference architecture almost always includes ingesting the data into some kind of temporary storage. At the moment of writing this article, Azure Stream Analytics supports three types of storage which can be used for stream processing: Blob Storage, Event Hub and IoT Hub. This is shown in the picture below.
[Figure: Devices sending telemetry data (events) to any of the three named storage services]

Stream Analytics provides an option to register a so-called data input source.

[Figure: Registering a data input source]

Once the stream is defined, we can start analyzing it. As shown in the first picture, Azure Stream Analytics (ASA from here on) consists of three components. The first one (on the left) is the input data stream described in the previous picture. You must define a single data source which will be used for analysis. Next, we have to define an output. This is the port to which the result data of the analysis will be streamed. Currently there are more possible outputs than inputs, as shown in the next picture. Notice that almost all input and output sources are well-known technologies.

[Figure: Available output types in Azure Stream Analytics]

If we use SQL, for example, we can then do anything with that data that is possible in the context of SQL Server. Notice that Event Hub and Blob Storage can be used as outputs too. This is because the output of one ASA job can be the input of the next ASA job. By using this technique, we can build a chain of parallel-processing ASA jobs. Interestingly, IoT Hub is not included in the list of outputs. This is reasonable, because IoT Hub is built on top of Event Hub. When processing a stream, ASA knows nothing about devices or the device registry. In other words, ASA does not need IoT Hub instead of Event Hub: sending data into Event Hub or IoT Hub makes no difference for ASA. This is why IoT Hub is not in there.
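To illustrate such chaining, the sketch below shows two ASA jobs, where the first job writes pre-aggregated data to an Event Hub output that serves as the input of a second job. The stream names (rawinput, intermediatehub, sqloutput) are hypothetical placeholders for inputs and outputs configured in the portal.

```sql
-- Job 1: pre-aggregate raw telemetry and write it to an Event Hub output
SELECT DeviceId, Avg(Temperature) AS AvgTemperature
INTO intermediatehub
FROM rawinput
GROUP BY TumblingWindow(second, 5), DeviceId

-- Job 2 (a separate ASA job): its input is the Event Hub filled by Job 1
SELECT *
INTO sqloutput
FROM intermediatehub
WHERE AvgTemperature > 90
```

In Job 2 the input named intermediatehub points to the same Event Hub that Job 1 writes to, which is what forms the chain.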

The last component of an ASA job is the query. Simplified, an ASA job is defined by an input stream, a query and an output stream, where the data flow is strictly defined:

Input Stream –> Query –> Output Stream

The actual processing magic is implemented in the query. The query language reference documentation describes the syntax and common query patterns.

Copy Stream

Let’s start with a first example. The following query does not perform any analytics. It simply copies data from the input stream to the output stream. You may ask yourself why this is needed at all if it does not do anything. Such copy statements are helpful for routing data to the appropriate output. For example, Microsoft provides APIs for almost all hardware platforms to ingest data into Event Hub. The statement below would copy that data into, for example, a SQL database. That means you can use ASA simply to stream data to the right destination.

SELECT *
INTO myoutputstream
FROM myinputstream;

Analyzing a stream with a Tumbling Window

Next, let’s do some analytics. The following statement calculates the average temperature per device in a tumbling window of 5 seconds. That means we are not calculating the average temperature on some historical data, as a typical BI solution would do; the average is calculated continuously on the live stream.

SELECT
    DateAdd(second,-5,System.TimeStamp) as WinStartTime,
    system.TimeStamp as WinEndTime,
    DeviceId,
    Avg(Temperature) as AvgTemperature,
    Count(*) as EventCount
INTO myoutputstream
FROM myinputstream
GROUP BY TumblingWindow(second, 5), DeviceId;

If we pass the result of the calculation to a Power BI output, we can build a classical Power BI solution on top of the streaming data result, as the following picture shows.

[Figures: Power BI reports built on the streaming output]

In this example we used a window of 5 seconds, but we could use any other meaningful value. Please note that, when working with streams, we do not necessarily need to store all data. However, be careful when making this decision. In most cases it is required to keep the original data for later processing, in case you find new use cases which can leverage machine learning. Processing streams has the advantage of processing hot data, without any relation to historical data. Moreover, ASA processes the query as parallel jobs, which is not possible with a common BI approach.
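Besides tumbling windows, ASA also offers a true sliding window, which recalculates the aggregate whenever an event enters or leaves the window rather than at fixed intervals. A minimal sketch, assuming the same hypothetical input and output stream names as in the example above:

```sql
-- Average temperature per device over the last 5 seconds,
-- re-evaluated whenever an event enters or leaves the window
SELECT DeviceId, Avg(Temperature) AS AvgTemperature
INTO myoutputstream
FROM myinputstream
GROUP BY SlidingWindow(second, 5), DeviceId
```

A tumbling window produces one result per device every 5 seconds; the sliding variant produces a result at every change, which is useful for reacting to spikes as soon as they happen.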

Copy and Analyse

A good technique is to copy all data before processing, for later use, and then to process the data on the hot stream. The following example is a combination of the previous two examples.

SELECT *
INTO rawdata
FROM myinputstream;

SELECT
    DateAdd(second,-5,System.TimeStamp) as WinStartTime,
    system.TimeStamp as WinEndTime,
    DeviceId,
    Avg(Temperature) as AvgTemperature,
    Count(*) as EventCount
INTO averagetempoutput
FROM myinputstream
GROUP BY TumblingWindow(second, 5), DeviceId;

As you can see in the example above, queries can be processed sequentially, one after another. Note that parallel processing is still in place.

Cold and Hot Path analysis

ASA provides very high flexibility when it comes to changing use cases. Originally we started by copying data (the first statement in the following example). This is usually called the cold path: a copy of the raw data, typically used for big data processing or machine learning. Then we figured out that we can start analyzing critical events only (2). Some time later we decided to focus on a specific range of devices (3) which might produce critical events.

And finally (4) we define critical devices (the hot path): devices for which more than 3 alerts are noticed within 120 seconds.

WITH CriticalEvents AS
(
    SELECT *
    FROM inputalldevices
    WHERE Temperature > 98
)

-- (1) Copy all data
SELECT * INTO rawdata FROM inputalldevices

-- (2) Copy only critical events
SELECT * INTO allCritical FROM CriticalEvents

-- (3) Copy only critical events for devices in the id range 200-500
SELECT * INTO criticalRetail200500 FROM CriticalEvents
WHERE DeviceId < 500 AND DeviceId > 200

-- (4) Look for 3 or more critical events in a 120-second tumbling window
SELECT
    DateAdd(second, -120, System.TimeStamp) AS WinStartTime,
    System.TimeStamp AS WinEndTime,
    DeviceId,
    MAX(Temperature) AS MaxTemperature,
    Count(*) AS EventCount
INTO hotevents
FROM CriticalEvents
GROUP BY TumblingWindow(second, 120), DeviceId
HAVING Count(*) >= 3


Please note that whatever our statement looks like, and whenever we need to change it, it is simply a change of the query statement. Right now, changing a statement requires stopping the event stream processing. If you do not want to stop processing, you have to create a new ASA job whose input connects to a different consumer group of the Event Hub, assuming that Event Hub is used for event processing, which is the usual case.


Posted Dec 08 2015, 08:37 AM by Damir Dobric
developers.de is a .Net Community Blog powered by daenet GmbH.