流分析 Stream Analytics-实时数据流式处理,可处理来自数百万台 IoT 设备的数据 - 图1

    典型的物联网架构中,有实时数据分析的需求,在Azure中,流分析(stream analytics)就是这样的服务,它可以存在云中或者部署到边缘设备上。

    流分析 Stream Analytics-实时数据流式处理,可处理来自数百万台 IoT 设备的数据 - 图2
    流分析的基本概念:

    流分析基础.mp4 (24.35MB) 实战案例:
    对物联网传感器数据进行实时分析,每30秒监视一次传感器上报的平均温度,高于100度时报警,报警的方式是将报警的传感器及其温度值直接写入到Sql DB。
    实战内容:

    流分析实战.mp4 (98.94MB) 云中的实时流式处理
    •为物联网解决方案执行实时分析
    •每秒对数百万事件进行流式处理
    •获得关键任务的可靠性和性能预测结果
    •利用设备和应用程序的数据,创建实时仪表板和警报
    •跨多个数据流进行关联
    •使用常见的基于 SQL 的语言,以实现快速开发

    比如我们要使用流分析完成如下的需求:
    每30秒监视一次传感器上报的平均温度,高于100度时报警。通常情况下,我们将流分析部署于云端,在IoT Hub之后,从IoT Hub接收设备数据进行在线分析。

    流分析 Stream Analytics-实时数据流式处理,可处理来自数百万台 IoT 设备的数据 - 图5
    在这个场景中,如果我们的传感器有成千上万,每个传感器上传的数据大部分又都是低于100°C的,这些数据实际上对我们来说,没有意义,所以我们还会期待,在设备侧直接进行分析,然后讲分析过滤后的数据,上传到IoT Hub,一来降低数据传输的成本,二来对数据进行过滤,保证数据质量,这也就是流分析的第二种部署方式:部署到边缘
    流分析 Stream Analytics-实时数据流式处理,可处理来自数百万台 IoT 设备的数据 - 图6
    下面介绍流分析的几个重要概念:
    1.输入:目前支持Azure EventHub事件中心,Azure IoT Hub, Azure Blob 存储三个源。

    输入分两种类型:
    将数据推送到数据源后,流分析作业就可使用该数据并对其进行实时处理。 输入分为两种类型:流输入(stream inputs)和引用输入(referenceinputs)。

    • 流输入是指:数据流是一段时间内不受限制的事件序列。 流分析作业必须至少包含一个数据流输入。 事件中心、IoT 中心和 Blob 存储均可作为数据流输入源。
    • 引用输入是指:引用数据是完全静态的或更改缓慢。 它通常用于执行关联和查找。 例如,可以将数据流输入中的数据联接到引用数据中的数据,就像执行SQL 联接以查找静态值一样。 当前支持将Azure Blob 存储和 Azure SQL 数据库作为参考数据的输入源。

    2.输出:
    SQL Database,Blob Storage, Event Hub, Table Storage, Service Bus Queues, Service Bus Topics,Cosmos DB等等。
    具体请参见 :
    https://docs.azure.cn/zh-cn/stream-analytics/stream-analytics-define-outputs
    3. 查询:
    查询是使用类似SQL的语法进行数据过滤和计算。支持4种窗口函数:
    翻转开窗函数用于将数据流划分成不同的时间段并对其执行某个函数,如以下示例所示。 翻转窗口的主要差异在于它们会重复,不重叠,并且一个事件不能属于多个翻转窗口。
    流分析 Stream Analytics-实时数据流式处理,可处理来自数百万台 IoT 设备的数据 - 图7

    跳跃开窗函数在一段固定的时间内向前跳跃。 人们往往将此类窗口视为可以重叠的翻转窗口,因此一个事件可以属于多个跳跃窗口结果集。 要使跳跃窗口与翻转窗口相同,需将跃点大小指定为与窗口大小相同。
    流分析 Stream Analytics-实时数据流式处理,可处理来自数百万台 IoT 设备的数据 - 图8
    不同于翻转或跳跃窗口,滑动开窗函数在事件发生时生成输出。 每个窗口至少有一个事件,并且窗口持续根据€ (epsilon) 向前移动。 与跳跃窗口一样,事件可以属于多个滑动窗口。
    流分析 Stream Analytics-实时数据流式处理,可处理来自数百万台 IoT 设备的数据 - 图9
    会话窗口函数对差不多同时到达的事件进行分组,筛选出没有数据的时间段。 它具有三个主要参数:超时、最长持续时间和分区键(可选)。
    流分析 Stream Analytics-实时数据流式处理,可处理来自数百万台 IoT 设备的数据 - 图10

    实战内容:请参照本文开头的视频。
    实战中用到的修改后的代码:

    1. import random
    2. import time
    3. import sys
    4. # Using the Python Device SDK for IoT Hub:
    5. # https://github.com/Azure/azure-iot-sdk-python
    6. # The sample connects to a device-specific MQTT endpoint on your IoT Hub.
    7. import iothub_client
    8. # pylint: disable=E0611
    9. from iothub_client import IoTHubClient, IoTHubClientError, IoTHubTransportProvider, IoTHubClientResult
    10. from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError, DeviceMethodReturnValue
    11. # The device connection string to authenticate the device with your IoT hub.
    12. # Using the Azure CLI:
    13. # az iot hub device-identity show-connection-string --hub-name {YourIoTHubName} --device-id MyNodeDevice --output table
    14. CONNECTION_STRING = "HostName=iothubforsatest.azure-devices.cn;DeviceId=test0002;SharedAccessKey=vnYkfQ4znJqVow9ZeBsooyj5kYeJs96etpcUoQI/FwQ="
    15. # Using the MQTT protocol.
    16. PROTOCOL = IoTHubTransportProvider.MQTT
    17. MESSAGE_TIMEOUT = 10000
    18. # Define the JSON message to send to IoT Hub.
    19. TEMPERATURE = 0.0
    20. HUMIDITY = 60
    21. MSG_TXT = "{\"temperature\": %.2f,\"humidity\": %.2f,\"deviceid\": 'test0002'}"
    22. def send_confirmation_callback(message, result, user_context):
    23. print ( "IoT Hub responded to message with status: %s" % (result) )
    24. def iothub_client_init():
    25. # Create an IoT Hub client
    26. client = IoTHubClient(CONNECTION_STRING, PROTOCOL)
    27. return client
    28. def iothub_client_telemetry_sample_run():
    29. try:
    30. client = iothub_client_init()
    31. print ( "IoT Hub device sending periodic messages, press Ctrl-C to exit" )
    32. while True:
    33. # Build the message with simulated telemetry values.
    34. temperature = TEMPERATURE + (random.random() * 15)
    35. humidity = HUMIDITY + (random.random() * 20)
    36. msg_txt_formatted = MSG_TXT % (temperature, humidity)
    37. message = IoTHubMessage(msg_txt_formatted)
    38. # Add a custom application property to the message.
    39. # An IoT hub can filter on these properties without access to the message body.
    40. prop_map = message.properties()
    41. if temperature > 30:
    42. prop_map.add("temperatureAlert", "true")
    43. else:
    44. prop_map.add("temperatureAlert", "false")
    45. # Send the message.
    46. print( "Sending message: %s" % message.get_string() )
    47. client.send_event_async(message, send_confirmation_callback, None)
    48. time.sleep(3)
    49. except IoTHubError as iothub_error:
    50. print ( "Unexpected error %s from IoTHub" % iothub_error )
    51. return
    52. except KeyboardInterrupt:
    53. print ( "IoTHubClient sample stopped" )
    54. if __name__ == '__main__':
    55. print ( "IoT Hub Quickstart #1 - Simulated device" )
    56. print ( "Press Ctrl-C to exit" )
    57. iothub_client_telemetry_sample_run()

    示例的查询:

    1. select System.Timestamp AS OutPutTime,
    2. deviceid AS DeviceName,
    3. Avg(temperature) AS temp into windowoutput
    4. from inputiot
    5. TIMESTAMP BY EventProcessedUtcTime
    6. GROUP BY SlidingWindow(second,30), deviceid
    7. HAVING Avg(temperature)>30;

    本实战基本流程为:
    1.创建 IoTHub和模拟设备
    2.创建流分析服务
    3.创建SqlDatabase(作为输出)
    3.配置流分析的输入
    4.配置流分析的输出
    5.设计Query
    6.检查实时分析结果
    注意事项:
    输出至SQL DB时,表必须提前创建好;
    表中必须涵盖所有输出字段。

    image.png