Azure Databricks 系列 Blog(三)之 SparkSQL on Delta Lake

1. 前言

上一篇介绍了 Azure Databricks 流计算框架 Structure Streaming,通过一个 Demo 演示了 Spark 的实时流计算框架 Structure Streaming。但是在实际业务中,光有实时数据流是不够的,离线数据的计算和分析也非常重要,并且往往离线分析才能获取业务上所需要的更多的 Insights,而一般来说离线的数据分析都会跑一些 OLAP SQL 查询,所以基于此场景,本文就来介绍并演示下 SparkSQL,一个可以通过 Spark 来进行 SQL 分析的可实时可离线的计算框架。本文场景基于集成 Azure Datalake Gen2 并 Enable Databricks Delta Lake 作为外部存储实现计算存储分离,通过 SparkSQL 新冠肺炎实时统计数据,资源依然沿用前面 Demo 所创建的资源组,具体如何实现,且听下文分解。


2. Delta Lake 介绍及 Azure Datalake Gen2 集成

Delta Lake 是可提高数据湖可靠性的开源存储层,由 Databricks 开发并开源出来。 Delta Lake 提供 ACID 事务和可缩放的元数据处理,并可以统一流处理和批数据处理。 Delta Lake 在现有 Data Lake 的顶层运行,并且可以与 Apache Spark API 完全兼容。具体而言,Delta Lake 提供:

Spark 上的 ACID 事务:可序列化的隔离级别可避免读者看到不一致的数据。
可缩放的元数据处理:利用 Spark 的分布式处理能力,轻松处理包含数十亿文件的 PB 级表的所有元数据。
流式处理和批处理统一:Delta Lake 中的表是批处理表,也是流式处理源和接收器。 流式处理数据引入、批处理历史回填、交互式查询功能都是现成的。
架构强制:自动处理架构变体,以防在引入过程中插入错误的记录。
按时间顺序查看:数据版本控制支持回滚、完整的历史审核线索和可重现的机器学习试验。
更新插入和删除:支持合并、更新和删除操作,以启用复杂用例,如更改数据捕获、渐变维度 (SCD) 操作、流式处理更新插入等。

总的来说,Delta Lake 不仅仅在需要事务的 ACID 场景给予支持,并且针对性能也做了相当大的代码优化,本文的实现也是基于 Delta Lake,需要注意数据格式需要转换成 delta。

正常 Azure Datalake Gen2 与 Delta Lake 的集成需要在 Spark 中增加配置项 “spark.delta.logStore.class=org.apache.spark.sql.delta.storage.AzureLogStore”, 并且集群需要在 lib 库中支持 hadoop-azure-datalake/hadoop-azure/wildfly-openssl JAR,具体过程可以见这里。所以这时候 PaaS 的优势就显示出来了,Azure Databricks 在 Cluster 创建好的同时就已经在集群上启用了该库,直接调用即可。 Azure Datalake Gen2 的创建过程本文不赘述,具体见官方文档。本文介绍两种常见的集成方式,如下图所示并做几点说明:

cmd2:非挂载方式
cmd2 中通过 scala 代码演示 Azure Databricks 集成 Azure Datalake Gen2 的认证配置项,通过创建拥有 IAM Role “Storage Blob Data Contributor” 的 Service Principle 来做认证,验证通过后就可以直接调用 Datalake 里面的文件了,注意相关变量需要替换。

cmd3:挂载方式
cmd3 中通过 Python 代码演示 Azure Databricks 如何挂载 Azure Datalake Gen2,身份验证过程和 cmd2 的方式一样。这种方式的好处是直接可以可以把远端的 Azure Datalake 挂载到 Azure Databricks 上,就好像在使用本地磁盘一样使用 Datalake,本文更推荐并且采用该模式,同时注意相关变量需要替换。

另外,本文的环境由于都是在 Azure 中国区,所以相关的域名都是 https://login.partner.microsoftonline.cn, 如果在 Azure Global,那么注意域名是 https://login.microsoftonline.com。 其实从最佳实践的角度来说,本文并不是实现的最好方式,因为在 Notebook 里面的认证信息全部都是明文出现,最好的方式应该集成 Azure Key Vault 来隐藏密码,该方式不再本文赘述,有兴趣的同学自行研究吧,附上链接供参考。


3. 引入数据源并运行 SparkSQL

前面提到,本文所采用的示例数据是和新冠肺炎实时统计数据相关,讲到这里介绍一个微软提供的一个开放数据集 Repo,链接在这里,包括各种场景下的 csv/json/parquet 格式的原始和脱敏数据,本文所使用的新冠肺炎数据也是在这里获取的,选择 csv 格式数据作为数据源,下载好之后 upload 到 Azure DataLake 上,在 container data 下面再创建个 source folder 作为存放原始数据的目录径。

原始数据确定好之后,就可以通过 Notebook 来做 SparkSQL 分析了,本文用到的示例 Notebook 已经上传到了这里,几个重要配置简单说明下:

COVIDDF:COVIDDF 为定义的 DataFrame,通过 spark.read.format(‘csv’) 格式并指定挂载路径引入数据;
COVIDDF.write.mode(“append”).format(“delta”):通过 append 追加的方式写入目标 Table 中,注意这里的格式 delta,也就是前面提到的需要做格式转换,写入的表也需要指定路径,并会在 Azure Dalalake 中生成相应的文件;
ChinaCOVID 及 ChinaCOVID1:经过上面的建表过程之后就可以运行 SparkSQL 来做 SQL 查询了;


4. 总结

一个通过集成 Azure Datalake Gen2 作为外部存储并采用 Delta Lake 格式进行 SparkSQL 分析的示例就完成了,本文基本上使用了 PySpark 来实现,其实除此之外通过 SQL 或者 Scala 也是一样可以实现的,有兴趣的同学可以自己再深入研究下吧。