Azure Databricks 系列 Blog(二)之流计算 Structure Streaming

1. 前言

经过开箱介绍后,想必都对 Azure Databricks 有了大致的了解,那乘胜追击,本文介绍一下 Azure Databricks 的流计算架构。基于的场景是设计一个实时告警系统:“针对一家杂货店的购物者的购物异常习惯进行实时采样和告警,主要针对的产品是止咳糖浆,如果采样发现一次性购物超过 10 瓶就进行实时告警”,毕竟需要防止下一个沃尔特·怀特出现。需要用到的服务全部部署在中国区 Azure 上,后文对具体每项服务功能不多做介绍,请读者到官网自行搜索查询。


2. 流计算架构说明

依据本文模拟的场景,设计的云上架构如下图所示:

用到的相关服务组件功能做如下说明:
Azure VM:数据源,扮演 Producer 生产者,通过 Python 代码模拟客户购买行为,生成示例数据通过 SDK 发送数据到 Azure Eventhub。
Azure Eventhub:消息队列,做上下游生产者消费者服务的解耦,Entity ingestion 接收 Producer 发送的数据,Entity alerting 接收经过 Databricks 实时计算后的数据。
Azure Databricks:订阅 Eventhub Entity ingestion 作为数据源,通过 Structure Streaming 对数据进行实时处理后发送 Entity alerting。
Azure LogicApp:订阅 Eventhub Entity alerting 并做邮件实时告警。

整个数据流:Producer 生产者发送数据 → Eventhub Entity ingestion → Databricks Structured Streaming → Eventhub Entity alerting → Logic App


3. Azure Databrick Structure Streaming 实现

3.1 Terraform 自动化部署

通过 Terraform 部署的服务组件包括 Azure Virtual Machine、 Azure Databricks、Eventhub、Logic App,具体的 tf 文件和变量见这里,每一项服务 Terraform Azure Provider 都有 Resource 支持,具体可以参考官方文档。部署完成之后的资源清单如下图所示,所有资源都部署在中国北二区域。

3.2 Producer 代码发布

模拟的生产者代码通过 VM 发布,通过调用 Azure Eventhub 的 SDK 将 Messages 进行写入,具体代码见这里,几个重要配置简单说明下:
azure.eventhub:Azure Eventhub SDK 包,需要通过 pip3 install azure.eventhub 来指定安装。
create_batch():通过该方法 batch 发送数据,本次示例以 1 条消息为 1 个 batch 发送到 Eventhub Entity ingestion。
CONNECTION_STR:Azure Eventhub Endpoint,该连接字符串可以在 Portal 上 Shared access policies 的 Connection string–primary key 查看。
EVENTHUB_NAME:写入的 Eventhub Entity Name。

3.3 Azure Databricks 集群配置及 Structure Streaming Notebook 集成

Azure Databricks 的创建过程是先需要在 Azure 上创建一个 Databricks 实体,然后在此基础上在实体内部创建 Workspace 以及 Cluster,再提交 Job 等等。针对 Databricks 资源,都会有唯一的 ID 和 Endpoint 与之对应,以便能够进行 Restful API 调用,集群通过 Databricks Portal 创建即可。本示例创建 1 Driver 2 Worker 共计 3 个节点的 Standard Cluster,Databricks 版本为 6.4 (includes Apache Spark 2.4.5, Scala 2.11)。如果需要做机器学习相关的计算,可以启用集成 GPU/ML 框架的版本,详细说明见官方文档,不做赘述。

集群状态变为 Running 后就意味着 Ready 可以使用了,不过在导入 Python Notebook 之前,需要通过 Maven Central 安装 com.microsoft.azure:azure-eventhubs-spark 库文件以便安装 Spark 连接 Azure Eventhub Connector,需要注意库文件的版本要匹配。

Notebook 可以直接在 Portal 里新建写入,也可以在 VS Code 等 IDE 编写完之后发布,本文采用第二种模式,原因是 IDE 丰富的插件可以提高效率。具体的 Notebook 本文不做展示,放在这里,有需要的读者可以自行查看。通过 import 导入后,附上导入后的截图并做几点说明:

整个 Notebook 分为三个 stage:
第一阶段:从 Eventhub Entity ingestion 读取 Producer 写入的数据,通过 Streaming DataFrames 的 spark.readStream() 创建。
第二阶段:通过 DataFrame 丰富的函数做字段筛选,筛选出来我们需要的字段。
第二阶段:回写 Eventhub Entity alerting,通过 Streaming DataFrames 的 spark.writeStream() 流式写入,注意利用 checkpoint 方便任务终止再运行。

当 Producer 运行起来的时候,Eventhub 就会不断有数据写入,所以能看到 Spark 的 Input Records 的图像。对于每一个 Job,都能看到对于该任务分配的资源和 Spark 参数配置项。

3.4 Logic APP 配置邮件告警

经过 Azure Databricks 的数据筛选后,筛选出来的 Messages 都写入了 Eventhub Entity alerting 中,此时通过 LogicApp 来定义一个自动化的 workflow 来进行邮件告警。具体创建过程选择 Blank 然后自己创建 Step 即可,当然 Azure Portal 上的示例模板也可以用来参考,如下图所示:

第一步订阅 Eventhub Entity alerting,第二步集成 Outlook 邮件接口发送告警邮件。所以当目标消息被筛选出来之后,LogicApp 就按照定义的邮件内容(本文是 Messages 内容和时间戳)来发送邮件,发送邮件的截图如下:


4. 总结

总体上,一个通过消息队列 Azure Eventhub 以及 Databricks 做流计算处理的示例就完成了。如果消息生产者 Producer 不断产生消息,那么整个任务就会一直运行下去,当出现目标消息的时候就会不断的持续告警。在 Spark 推出 Structure Streaming 以后,也解决了 Spark Streaming microbatch 的局限性。介绍完流式计算,下一篇聊聊 Azure Databricks SparkSQL ,敬请期待吧~