1. Azure Kubernetes Service 及 Apache Spark 简介 伴随着 Kubernetes 技术的日渐成熟和社区生态的蓬勃发展, 越来越多的客户都将自己很多的应用程序迁移到容器服务上, 以便能够设计出更加快速、便捷、更高可靠的应用架构. 为了提高 Kubernetes 集群的部署速度以及降低其运维成本, Microsoft Azure 推出了托管的 Kubernetes 服务 —— Azure Kubernetes Service (AKS). AKS 带来了诸多优点, 如高可用的控制平面、Calico 等网络插件集成、Azure 服务无缝对接、与开源版本 Kubernetes 同步匹配、平滑迁移及无感知升级等。
在大数据处理与分析领域, 从技术的成熟度、稳定度以及社区活度程度来看, Apache Spark 是目前最流行的计算框架并在实际生产中处于重要地位并广泛使用. Apache Spark 支持 Standalone、Mesos 以及 YARN 等集群资源管理调度平台, 其中以 YARN 在实际生产中使用最为广泛, YARN 就是我们熟知的 Hadoop 平台的资源调度框架. 在计算存储不分离的架构下, 集群每个节点也会作为 Datanode 即采用 HDFS 分布式文件系统来进行数据存储. 但是随着数据量的爆炸式增长以及数据湖概念的日益普及, 计算与存储分离也逐渐成为了刚需. 为此, Microsoft Azure 也推出了满足以上需求的托管式 Hadoop 服务 —— Azure HDInsight 以及 Databricks. HDInsight 是 HDP 版本 Hadoop 在 Azure 云上的托管服务, Databricks 是著名 Spark 创始人完全基于 Apache Spark 并针对 Microsoft Azure 云服务平台进行优化的 Spark 托管服务.
目前最火热的两个技术已经可以结合并迸发出了新的火花, 2.3.0 版本以上的 Apache Spark 已经提供了对 Kubernetes 平台的支持与集成, 使我们能够利用原生的 Kubernetes 来进行集群计算资源的分配与管理 Spark 计算作业. 在该模式下, Spark Driver 和 Executor 都通过 Kubernetes Pods 来运行, 并且也可通过指定 Node Selector 将计算作业运行于特定节点之上 (例如: 带有GPU的实例类型等).
(from: https://spark.apache.org ).
2. Apache Spark on AKS 实现 本实验需要正确安装 azcli, kubectl 等命令行工具, 详细步骤请参考相应文档, 本文不再赘述. 具体的操作过程在 AKS Vnet 的一台内网服务器的 root 家目录下进行, 该服务器作为 Spark 集群的 Gateway 客户端.
2.1 创建 AKS 集群 2.1.1 创建资源组 1 az group create --name akssparkrg --location southeastasia
2.1.2 创建 3 节点 AKS 集群 1 2 3 4 5 6 7 8 az aks create --resource-group akssparkrg \ --name aksspark0001 \ --kubernetes-version 1.14.7 \ --node-count 3 \ --node-vm-size Standard_DS2_v3 \ --enable-addons monitoring \ --admin-username akssparkuser \ --ssh-key-value ~/.ssh/id_rsa.pub
集群创建时间在20分钟左右.
2.1.3 获取 AKS Credential: 1 az aks get-credentials --resource-group akssparkrg --name aksspark0001
2.2 在 AKS 集群中创建 Spark 服务帐户并绑定权限 2.2.1 创建 Spark 服务账号 1 kubectl create serviceaccount spark
2.2.2 权限绑定 1 2 3 4 kubectl create clusterrolebinding spark-role \ --clusterrole=edit \ --serviceaccount=default:spark \ --namespace=default
2.3 下载 Spark 二进制文件 从 Spark官网 选择合适的版本下载, 本例中使用 2.4.4 版本 ( 需要高于2.3.0 ).
1 2 wget https://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz tar -zvxf spark-2.4.4-bin-hadoop2.7.tgz
2.4 确认 Spark Jar 和 Kubernetes 的版本兼容性 Spark 采用了 fabric8.io 的 Kubernetes & OpenShift Java Client 客户端连接 API Server 来进行资源调度和分配, 需要先确认 Spark 2.4.4 自带的 kubernetes-client-*.jar 版本与 AKS 集群版本是否兼容. Spark 2.4.4 自带的 Kubernetes-client 版本为 4.1.2 ( kubernetes-client-4.1.2.jar ), AKS集群版本为 1.14.7, 存在版本不兼容问题,需要更高版本 jar 包. 具体的版本对应关系如下表所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | | Kubernetes 1.4.9 | Kubernetes 1.6.0 | Kubernetes 1.7.0 | Kubernetes 1.9.0 | Kubernetes 1.10.0 | Kubernetes 1.11.0 | Kubernetes 1.12.0 | Kubernetes 1.14.2 | Kubernetes 1.15.3 | |---------------------------|------------------|------------------|-------------------|-------------------|-------------------|-------------------|-------------------|-------------------|-------------------| | kubernetes-client 1.3.92 | + | + | - | - | - | - | - | - | - | | kubernetes-client 3.0.3 | - | - | ✓ | - | - | - | - | - | - | | kubernetes-client 3.0.10 | - | ✓ | ✓ | ✓ | - | - | - | - | - | | kubernetes-client 3.0.11 | - | ✓ | ✓ | ✓ | - | - | - | - | - | | kubernetes-client 3.1.12 | - | ✓ | ✓ | ✓ | - | - | - | - | - | | kubernetes-client 3.2.0 | - | ✓ | ✓ | ✓ | - | - | - | - | - | | kubernetes-client 4.0.0 | - | ✓ | ✓ | ✓ | - | - | - | - | - | | kubernetes-client 4.1.0 | - | ✓ | ✓ | ✓ | - | - | - | - | - | | kubernetes-client 4.1.1 | - | - | - | ✓ | ✓ | ✓ | ✓ | - | - | | kubernetes-client 4.1.2 | - | - | - | ✓ | ✓ | ✓ | ✓ | - | - | | kubernetes-client 4.1.3 | - | - | - | ✓ | ✓ | ✓ | ✓ | - | - | | kubernetes-client 4.2.0 | - | - | - | ✓ | ✓ | ✓ | ✓ | - | - | | kubernetes-client 4.2.1 | - | - | - | ✓ | ✓ | ✓ | ✓ | - | - | | kubernetes-client 4.2.2 | - | - | - | ✓ | ✓ | ✓ | ✓ | - | - | | kubernetes-client 4.3.0 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - | | kubernetes-client 4.3.1 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - | | kubernetes-client 4.4.0 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - | | kubernetes-client 4.4.1 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - | | kubernetes-client 4.4.2 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - | | kubernetes-client 4.5.0 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - | | kubernetes-client 4.5.1 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - | | kubernetes-client 4.5.2 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | - | | kubernetes-client 4.6.0 | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
本文我们直接使用能够与 Kubernetes 1.14.7 版本兼容的 kubernetes-client 4.6.0.jar, 通过 这里 下载, 下载后替换 spark-2.4.4-bin-hadoop2.7/jars 目录下的 kubernetes-client-4.1.2.jar 即可.
2.5 创建 ACR (Azure Container Registry) 并与 AKS 集成 2.5.1 创建 ACR 1 az acr create -n akssparkacr0001 -g akssparkrg --sku basic
记录 ACR Resource ID: /subscriptions/53a326cc-f961-4540-8701-2bfd1003242b/resourceGroups/akssparkrg/providers/Microsoft.ContainerRegistry/registries/akssparkacr0001
2.5.2 ACR 与 AKS 集群集成 1 2 az aks update -n aksspark0001 -g akssparkrg --attach-acr akssparkacr0001 az aks update -n aksspark0001 -g akssparkrg --attach-acr /subscriptions/53a326cc-f961-4540-8701-2bfd1003242b/resourceGroups/akssparkrg/providers/Microsoft.ContainerRegistry/registries/akssparkacr0001
2.5.3 登录 ACR 1 az acr login -n akssparkacr0001
2.6 构建 Spark 镜像 2.6.1 构建并推送镜像至 ACR 1 2 3 4 5 6 7 8 REGISTRY_NAME=akssparkacr0001.azurecr.io REGISTRY_TAG=v1 ./bin/docker-image-tool.sh -r $REGISTRY_NAME -t $REGISTRY_TAG build ./bin/docker-image-tool.sh -r $REGISTRY_NAME -t $REGISTRY_TAG push
2.6.2 Azure 查看docker image:
通过 docker image list 也可以看, 自动构建了 Spark 原生, Spark-R 以及 Pyspark 的容器镜像.
2.7 运行作业 SparkPi Spark 作业两种方式来运行, Cluster 和 Client 模式. 两种模式的主要区别简单概括就是 Spark Job Driver 是在本地节点还是集群节点上. 在实际生产过程中, 通常会将 spark-submit 的作业提交方式服务化, 通过外部服务调用将作业提交到集群上运行, 最典型的就是 Apache Livy. 本文采取通过命令行 spark-submit 将 SparkPi 作业提交到集群上计算的方式.
2.7.1 创建 Storage Account 并将 spark-examples_2.11-2.4.4.jar 上传至 Azure Blob 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 RESOURCE_GROUP=akssparkrg STORAGE_ACCT=akssparksa0001 az group create --name $RESOURCE_GROUP --location southeastasia az storage account create --resource-group $RESOURCE_GROUP --name $STORAGE_ACCT --sku Standard_LRS export AZURE_STORAGE_CONNECTION_STRING=`az storage account show-connection-string --resource-group $RESOURCE_GROUP --name $STORAGE_ACCT -o tsv`CONTAINER_NAME=akssparkjars BLOB_NAME=spark-examples_2.11-2.4.4.jar FILE_TO_UPLOAD=/root/aksspark/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar echo "Creating the container..." az storage container create --name $CONTAINER_NAME az storage container set-permission --name $CONTAINER_NAME --public-access blob echo "Uploading the file..." az storage blob upload --container-name $CONTAINER_NAME --file $FILE_TO_UPLOAD --name $BLOB_NAME JARURL=$(az storage blob url --container-name $CONTAINER_NAME --name $BLOB_NAME | tr -d '"' )
2.7.2 运行 SparkPi 通过 spark-submit 提交 SparkPi 作业
1 2 3 4 5 6 7 8 9 10 cd spark-2.4.4-bin-hadoop2.7./bin/spark-submit \ --master k8s://https://aksspark00-akssparkrg-53a326-07f35845.hcp.southeastasia.azmk8s.io:443 \ --deploy-mode cluster \ --name spark-pi \ --class org.apache.spark.examples.SparkPi \ --conf spark.executor.instances=3 \ --conf spark.kubernetes.container.image=akssparkacr0001.azurecr.io/spark:v1 \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \ https://akssparksa0001.blob.core.windows.net/akssparkjars/spark-examples_2.11-2.4.4.jar
spark-submit 的具体参数可以参考 Spark on Kubernetes官网 , 不过官方文档只有默认参数配置. 如果需要看所有参数的可选值, 可以考虑去看 GitHub 上 Spark 源代码 .
2.7.3 查看运行结果 查看 Spark Job Driver 和对应的 Executor Pods
1 2 3 4 5 6 [root@AKSSpark spark-2.4.4-bin-hadoop2.7] NAME READY STATUS RESTARTS AGE spark-pi-1571055145415-driver 1/1 Running 0 25s spark-pi-1571055145415-exec-1 1/1 Running 0 15s spark-pi-1571055145415-exec-2 1/1 Running 0 15s spark-pi-1571055145415-exec-3 0/1 Pending 0 15s
运行作业时. 还可以访问 Spark UI. kubectl port-forward 命令提供对 Spark Job UI 的访问权限.
1 kubectl port-forward spark-pi-1571055145415-driver 4040:4040
获取作业结果和日志
1 2 3 [root@AKSSpark spark-2.4.4-bin-hadoop2.7] NAME READY STATUS RESTARTS AGE spark-pi-1571055145415-driver 0/1 Completed 0 53s
使用 kubectl logs 来获取 Spark 作业的 Log
1 kubectl logs spark-pi-1571055145415-driver
日志中,可以看到 Spark 作业的结果,即 Pi 的值.
1 Pi is roughly 3.1412957064785325
2.8 使用 Spark Shell 进行交互式分析 Azure Blob 数据 在很多需要调试的场景中, 都会通过命令行的交互式界面来进行调试. 对于 Spark 来说, 有 Spark Shell、Spark-R、Pyspark 三种命令行, 分别针对 Scala, R 以及 Python. 下面我们通过Spark Shell 来分析 Azure Blob 的数据.
2.8.1 Docker Image 新打包 v2 版本 Spark 通过 wasbs://{container-name}@{storage-account-name}.blob.core.windows.net 访问 Azure Blob, 这需要在 Spark Jar 中加载 azure-storage.jar 和 hadoop-azure.jar, 这些 Jar 需要打在 Docker Image 里面, 然后上传至 ACR.
1 2 3 cd spark-2.4.4-bin-hadoop2.7/jarswget http://central.maven.org/maven2/com/microsoft/azure/azure-storage/2.2.0/azure-storage-2.2.0.jar wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-azure/2.7.3/hadoop-azure-2.7.3.jar
然后重复 2.6 节的操作, 指定版本号为 v2, 打包好后推送到 ACR 上, 具体的 Image 地址为 akssparkacr0001.azurecr.io/spark:v2.
2.8.2 使用 Spark Shell 交互式分析 Azure Blob 中的数据 运行 Spark Shell
1 2 3 4 5 6 7 8 cd spark-2.4.4-bin-hadoop2.7./bin/spark-shell \ --master k8s://https://aksspark00-akssparkrg-53a326-07f35845.hcp.southeastasia.azmk8s.io:443 \ --name spark-shell \ --deploy-mode client \ --conf spark.executor.instances=3 \ --conf spark.kubernetes.executor.limit.cores=1 \ --conf spark.kubernetes.container.image=akssparkacr0001.azurecr.io/spark:v3
进入 Spark Shell Scala 命令行交互界面
Scala 验证
1 2 scala> sc.parallelize(1 to 100000).count res10: Long = 100000
具体的分析的文件为示例 csv, 名为 diamonds.csv, 并已经提前上传到 Azure Blob Container 名为 sparkshell 中.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 scala> spark.conf.set( | "fs.azure.account.key.akssparksa0001.blob.core.windows.net" , | "gWfl5JezYzXs1ub572KZDAWTR9buVb/pb/dHj/iqsKV07fQKSl+JzUqcLWgx4Qr7xQTVPBSVsXhO6Aja/torAw==" ) scala> val diamonds = spark.read.format("csv" ).option("header" , "true" ).option("inferSchema" , "true" ).load("wasbs://sparkshell@akssparksa0001.blob.core.windows.net/diamonds.csv" ) diamonds: org.apache.spark.sql.DataFrame = [_c0: int, carat: double ... 9 more fields] scala> diamonds.printSchema() root |-- _c0: integer (nullable = true ) |-- carat: double (nullable = true ) |-- cut : string (nullable = true ) |-- color: string (nullable = true ) |-- clarity: string (nullable = true ) |-- depth: double (nullable = true ) |-- table: double (nullable = true ) |-- price: integer (nullable = true ) |-- x: double (nullable = true ) |-- y: double (nullable = true ) |-- z: double (nullable = true ) scala> diamonds.show(10) +---+-----+---------+-----+-------+-----+-----+-----+----+----+----+ |_c0|carat| cut |color|clarity|depth|table|price| x| y| z| +---+-----+---------+-----+-------+-----+-----+-----+----+----+----+ | 1| 0.23| Ideal| E| SI2| 61.5| 55.0| 326|3.95|3.98|2.43| | 2| 0.21| Premium| E| SI1| 59.8| 61.0| 326|3.89|3.84|2.31| | 3| 0.23| Good| E| VS1| 56.9| 65.0| 327|4.05|4.07|2.31| | 4| 0.29| Premium| I| VS2| 62.4| 58.0| 334| 4.2|4.23|2.63| | 5| 0.31| Good| J| SI2| 63.3| 58.0| 335|4.34|4.35|2.75| | 6| 0.24|Very Good| J| VVS2| 62.8| 57.0| 336|3.94|3.96|2.48| | 7| 0.24|Very Good| I| VVS1| 62.3| 57.0| 336|3.95|3.98|2.47| | 8| 0.26|Very Good| H| SI1| 61.9| 55.0| 337|4.07|4.11|2.53| | 9| 0.22| Fair| E| VS2| 65.1| 61.0| 337|3.87|3.78|2.49| | 10| 0.23|Very Good| H| VS1| 59.4| 61.0| 338| 4.0|4.05|2.39| +---+-----+---------+-----+-------+-----+-----+-----+----+----+----+ only showing top 10 rows scala> diamonds.limit(10).write.format("csv" ).save("wasbs://sparkshell@akssparksa0001.blob.core.windows.net/1.csv" ) scala> spark.read.csv("wasbs://sparkshell@akssparksa0001.blob.core.windows.net/1.csv" ).show() +---+----+---------+---+----+----+----+---+----+----+----+ |_c0| _c1| _c2|_c3| _c4| _c5| _c6|_c7| _c8| _c9|_c10| +---+----+---------+---+----+----+----+---+----+----+----+ | 1|0.23| Ideal| E| SI2|61.5|55.0|326|3.95|3.98|2.43| | 2|0.21| Premium| E| SI1|59.8|61.0|326|3.89|3.84|2.31| | 3|0.23| Good| E| VS1|56.9|65.0|327|4.05|4.07|2.31| | 4|0.29| Premium| I| VS2|62.4|58.0|334| 4.2|4.23|2.63| | 5|0.31| Good| J| SI2|63.3|58.0|335|4.34|4.35|2.75| | 6|0.24|Very Good| J|VVS2|62.8|57.0|336|3.94|3.96|2.48| | 7|0.24|Very Good| I|VVS1|62.3|57.0|336|3.95|3.98|2.47| | 8|0.26|Very Good| H| SI1|61.9|55.0|337|4.07|4.11|2.53| | 9|0.22| Fair| E| VS2|65.1|61.0|337|3.87|3.78|2.49| | 10|0.23|Very Good| H| VS1|59.4|61.0|338| 4.0|4.05|2.39| +---+----+---------+---+----+----+----+---+----+----+----+
另外, 跨 Azure Blob Container 的文件读写也可以.
3. 总结 通过以上的几个步骤, 可以实现在 Azure Kubernetes Service 集群上运行 Spark 作业的平台部署. 与传统的 YARN 来做资源管理调度相比较而言更为轻量快捷, 同时可以与 Azure Blob 做集成, 数据存储在 Azure Blob 中进行分析, 既可以保证数据高可靠性也能够实现计算与存储的分离从而提高灵活性.