强强联合! 利用 Microsoft Azure AKS 集群集成 Apache Spark 来做大数据计算

1. Azure Kubernetes Service 及 Apache Spark 简介

伴随着 Kubernetes 技术的日渐成熟和社区生态的蓬勃发展, 越来越多的客户都将自己很多的应用程序迁移到容器服务上, 以便能够设计出更加快速、便捷、更高可靠的应用架构. 为了提高 Kubernetes 集群的部署速度以及降低其运维成本, Microsoft Azure 推出了托管的 Kubernetes 服务 —— Azure Kubernetes Service (AKS). AKS 带来了诸多优点, 如高可用的控制平面、Calico 等网络插件集成、Azure 服务无缝对接、与开源版本 Kubernetes 同步匹配、平滑迁移及无感知升级等。

在大数据处理与分析领域, 从技术的成熟度、稳定度以及社区活度程度来看, Apache Spark 是目前最流行的计算框架并在实际生产中处于重要地位并广泛使用. Apache Spark 支持 Standalone、Mesos 以及 YARN 等集群资源管理调度平台, 其中以 YARN 在实际生产中使用最为广泛, YARN 就是我们熟知的 Hadoop 平台的资源调度框架. 在计算存储不分离的架构下, 集群每个节点也会作为 Datanode 即采用 HDFS 分布式文件系统来进行数据存储. 但是随着数据量的爆炸式增长以及数据湖概念的日益普及, 计算与存储分离也逐渐成为了刚需. 为此, Microsoft Azure 也推出了满足以上需求的托管式 Hadoop 服务 —— Azure HDInsight 以及 Databricks. HDInsight 是 HDP 版本 Hadoop 在 Azure 云上的托管服务, Databricks 是著名 Spark 创始人完全基于 Apache Spark 并针对 Microsoft Azure 云服务平台进行优化的 Spark 托管服务.

目前最火热的两个技术已经可以结合并迸发出了新的火花, 2.3.0 版本以上的 Apache Spark 已经提供了对 Kubernetes 平台的支持与集成, 使我们能够利用原生的 Kubernetes 来进行集群计算资源的分配与管理 Spark 计算作业. 在该模式下, Spark Driver 和 Executor 都通过 Kubernetes Pods 来运行, 并且也可通过指定 Node Selector 将计算作业运行于特定节点之上 (例如: 带有GPU的实例类型等).


