1. 前言
上一篇,我们验证测试了 Apache Spark 和 Azure Kubernetes Service 的集成,包括通过 Cluster Mode 依托 AKS 集群运行 Spark 作业以及和 Azure Blob Storage 集成来做计算存储的解耦分离。和传统的资源调度框架 Hadoop Yarn 相比, 使用 Azure Kubernetes Service 提高了部署速度和效率,提高灵活性并一定程度降低了运维管理成本。Spark 在强大的社区支撑下,技术的成熟度是在几大大数据计算框架中最高也是最全面的。但是除了 Spark,在大数据计算框架中还有一颗冉冉升起的新星,It is Apache Flink。本文我们来聊聊 Apache Flink 以及如何与 Azure Kubernetes Service 集成来做流批计算。
2. Apahce Flink 介绍
Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速计算。
(from: https://flink.apache.org/)
Flink 的代码主要是由 Java 语言实现的,部分代码是 Scala,提供主流的 Java 及 Scala API,并且从 1.0 版本开始也提供 Python API 即 Pyflink。对 Flink 而言,其所要处理的主要场景就是 Streaming 流数据,Batch 批数据只是流数据的一个时间维度上的特例而已,所以 Flink 是一款真正的流批统一的计算引擎,这种设计思路对于某些业务场景可以提高代码的复用率,所以这也是 Flink 被很多大厂接受并广泛使用的原因之一。下面我们来看看 Flink 的内部架构:
从下至上来看:
部署: Flink 支持本地运行(IDE 中直接运行程序)、能在独立集群(Standalone 模式)或者在被 YARN、Mesos、K8s 管理的集群上运行,也能部署在云上。
运行:Flink 的核心是分布式流式数据引擎,意味着数据以一次一个事件的形式被处理。
API:DataStream、DataSet、Table、SQL API。
扩展库:Flink 还包括用于 CEP(复杂事件处理)、机器学习、图形处理等场景。
本文仅调研 Flink on Azure Kubernetes 并做流批场景的技术可行性验证,对于其他部署模式不展开讨论,有兴趣的同学可以自行研究。
3. Flink on AKS 实现
本实验需要正确安装 Azure Cli、Terraform,Kubectl、Docker 等软件包,详细步骤请参考相应文档,本文不进行赘述。具体的操作过程在 AKS Vnet 的一台内网服务器上进行,该服务器作为 Flink 集群的跳板机使用。
3.1 创建 AKS 集群、ACR 及跳板机
为了提高效率,本文直接使用 Terraform 来做自动化一键部署,Terraform 的介绍不做赘述,同学们自行搜索吧。废话少说,直接上 Code,具体 Terraform Azure 的用法可以参考这里。本文按照 Terraform 的最佳实践结构设计,由于 variables.tf 包含一些个人 Azure 账号信息,所以本文只展现主要的 azure-aks.tf 文件,内容如下:
1 | # Configure the Microsoft Azure Provider |
所有资源会在 10 分钟以内部署完成。
3.2 获取 AKS Credential 并查看 AKS Cluster 信息:
1 | az aks get-credentials --resource-group aksrg001 --name akscluster001 |
集群节点信息:
1 | $ kubectl get node |
3.3 Docker Login 登陆 ACR (Azure Container Registry)
1 | docker login aksacr001.azurecr.io |
3.4 Flink Streaming on AKS
3.4.1 Streaming 任务示例
该任务会从某个端口中读取文本,分割为单词,并且每 5 秒钟打印一次每个单词出现的次数。以下代码是从 Flink 官方文档 上获取来的。
1 | import org.apache.flink.api.common.functions.FlatMapFunction; |
接下来执行 mvn clean package 命令,打包好的 Jar 文件路径为 target/flink-on-kubernetes-1.0.0-SNAPSHOT-jar-with-dependencies.jar。
3.4.2 构建自定义 Flink Docker 镜像并推送至 ACR
Flink 在 DockerHub 上提供了一个官方的容器镜像,我们可以以该镜像为基础,构建独立的 Flink 镜像,主要就是将自定义 Jar 包打到镜像里面去。此外,新版 Flink 已将 Hadoop 依赖从官方发行版中剥离,因此我们在打镜像时也需要包含进去。官方的 Dockerfile 主要做了将 OpenJDK 1.8 作为 JDK 环境,下载并安装 Flink 至 /opt/flink,同时添加 flink 用户和组等。我们在此基础上构建自定义 Flink 镜像:
1 | FROM flink:1.9.1-scala_2.12 |
注:hadoop_jar 从这里下载;job_jar 为刚刚编译打包好的 flink-on-kubernetes-1.0.0-SNAPSHOT-jar-with-dependencies.jar。下载好后做编译:
1 | $ cd /path/to/Dockerfile |
查看镜像:
1 | $ docker image ls |
镜像推送至 ACR:
1 | docker tag flink-on-kubernetes:1.0.0 aksacr001.azurecr.io/flink-on-kubernetes:v1 |
<img src=”https://cdn.jsdelivr.net/gh/TheoDoreW/CDN_Images@master/images/2019-11-18-FlinkonAKS/flink_acr.jpg" “height:1000px” width=”1000px” div align=center/>
3.4.3 构建 Flink Streaming JobManager 和 Service
1 | $ vi flink-streaming-on-aks-jobmanager.yml |
使用 kubectl 命令创建对象,并查看状态:
1 | $ export JOB=flink-streaming-on-aks |
接着创建一个 Service 来将 JobManager 的端口开放出来,以便 TaskManager 做服务注册和调用:
1 | $ vi flink-streaming-on-aks-service.yml |
1 | $ envsubst <flink-streaming-on-aks-service.yml | kubectl apply -f - |
3.4.4 构建 Flink Streaming TaskManager
1 | $ vi flink-streaming-on-aks-taskmanager.yml |
1 | $ envsubst <flink-streaming-on-aks-taskmanager.yml | kubectl apply -f - |
目前启动了 JobManager 和 TaskManager 的 AKS Pods 作为整个 Streaming 流计算任务的计算资源,下面进行实时计算任务测试。Flink WebUI如下:
Flink JobManager 的 WebUI 可以清晰看到 Job 以及整个 Flink Job 的 Pipeline,包括所有的 Log 和 GC 的情况也有显示。
3.4.5 Flink Streaming Job 运行和结果
还记得 3.6.1 的 Java 代码吗? 此时我们需要通过 nc 命令打开 9999 端口,然后输入 messages (输出系统日志来模拟) :
1 | $ nc -lk 9999 |
Flink Streaming Job 实时结果如下:
1 | $ kubectl logs -f -l instance=$JOB-taskmanager |
随着实时任务不断的输入追加,整个 Flink Streaming Job 都处于 Running 状态,永远都不会停下来。
3.5 Flink Batch on AKS
玩完 Streaming 流计算,我们再来看看批处理任务。其实批处理任务和流计算任务本属一家,只是时间维度上的特例,所以这也是 Flink 对流批任务处理的思想的统一。下面我们也运行同样的 WordCount 批处理任务来玩玩 Flink Batch on AKS。
3.5.1 下载 Flink 二进制文件
从 Flink 官网 选择合适的版本下载,本例中使用 1.9.1 版本。
1 | wget https://www-us.apache.org/dist/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.12.tgz |
3.5.1 编写 ConfigMap Yaml 文件
1 | $ vi flink-batch-on-aks-configmap.yaml |
创建并查看 ConfigMap:
1 | $ kubectl apply -f flink-batch-on-aks-configmap.yaml |
3.5.2 编写 JobManager Yaml 文件
1 | $ vi flink-batch-on-aks-jobmanager.yaml |
创建并查看 JobManager :
1 | $ kubectl apply -f flink-batch-on-aks-jobmanager.yaml |
3.5.3 编写 TaskManager Yaml 文件
1 | $ vi flink-batch-on-aks-taskmanager.yml |
创建并查看 TaskManager:
1 | $ kubectl apply -f flink-batch-on-aks-taskmanager.yaml |
3.5.4 编写 JobManager Service Yaml 文件
1 | $ vi flink-batch-on-aks-jobmanager-service.yaml |
创建并查看 TaskManager Service :
1 | $ kubectl apply -f flink-batch-on-aks-jobmanager-service.yaml |
3.5.5 启动 Flink UI :
1 | $ kubectl port-forward flink-batch-on-aks-jobmanager-7ddb5cf4bc-nh7b8 --address 0.0.0.0 8081:8081 |
3.5.6 提交 Flink Batch Job 到 AKS 集群 :
1 | $ ./bin/flink run -m 10.11.4.4:8081 ./examples/streaming/WordCount.jar --input ./input.txt |
可以通过 Flink Batch UI 查看计算结果。
4. 总结
Flink 的思想是把一切任务都看作流来实现流批的计算。本文举例了 2 个场景,验证了在 AKS 上如何运行 Flink Streaming & Batch 作业,希望能够给大家一些参考。但是本文也有很多其他场景并未考虑到,比如:JobManager HA、External Checkpoints (可以和 Azure Blob Storage集成)、动态的任务扩容、与 Eventhub 或 Kafka 的集成等等,这些场景留待大家自己发现和实践了。