(from: https://spark.apache.org).


2. Apache Spark on AKS 实现

本实验需要正确安装 azcli, kubectl 等命令行工具, 详细步骤请参考相应文档, 本文不再赘述. 具体的操作过程在 AKS Vnet 的一台内网服务器的 root 家目录下进行, 该服务器作为 Spark 集群的 Gateway 客户端.

2.1 创建 AKS 集群

2.1.1 创建资源组

1
az group create --name akssparkrg --location southeastasia

2.1.2 创建 3 节点 AKS 集群

1
2
3
4
5
6
7
8
az aks create --resource-group akssparkrg \
--name aksspark0001 \
--kubernetes-version 1.14.7 \
--node-count 3 \
--node-vm-size Standard_DS2_v3 \
--enable-addons monitoring \
--admin-username akssparkuser \
--ssh-key-value ~/.ssh/id_rsa.pub

集群创建时间在20分钟左右.

2.1.3 获取 AKS Credential:

1
az aks get-credentials --resource-group akssparkrg --name aksspark0001

2.2 在 AKS 集群中创建 Spark 服务帐户并绑定权限

2.2.1 创建 Spark 服务账号

1
kubectl create serviceaccount spark

2.2.2 权限绑定

1
2
3
4
kubectl create clusterrolebinding spark-role \
--clusterrole=edit \
--serviceaccount=default:spark \
--namespace=default

2.3 下载 Spark 二进制文件

Spark官网 选择合适的版本下载, 本例中使用 2.4.4 版本 ( 需要高于2.3.0 ).

1
2
wget https://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
tar -zvxf spark-2.4.4-bin-hadoop2.7.tgz

2.4 确认 Spark Jar 和 Kubernetes 的版本兼容性

Spark 采用了 fabric8.io 的 Kubernetes & OpenShift Java Client 客户端连接 API Server 来进行资源调度和分配, 需要先确认 Spark 2.4.4 自带的 kubernetes-client-*.jar 版本与 AKS 集群版本是否兼容. Spark 2.4.4 自带的 Kubernetes-client 版本为 4.1.2 ( kubernetes-client-4.1.2.jar ), AKS集群版本为 1.14.7, 存在版本不兼容问题,需要更高版本 jar 包. 具体的版本对应关系如下表所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|                           | Kubernetes 1.4.9 | Kubernetes 1.6.0 | Kubernetes 1.7.0  | Kubernetes 1.9.0  | Kubernetes 1.10.0 | Kubernetes 1.11.0 | Kubernetes 1.12.0 | Kubernetes 1.14.2 | Kubernetes 1.15.3 |
|---------------------------|------------------|------------------|-------------------|-------------------|-------------------|-------------------|-------------------|-------------------|-------------------|
| kubernetes-client 1.3.92 | + | + | - | - | - | - | - | - | - |
| kubernetes-client 3.0.3 | - | - | ✓ | - | - | - | - | - | - |
| kubernetes-client 3.0.10 | - | ✓ | ✓ | ✓ | - | - | - | - | - |
| kubernetes-client 3.0.11 | - | ✓ | ✓ | ✓ | - | - | - | - | - |
| kubernetes-client 3.1.12 | - | ✓ | ✓ | ✓ | - | - | - | - | - |
| kubernetes-client 3.2.0 | - | ✓ | ✓ | ✓ | - | - | - | - | - |
| kubernetes-client 4.0.0 | - | ✓ | ✓ | ✓ | - | - | - | - | - |
| kubernetes-client 4.1.0 | - | ✓ | ✓ | ✓ | - | - | - | - | - |
| kubernetes-client 4.1.1 | - | - | - | ✓ | ✓ | ✓ | ✓ | - | - |
| kubernetes-client 4.1.2 | - | - | - | ✓ | ✓ | ✓ | ✓ | - | - |
| kubernetes-client 4.1.3 | - | - | - | ✓ | ✓ | ✓ | ✓ | - | - |
| kubernetes-client 4.2.0 | - | - | - | ✓ | ✓ | ✓ | ✓ | - | - |
| kubernetes-client 4.2.1 | - | - | - | ✓ | ✓ | ✓ | ✓ | - | - |
| kubernetes-client 4.2.2 | - | - | - | ✓ | ✓ | ✓ | ✓ | - | - |
| kubernetes-client 4.3.0 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - |
| kubernetes-client 4.3.1 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - |
| kubernetes-client 4.4.0 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - |
| kubernetes-client 4.4.1 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - |
| kubernetes-client 4.4.2 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - |
| kubernetes-client 4.5.0 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - |
| kubernetes-client 4.5.1 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - |
| kubernetes-client 4.5.2 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - |
| kubernetes-client 4.6.0 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |

本文我们直接使用能够与 Kubernetes 1.14.7 版本兼容的 kubernetes-client 4.6.0.jar, 通过 这里 下载, 下载后替换 spark-2.4.4-bin-hadoop2.7/jars 目录下的 kubernetes-client-4.1.2.jar 即可.

2.5 创建 ACR (Azure Container Registry) 并与 AKS 集成

2.5.1 创建 ACR

1
az acr create -n akssparkacr0001 -g akssparkrg --sku basic

记录 ACR Resource ID:
/subscriptions/53a326cc-f961-4540-8701-2bfd1003242b/resourceGroups/akssparkrg/providers/Microsoft.ContainerRegistry/registries/akssparkacr0001

2.5.2 ACR 与 AKS 集群集成

1
2
az aks update -n aksspark0001 -g akssparkrg --attach-acr akssparkacr0001
az aks update -n aksspark0001 -g akssparkrg --attach-acr /subscriptions/53a326cc-f961-4540-8701-2bfd1003242b/resourceGroups/akssparkrg/providers/Microsoft.ContainerRegistry/registries/akssparkacr0001

2.5.3 登录 ACR

1
az acr login -n akssparkacr0001

2.6 构建 Spark 镜像

2.6.1 构建并推送镜像至 ACR

1
2
3
4
5
6
7
8
REGISTRY_NAME=akssparkacr0001.azurecr.io
REGISTRY_TAG=v1

# 执行脚本构建 Spark 镜像
./bin/docker-image-tool.sh -r $REGISTRY_NAME -t $REGISTRY_TAG build

# 推送镜像
./bin/docker-image-tool.sh -r $REGISTRY_NAME -t $REGISTRY_TAG push

2.6.2 Azure 查看docker image:

通过 docker image list 也可以看, 自动构建了 Spark 原生, Spark-R 以及 Pyspark 的容器镜像.

2.7 运行作业 SparkPi

Spark 作业两种方式来运行, Cluster 和 Client 模式. 两种模式的主要区别简单概括就是 Spark Job Driver 是在本地节点还是集群节点上. 在实际生产过程中, 通常会将 spark-submit 的作业提交方式服务化, 通过外部服务调用将作业提交到集群上运行, 最典型的就是 Apache Livy. 本文采取通过命令行 spark-submit 将 SparkPi 作业提交到集群上计算的方式.

2.7.1 创建 Storage Account 并将 spark-examples_2.11-2.4.4.jar 上传至 Azure Blob

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 变量定义, storage account name
RESOURCE_GROUP=akssparkrg
STORAGE_ACCT=akssparksa0001

# 创建 storage account
az group create --name $RESOURCE_GROUP --location southeastasia
az storage account create --resource-group $RESOURCE_GROUP --name $STORAGE_ACCT --sku Standard_LRS
export AZURE_STORAGE_CONNECTION_STRING=`az storage account show-connection-string --resource-group $RESOURCE_GROUP --name $STORAGE_ACCT -o tsv`

# 指定上传 jar
CONTAINER_NAME=akssparkjars
BLOB_NAME=spark-examples_2.11-2.4.4.jar
FILE_TO_UPLOAD=/root/aksspark/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar

# 创建 blob container
echo "Creating the container..."
az storage container create --name $CONTAINER_NAME
az storage container set-permission --name $CONTAINER_NAME --public-access blob

# 上传 jar
echo "Uploading the file..."
az storage blob upload --container-name $CONTAINER_NAME --file $FILE_TO_UPLOAD --name $BLOB_NAME

# 获取 jar url
JARURL=$(az storage blob url --container-name $CONTAINER_NAME --name $BLOB_NAME | tr -d '"')

2.7.2 运行 SparkPi

通过 spark-submit 提交 SparkPi 作业

1
2
3
4
5
6
7
8
9
10
cd spark-2.4.4-bin-hadoop2.7
./bin/spark-submit \
--master k8s://https://aksspark00-akssparkrg-53a326-07f35845.hcp.southeastasia.azmk8s.io:443 \ # AKS 集群地址
--deploy-mode cluster \ # 集群模式
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=3 \ # 申请 3 个 Executor
--conf spark.kubernetes.container.image=akssparkacr0001.azurecr.io/spark:v1 \ # Docker Image 地址
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \ # 指定创建的 Service Account spark
https://akssparksa0001.blob.core.windows.net/akssparkjars/spark-examples_2.11-2.4.4.jar # Jar

spark-submit 的具体参数可以参考 Spark on Kubernetes官网, 不过官方文档只有默认参数配置. 如果需要看所有参数的可选值, 可以考虑去看 GitHub 上 Spark 源代码.

2.7.3 查看运行结果

查看 Spark Job Driver 和对应的 Executor Pods

1
2
3
4
5
6
[root@AKSSpark spark-2.4.4-bin-hadoop2.7]# kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-pi-1571055145415-driver 1/1 Running 0 25s
spark-pi-1571055145415-exec-1 1/1 Running 0 15s
spark-pi-1571055145415-exec-2 1/1 Running 0 15s
spark-pi-1571055145415-exec-3 0/1 Pending 0 15s

运行作业时. 还可以访问 Spark UI. kubectl port-forward 命令提供对 Spark Job UI 的访问权限.

1
kubectl port-forward spark-pi-1571055145415-driver 4040:4040

获取作业结果和日志

1
2
3
[root@AKSSpark spark-2.4.4-bin-hadoop2.7]# kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-pi-1571055145415-driver 0/1 Completed 0 53s

使用 kubectl logs 来获取 Spark 作业的 Log

1
kubectl logs spark-pi-1571055145415-driver

日志中,可以看到 Spark 作业的结果,即 Pi 的值.

1
Pi is roughly 3.1412957064785325

2.8 使用 Spark Shell 进行交互式分析 Azure Blob 数据

在很多需要调试的场景中, 都会通过命令行的交互式界面来进行调试. 对于 Spark 来说, 有 Spark Shell、Spark-R、Pyspark 三种命令行, 分别针对 Scala, R 以及 Python. 下面我们通过Spark Shell 来分析 Azure Blob 的数据.

2.8.1 Docker Image 新打包 v2 版本

Spark 通过 wasbs://{container-name}@{storage-account-name}.blob.core.windows.net 访问 Azure Blob, 这需要在 Spark Jar 中加载 azure-storage.jar 和 hadoop-azure.jar, 这些 Jar 需要打在 Docker Image 里面, 然后上传至 ACR.

1
2
3
cd spark-2.4.4-bin-hadoop2.7/jars
wget http://central.maven.org/maven2/com/microsoft/azure/azure-storage/2.2.0/azure-storage-2.2.0.jar
wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-azure/2.7.3/hadoop-azure-2.7.3.jar

然后重复 2.6 节的操作, 指定版本号为 v2, 打包好后推送到 ACR 上, 具体的 Image 地址为 akssparkacr0001.azurecr.io/spark:v2.

2.8.2 使用 Spark Shell 交互式分析 Azure Blob 中的数据

运行 Spark Shell

1
2
3
4
5
6
7
8
cd spark-2.4.4-bin-hadoop2.7
./bin/spark-shell \
--master k8s://https://aksspark00-akssparkrg-53a326-07f35845.hcp.southeastasia.azmk8s.io:443 \
--name spark-shell \
--deploy-mode client \
--conf spark.executor.instances=3 \
--conf spark.kubernetes.executor.limit.cores=1 \
--conf spark.kubernetes.container.image=akssparkacr0001.azurecr.io/spark:v3

进入 Spark Shell Scala 命令行交互界面

Scala 验证

1
2
scala> sc.parallelize(1 to 100000).count
res10: Long = 100000

具体的分析的文件为示例 csv, 名为 diamonds.csv, 并已经提前上传到 Azure Blob Container 名为 sparkshell 中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# Azure Blob Storage Account 认证
scala> spark.conf.set(
| "fs.azure.account.key.akssparksa0001.blob.core.windows.net",
| "gWfl5JezYzXs1ub572KZDAWTR9buVb/pb/dHj/iqsKV07fQKSl+JzUqcLWgx4Qr7xQTVPBSVsXhO6Aja/torAw==")

# 加载 blob 文件
scala> val diamonds = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("wasbs://sparkshell@akssparksa0001.blob.core.windows.net/diamonds.csv")
diamonds: org.apache.spark.sql.DataFrame = [_c0: int, carat: double ... 9 more fields]

# 显示字段类型
scala> diamonds.printSchema()
root
|-- _c0: integer (nullable = true)
|-- carat: double (nullable = true)
|-- cut: string (nullable = true)
|-- color: string (nullable = true)
|-- clarity: string (nullable = true)
|-- depth: double (nullable = true)
|-- table: double (nullable = true)
|-- price: integer (nullable = true)
|-- x: double (nullable = true)
|-- y: double (nullable = true)
|-- z: double (nullable = true)

# 显示前 10 行数据
scala> diamonds.show(10)
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|_c0|carat| cut|color|clarity|depth|table|price| x| y| z|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
| 1| 0.23| Ideal| E| SI2| 61.5| 55.0| 326|3.95|3.98|2.43|
| 2| 0.21| Premium| E| SI1| 59.8| 61.0| 326|3.89|3.84|2.31|
| 3| 0.23| Good| E| VS1| 56.9| 65.0| 327|4.05|4.07|2.31|
| 4| 0.29| Premium| I| VS2| 62.4| 58.0| 334| 4.2|4.23|2.63|
| 5| 0.31| Good| J| SI2| 63.3| 58.0| 335|4.34|4.35|2.75|
| 6| 0.24|Very Good| J| VVS2| 62.8| 57.0| 336|3.94|3.96|2.48|
| 7| 0.24|Very Good| I| VVS1| 62.3| 57.0| 336|3.95|3.98|2.47|
| 8| 0.26|Very Good| H| SI1| 61.9| 55.0| 337|4.07|4.11|2.53|
| 9| 0.22| Fair| E| VS2| 65.1| 61.0| 337|3.87|3.78|2.49|
| 10| 0.23|Very Good| H| VS1| 59.4| 61.0| 338| 4.0|4.05|2.39|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
only showing top 10 rows

# 提取前 10 行并写入新文件
scala> diamonds.limit(10).write.format("csv").save("wasbs://sparkshell@akssparksa0001.blob.core.windows.net/1.csv")
scala> spark.read.csv("wasbs://sparkshell@akssparksa0001.blob.core.windows.net/1.csv").show()
+---+----+---------+---+----+----+----+---+----+----+----+
|_c0| _c1| _c2|_c3| _c4| _c5| _c6|_c7| _c8| _c9|_c10|
+---+----+---------+---+----+----+----+---+----+----+----+
| 1|0.23| Ideal| E| SI2|61.5|55.0|326|3.95|3.98|2.43|
| 2|0.21| Premium| E| SI1|59.8|61.0|326|3.89|3.84|2.31|
| 3|0.23| Good| E| VS1|56.9|65.0|327|4.05|4.07|2.31|
| 4|0.29| Premium| I| VS2|62.4|58.0|334| 4.2|4.23|2.63|
| 5|0.31| Good| J| SI2|63.3|58.0|335|4.34|4.35|2.75|
| 6|0.24|Very Good| J|VVS2|62.8|57.0|336|3.94|3.96|2.48|
| 7|0.24|Very Good| I|VVS1|62.3|57.0|336|3.95|3.98|2.47|
| 8|0.26|Very Good| H| SI1|61.9|55.0|337|4.07|4.11|2.53|
| 9|0.22| Fair| E| VS2|65.1|61.0|337|3.87|3.78|2.49|
| 10|0.23|Very Good| H| VS1|59.4|61.0|338| 4.0|4.05|2.39|
+---+----+---------+---+----+----+----+---+----+----+----+

另外, 跨 Azure Blob Container 的文件读写也可以.


3. 总结

通过以上的几个步骤, 可以实现在 Azure Kubernetes Service 集群上运行 Spark 作业的平台部署. 与传统的 YARN 来做资源管理调度相比较而言更为轻量快捷, 同时可以与 Azure Blob 做集成, 数据存储在 Azure Blob 中进行分析, 既可以保证数据高可靠性也能够实现计算与存储的分离从而提高灵活性.