Running Stream and Batch Workloads with Flink, the Next-Generation Big Data Compute Framework, on Microsoft Azure Kubernetes Service

1. Introduction

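In the previous post, we tested the integration of Apache Spark with Azure Kubernetes Service (AKS). That included running Spark jobs on an AKS cluster in cluster mode and using Azure Blob Storage to decouple compute from storage. Compared with Hadoop YARN, the traditional resource scheduler, AKS speeds up deployment, adds flexibility and lowers operations and management cost to some degree. Backed by a strong community, Spark is the most mature and most complete of the major big data compute frameworks. Besides Spark, however, there is a rising star among big data compute frameworks: Apache Flink. In this post we look at Apache Flink and at how to integrate it with Azure Kubernetes Service for stream and batch processing.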

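Apache Flink is a distributed big data processing engine for stateful or stateless computation over bounded and unbounded data streams. It can be deployed in all common cluster environments and computes quickly over data of any size.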

(from: https://flink.apache.org/)

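Flink is implemented mostly in Java, with some Scala. It provides the mainstream Java and Scala APIs and also a Python API; Flink 1.9 introduced the current Table-API-based PyFlink. Flink treats streaming data as its primary workload, and batch data is just a special case of a stream that is bounded in time. This makes Flink a truly unified stream-and-batch engine. In some business scenarios this design increases code reuse, which is one reason many large companies have adopted Flink. Let's look at Flink's internal architecture: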

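From the bottom up:
Deployment: Flink can run locally (directly from an IDE), on a standalone cluster (Standalone mode), on clusters managed by YARN, Mesos or Kubernetes, and in the cloud.
Runtime: Flink's core is a distributed streaming dataflow engine, which means data is processed one event at a time.
APIs: the DataStream, DataSet, Table and SQL APIs.
Libraries: Flink also ships libraries for CEP (complex event processing), machine learning, graph processing and other scenarios.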
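
The following JDK-only sketch makes the "batch is a bounded stream" idea concrete. It is not Flink code, and UnifiedWordCount and run are hypothetical names used for illustration. One function processes lines one event at a time and emits every updated count, as a streaming job would. When the input iterator is finite, the loop ends and the final state is exactly the batch result.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Illustration only (not Flink code): the same per-event logic serves an
 * unbounded stream (the loop never ends, updates are emitted continuously)
 * and a bounded batch (the loop ends and the final state is the batch result).
 */
public class UnifiedWordCount {

    static Map<String, Integer> run(Iterator<String> lines, BiConsumer<String, Integer> emit) {
        Map<String, Integer> state = new HashMap<>();      // keyed state: word -> running count
        while (lines.hasNext()) {                          // one event (line) at a time
            for (String word : lines.next().split(" ")) {
                if (word.isEmpty()) {
                    continue;                              // skip empty tokens from blank lines
                }
                int updated = state.merge(word, 1, Integer::sum);
                emit.accept(word, updated);                // continuous output, as a stream job would do
            }
        }
        return state;                                      // reached only when the input is bounded
    }

    public static void main(String[] args) {
        // Bounded ("batch") input; an endless iterator, e.g. one backed by a socket, would use the same code.
        Iterator<String> batch = Arrays.asList("to be or", "not to be").iterator();
        Map<String, Integer> result =
                run(batch, (word, count) -> System.out.println("(" + word + "," + count + ")"));
        System.out.println("final: " + result);
    }
}
```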

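This post only covers Flink on Azure Kubernetes Service and checks the technical feasibility of stream and batch workloads there. Other deployment modes are not discussed; interested readers can explore them on their own.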

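This walkthrough assumes that Azure CLI, Terraform, kubectl, Docker and similar tools are correctly installed. The later steps also use Maven and envsubst. Installation steps are covered in each tool's documentation and are not repeated here. All commands run on a server inside the AKS VNet, which serves as the jump box for the Flink cluster.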

3.1 Create the AKS cluster, ACR and jump box

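To save time, this post uses Terraform for automated one-click deployment. Terraform itself is not introduced here; readers can look it up. Enough talk, here is the code. For details on using Terraform with Azure, see the Terraform azurerm provider documentation. The configuration follows Terraform's recommended project structure. Because variables.tf contains personal Azure account information, only the main file, azure-aks.tf, is shown: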

# Configure the Microsoft Azure Provider
provider "azurerm" {
  subscription_id = "${var.subscription_id}"
  client_id       = "${var.client_id}"
  client_secret   = "${var.client_secret}"
  tenant_id       = "${var.tenant_id}"
}

# Create a resource group if it doesn’t exist
resource "azurerm_resource_group" "myterraformgroup" {
  name     = "${var.resource_group_name}"
  location = "${var.location}"
  tags = {
    environment = "Azure Terraform Kubernetes Service"
  }
}

# Create virtual network
resource "azurerm_virtual_network" "myterraformnetwork" {
  name                = "${var.vnet_name}"
  address_space       = "${var.vnet_address_space}"
  location            = "${azurerm_resource_group.myterraformgroup.location}"
  resource_group_name = "${azurerm_resource_group.myterraformgroup.name}"

  tags = {
    environment = "Azure Terraform Kubernetes Service"
  }
}

# Create Network Security Group and rule
resource "azurerm_network_security_group" "public_nsg" {
  name                = "${var.public_nsg}"
  location            = "${azurerm_resource_group.myterraformgroup.location}"
  resource_group_name = "${azurerm_resource_group.myterraformgroup.name}"

  security_rule {
    name                       = "SSH"
    priority                   = 1001
    direction                  = "Inbound"
    access                     = "Allow"
    protocol                   = "Tcp"
    source_port_range          = "*"
    destination_port_range     = "22"
    source_address_prefix      = "*"
    destination_address_prefix = "*"
  }

  tags = {
    environment = "Azure Terraform Kubernetes Service"
  }
}

resource "azurerm_network_security_group" "op_nsg" {
  name                = "${var.op_nsg}"
  location            = "${azurerm_resource_group.myterraformgroup.location}"
  resource_group_name = "${azurerm_resource_group.myterraformgroup.name}"

  security_rule {
    name                       = "SSH"
    priority                   = 1001
    direction                  = "Inbound"
    access                     = "Allow"
    protocol                   = "Tcp"
    source_port_range          = "*"
    destination_port_range     = "22"
    source_address_prefix      = "*"
    destination_address_prefix = "*"
  }

  tags = {
    environment = "Azure Terraform Kubernetes Service"
  }
}

resource "azurerm_network_security_group" "aks_nsg" {
  name                = "${var.aks_nsg}"
  location            = "${azurerm_resource_group.myterraformgroup.location}"
  resource_group_name = "${azurerm_resource_group.myterraformgroup.name}"

  security_rule {
    name                       = "SSH"
    priority                   = 1001
    direction                  = "Inbound"
    access                     = "Allow"
    protocol                   = "Tcp"
    source_port_range          = "*"
    destination_port_range     = "22"
    source_address_prefix      = "*"
    destination_address_prefix = "*"
  }
  security_rule {
    name                       = "AKSWebUI"
    priority                   = 1002
    direction                  = "Inbound"
    access                     = "Allow"
    protocol                   = "Tcp"
    source_port_range          = "*"
    destination_port_range     = "8081"
    source_address_prefix      = "*"
    destination_address_prefix = "*"
  }

  tags = {
    environment = "Azure Terraform Kubernetes Service"
  }
}

# Create subnets
resource "azurerm_subnet" "public_subnet" {
  name                      = "${var.public_subnet}"
  resource_group_name       = "${azurerm_resource_group.myterraformgroup.name}"
  virtual_network_name      = "${azurerm_virtual_network.myterraformnetwork.name}"
  address_prefix            = "${var.public_subnet_address_prefix}"
  network_security_group_id = "${azurerm_network_security_group.public_nsg.id}"
}

resource "azurerm_subnet" "op_subnet" {
  name                      = "${var.op_subnet}"
  resource_group_name       = "${azurerm_resource_group.myterraformgroup.name}"
  virtual_network_name      = "${azurerm_virtual_network.myterraformnetwork.name}"
  address_prefix            = "${var.op_subnet_address_prefix}"
  # Attach the op NSG defined above (it was otherwise unused)
  network_security_group_id = "${azurerm_network_security_group.op_nsg.id}"
}

resource "azurerm_subnet" "aks_subnet" {
  name                      = "${var.aks_subnet}"
  resource_group_name       = "${azurerm_resource_group.myterraformgroup.name}"
  virtual_network_name      = "${azurerm_virtual_network.myterraformnetwork.name}"
  address_prefix            = "${var.aks_subnet_address_prefix}"
  network_security_group_id = "${azurerm_network_security_group.aks_nsg.id}"
}

# Create Azure Kubernetes Service Cluster
resource "azurerm_log_analytics_workspace" "myterraform_log_analytics_workspace" {
  # The WorkSpace name has to be unique across the whole of azure, not just the current subscription/tenant.
  name                = "${var.log_analytics_workspace_name}"
  location            = "${azurerm_resource_group.myterraformgroup.location}"
  resource_group_name = "${azurerm_resource_group.myterraformgroup.name}"
  sku                 = "${var.log_analytics_workspace_sku}"
}

resource "azurerm_log_analytics_solution" "myterraform_log_analytics_solution" {
  solution_name         = "ContainerInsights"
  location              = "${azurerm_resource_group.myterraformgroup.location}"
  resource_group_name   = "${azurerm_resource_group.myterraformgroup.name}"
  workspace_resource_id = "${azurerm_log_analytics_workspace.myterraform_log_analytics_workspace.id}"
  workspace_name        = "${azurerm_log_analytics_workspace.myterraform_log_analytics_workspace.name}"

  plan {
    publisher = "Microsoft"
    product   = "OMSGallery/ContainerInsights"
  }
}

resource "azurerm_kubernetes_cluster" "myterraform_kubernetes_cluster" {
  name                = "${var.azure_kubernetes_service_cluster_name}"
  location            = "${azurerm_resource_group.myterraformgroup.location}"
  resource_group_name = "${azurerm_resource_group.myterraformgroup.name}"
  dns_prefix          = "${var.azure_kubernetes_service_dns_prefix}"

  kubernetes_version = "${var.azure_kubernetes_service_cluster_version}"

  linux_profile {
    admin_username = "${var.admin_username}"

    ssh_key {
      key_data = "${var.ssh_public_key_data}"
    }
  }

  agent_pool_profile {
    name            = "agentpool"
    count           = "${var.agent_count}"
    vm_size         = "Standard_D4s_v3"
    os_type         = "Linux"
    os_disk_size_gb = 30
    vnet_subnet_id  = "${azurerm_subnet.aks_subnet.id}"
  }

  network_profile {
    network_plugin     = "azure"
    network_policy     = "azure"
    dns_service_ip     = "10.0.0.10"
    docker_bridge_cidr = "172.17.0.1/16"
    service_cidr       = "10.0.0.0/16"
    load_balancer_sku  = "standard"
  }

  role_based_access_control {
    enabled = "true"
  }

  addon_profile {
    oms_agent {
      enabled                    = "true"
      log_analytics_workspace_id = "${azurerm_log_analytics_workspace.myterraform_log_analytics_workspace.id}"
    }
  }

  service_principal {
    client_id     = "${var.client_id}"
    client_secret = "${var.client_secret}"
  }

  tags = {
    Environment = "Azure Terraform Kubernetes Service"
  }
}

# Create Azure Container Registry
resource "azurerm_container_registry" "myterraform_container_registry" {
  name                = "${var.azure_container_registry_name}"
  resource_group_name = "${azurerm_resource_group.myterraformgroup.name}"
  location            = "${azurerm_resource_group.myterraformgroup.location}"
  sku                 = "Premium"
  admin_enabled       = false
}

# Create AKS JumperVM
resource "azurerm_public_ip" "aks_jumpervm0001_pip" {
  name                = "${var.aks_jumpervm_name}"
  location            = "${azurerm_resource_group.myterraformgroup.location}"
  resource_group_name = "${azurerm_resource_group.myterraformgroup.name}"
  allocation_method   = "Static"

  tags = {
    environment = "Azure Terraform Kubernetes Service"
  }
}

resource "azurerm_network_interface" "aks_jumpervm0001_nic" {
  name                = "${var.aks_jumpervm_nic_name}"
  location            = "${azurerm_resource_group.myterraformgroup.location}"
  resource_group_name = "${azurerm_resource_group.myterraformgroup.name}"

  ip_configuration {
    name                          = "${var.aks_jumpervm_nic_name}"
    subnet_id                     = "${azurerm_subnet.aks_subnet.id}"
    private_ip_address_allocation = "Dynamic"
    public_ip_address_id          = "${azurerm_public_ip.aks_jumpervm0001_pip.id}"
  }

  tags = {
    environment = "Azure Terraform Kubernetes Service"
  }
}

# Create virtual machine
resource "azurerm_virtual_machine" "aksjumpervm0001" {
  name                  = "${var.aks_jumpervm_name}"
  location              = "${azurerm_resource_group.myterraformgroup.location}"
  resource_group_name   = "${azurerm_resource_group.myterraformgroup.name}"
  network_interface_ids = ["${azurerm_network_interface.aks_jumpervm0001_nic.id}"]
  vm_size               = "Standard_D2s_v3"

  storage_os_disk {
    name              = "${var.aks_jumpervm_name}OsDisk"
    caching           = "ReadWrite"
    create_option     = "FromImage"
    managed_disk_type = "Standard_LRS"
  }

  storage_image_reference {
    id = "/subscriptions/xxxx-xxxx-xxxx-xxxx-xxxx/resourceGroups/xxxx-xxxx/providers/Microsoft.Compute/images/xxxx"
  }

  os_profile {
    computer_name  = "${var.aks_jumpervm_name}"
    admin_username = "azureuser"
  }

  os_profile_linux_config {
    disable_password_authentication = true
    ssh_keys {
      path     = "/home/azureuser/.ssh/authorized_keys"
      key_data = "${var.ssh_public_key_data}"
    }
  }
}

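All resources are deployed in under 10 minutes. The configuration uses azurerm provider 1.x syntax. Provider 2.0 replaced agent_pool_profile with default_node_pool and removed the subnet's network_security_group_id in favor of azurerm_subnet_network_security_group_association, so adjust the file when using a newer provider.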
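
The network_profile values above have to fit together. According to the AKS networking documentation, dns_service_ip must lie inside service_cidr, and service_cidr and docker_bridge_cidr must not overlap the VNet address space. The plain-Java sketch below checks these constraints. AksNetworkCheck is a hypothetical helper. The VNet range 10.11.0.0/16 is an assumption, because var.vnet_address_space is not shown; the jump box address 10.11.4.4 used later suggests a 10.11.x.x network.

```java
/**
 * Hypothetical helper: sanity-checks the AKS network_profile CIDRs.
 * The VNet range in main() is an ASSUMPTION (var.vnet_address_space is not shown).
 */
public class AksNetworkCheck {

    /** Parses a dotted IPv4 address into an unsigned 32-bit value held in a long. */
    static long ipToLong(String ip) {
        String[] parts = ip.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Bad IPv4 address: " + ip);
        }
        long value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("Bad octet in: " + ip);
            }
            value = (value << 8) | octet;
        }
        return value;
    }

    /** Returns {first, last} address of a CIDR block such as "10.0.0.0/16". */
    static long[] range(String cidr) {
        String[] parts = cidr.split("/");
        int prefix = Integer.parseInt(parts[1]);
        long mask = prefix == 0 ? 0 : (0xFFFFFFFFL << (32 - prefix)) & 0xFFFFFFFFL;
        long first = ipToLong(parts[0]) & mask;
        long last = first | (~mask & 0xFFFFFFFFL);
        return new long[] {first, last};
    }

    static boolean contains(String cidr, String ip) {
        long[] r = range(cidr);
        long v = ipToLong(ip);
        return v >= r[0] && v <= r[1];
    }

    static boolean overlaps(String a, String b) {
        long[] x = range(a), y = range(b);
        return x[0] <= y[1] && y[0] <= x[1];
    }

    public static void main(String[] args) {
        String serviceCidr = "10.0.0.0/16";     // network_profile.service_cidr
        String dnsServiceIp = "10.0.0.10";      // network_profile.dns_service_ip
        String dockerBridge = "172.17.0.1/16";  // network_profile.docker_bridge_cidr
        String vnet = "10.11.0.0/16";           // ASSUMPTION: not shown in the article
        System.out.println("dns_service_ip inside service_cidr: " + contains(serviceCidr, dnsServiceIp));
        System.out.println("service_cidr overlaps VNet: " + overlaps(serviceCidr, vnet));
        System.out.println("docker_bridge_cidr overlaps VNet: " + overlaps(dockerBridge, vnet));
    }
}
```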

3.2 Get the AKS credentials and inspect the AKS cluster

az aks get-credentials --resource-group aksrg001 --name akscluster001

Cluster node information:

$ kubectl get node
NAME                       STATUS   ROLES   AGE   VERSION
aks-agentpool-26667448-0   Ready    agent   17m   v1.14.8
aks-agentpool-26667448-1   Ready    agent   16m   v1.14.8
aks-agentpool-26667448-2   Ready    agent   16m   v1.14.8

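3.3 Log in to ACR (Azure Container Registry) with docker login

The Terraform configuration creates the registry with admin_enabled = false, so docker login needs service principal credentials. Alternatively, az acr login --name aksacr001 authenticates with the signed-in Azure CLI identity.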

docker login aksacr001.azurecr.io

3.4.1 Streaming job example

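This job reads text from a socket port, splits it into words and prints the count of each word every 5 seconds. The code below is taken from the official Flink documentation.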

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                // Read text lines from the socket served by nc on the jump box (10.11.4.4:9999)
                .socketTextStream("10.11.4.4", 9999)
                // Turn each line into (word, 1) pairs
                .flatMap(new Splitter())
                // Group by the word (tuple field 0)
                .keyBy(0)
                // Tumbling 5-second windows
                .timeWindow(Time.seconds(5))
                // Sum the counts (tuple field 1) per word and window
                .sum(1);

        // Results are printed to the TaskManagers' stdout
        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}
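
The JDK-only sketch below shows what keyBy(0).timeWindow(Time.seconds(5)).sum(1) computes by replaying a few timestamped lines into tumbling 5-second windows. It is an illustration under stated assumptions, not Flink's implementation. Timestamps are supplied explicitly here, whereas the job above uses processing time, the default time characteristic in Flink 1.9. TumblingWindowSketch and Line are hypothetical names. Tokenization mirrors Splitter, so a blank line counts as the empty word.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Illustration only (not Flink code): per-word counts in tumbling,
 * non-overlapping 5-second windows.
 */
public class TumblingWindowSketch {

    static final long WINDOW_MS = 5_000;

    /** One input line with an explicit timestamp in milliseconds (hypothetical type). */
    static final class Line {
        final long timestampMs;
        final String text;

        Line(long timestampMs, String text) {
            this.timestampMs = timestampMs;
            this.text = text;
        }
    }

    /** Start of the tumbling window a timestamp falls into (window offset 0). */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    /** Returns window start -> (word -> count within that window). */
    static Map<Long, Map<String, Integer>> aggregate(List<Line> lines) {
        Map<Long, Map<String, Integer>> windows = new TreeMap<>();
        for (Line line : lines) {
            Map<String, Integer> counts =
                    windows.computeIfAbsent(windowStart(line.timestampMs), start -> new TreeMap<>());
            // Same tokenization as Splitter: "".split(" ") yields one empty token.
            for (String word : line.text.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Line> input = new ArrayList<>();
        input.add(new Line(1_000, "Sep 18 kernel"));
        input.add(new Line(3_000, ""));              // blank line -> ("", 1)
        input.add(new Line(6_000, "Sep 18 kernel"));
        input.add(new Line(7_500, "Sep 18 cpu"));
        aggregate(input).forEach((start, counts) ->
                counts.forEach((word, n) ->
                        System.out.println("window@" + start + "ms (" + word + "," + n + ")")));
    }
}
```

Each window reports its own counts. A word that occurs in several windows is therefore printed once per window with that window's count, not with a running total.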

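Next, run mvn clean package. The packaged jar is written to target/flink-on-kubernetes-1.0.0-SNAPSHOT-jar-with-dependencies.jar. The jar-with-dependencies suffix comes from the Maven Assembly Plugin's predefined descriptor of that name, so the project's pom.xml is assumed to configure that plugin.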

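Flink publishes an official container image on Docker Hub. We use it as the base for our own Flink image, mainly to bake the custom job jar into the image. Newer Flink releases no longer bundle Hadoop dependencies in the official distribution, so the image must include them as well. The official Dockerfile sets up OpenJDK 1.8 as the JDK, downloads and installs Flink into /opt/flink, and adds a flink user and group. We build a custom Flink image on top of it: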

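FROM flink:1.9.1-scala_2.12

# Build arguments: the Hadoop uber jar and the job jar to bake into the image
ARG hadoop_jar
ARG job_jar

ENV FLINK_CONF=$FLINK_HOME/conf/flink-conf.yaml

# Shrink the default JobManager/TaskManager heap sizes in flink-conf.yaml
RUN set -x && \
    sed -i -e "s/jobmanager\.heap\.size:.*/jobmanager.heap.size: 128m/g" $FLINK_CONF && \
    sed -i -e "s/taskmanager\.heap\.size:.*/taskmanager.heap.size: 256m/g" $FLINK_CONF

# Jars in $FLINK_HOME/lib/ are put on Flink's classpath
COPY --chown=flink:flink $hadoop_jar $job_jar $FLINK_HOME/lib/

USER flink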

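Note: hadoop_jar is the pre-bundled Hadoop jar offered for download on the Flink download page. job_jar is the flink-on-kubernetes-1.0.0-SNAPSHOT-jar-with-dependencies.jar built above. Once both jars are in place, build the image: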

$ cd /path/to/Dockerfile
$ cp /path/to/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar hadoop.jar
$ cp /path/to/flink-on-kubernetes-1.0.0-SNAPSHOT-jar-with-dependencies.jar job.jar
$ docker build --build-arg hadoop_jar=hadoop.jar --build-arg job_jar=job.jar --tag flink-on-kubernetes:1.0.0 .

List the images:

$ docker image ls
REPOSITORY            TAG     IMAGE ID       CREATED         SIZE
flink-on-kubernetes   1.0.0   25a02549b943   8 seconds ago   575MB

Push the image to ACR:

docker tag flink-on-kubernetes:1.0.0 aksacr001.azurecr.io/flink-on-kubernetes:v1
docker push aksacr001.azurecr.io/flink-on-kubernetes:v1

Next, define the JobManager as a Kubernetes Job that runs the Flink job cluster entry point, standalone-job.sh:

$ vi flink-streaming-on-aks-jobmanager.yml
apiVersion: batch/v1
kind: Job
metadata:
  name: ${JOB}-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        instance: ${JOB}-jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: aksacr001.azurecr.io/flink-on-kubernetes:v1
        command: ["/opt/flink/bin/standalone-job.sh"]
        args: ["start-foreground",
               "-Djobmanager.rpc.address=${JOB}-jobmanager",
               "-Dparallelism.default=1",
               "-Dblob.server.port=6124",
               "-Dqueryable-state.server.ports=6125"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8081
          name: ui

Create the object with kubectl and check its status:

$ export JOB=flink-streaming-on-aks
$ envsubst <flink-streaming-on-aks-jobmanager.yml | kubectl apply -f -
$ kubectl get pods
NAME                                      READY   STATUS    RESTARTS   AGE
flink-streaming-on-aks-jobmanager-xhkkc   1/1     Running   0          48s
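
envsubst fills in ${JOB} from the exported shell variable before kubectl sees the manifest. The sketch below is a hypothetical Java stand-in; EnvSubstSketch is not part of any tool. It reproduces this behavior for ${VAR} and $VAR, including envsubst's rule that unset variables become empty strings.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical stand-in for envsubst: replaces ${VAR} and $VAR with values
 * from a map; unset variables become empty strings, as with envsubst.
 */
public class EnvSubstSketch {

    private static final Pattern VAR =
            Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}|\\$([A-Za-z_][A-Za-z0-9_]*)");

    static String render(String template, Map<String, String> env) {
        Matcher m = VAR.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1) != null ? m.group(1) : m.group(2);
            String value = env.getOrDefault(name, "");                  // unset -> empty string
            m.appendReplacement(out, Matcher.quoteReplacement(value));  // treat value literally
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Renders one line of the manifest using the real process environment.
        System.out.println(render("  name: ${JOB}-jobmanager", System.getenv()));
    }
}
```

This is why export JOB=... has to run first. With JOB unset, the name renders as -jobmanager, which Kubernetes rejects because object names must start with an alphanumeric character.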

Next, create a Service that exposes the JobManager's ports so the TaskManagers can register with it and call it:

$ vi flink-streaming-on-aks-service.yml
apiVersion: v1
kind: Service
metadata:
  name: ${JOB}-jobmanager
spec:
  selector:
    app: flink
    instance: ${JOB}-jobmanager
  type: NodePort
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125
  - name: ui
    port: 8081

$ envsubst <flink-streaming-on-aks-service.yml | kubectl apply -f -
$ kubectl get svc flink-streaming-on-aks-jobmanager
NAME                                TYPE       CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                       AGE
flink-streaming-on-aks-jobmanager   NodePort   10.0.25.24   <none>        6123:32291/TCP,6124:30580/TCP,6125:30458/TCP,8081:30855/TCP   46s
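
A NodePort Service opens each port on every node. The Web UI (service port 8081) is therefore reachable inside the VNet at any node's internal IP, shown by kubectl get nodes -o wide, on the node port listed in PORT(S). The hypothetical helper below only extracts that mapping from the column printed above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper: parses the PORT(S) column that kubectl get svc prints
 * for a NodePort Service ("port:nodePort/PROTOCOL,...") into servicePort -> nodePort.
 */
public class NodePortColumn {

    static Map<Integer, Integer> parse(String column) {
        Map<Integer, Integer> result = new LinkedHashMap<>();
        for (String entry : column.split(",")) {
            String portPair = entry.split("/")[0];   // e.g. "8081:30855" (protocol dropped)
            String[] ports = portPair.split(":");
            result.put(Integer.parseInt(ports[0]), Integer.parseInt(ports[1]));
        }
        return result;
    }

    public static void main(String[] args) {
        // PORT(S) value copied from the kubectl get svc output above.
        String column = "6123:32291/TCP,6124:30580/TCP,6125:30458/TCP,8081:30855/TCP";
        int uiNodePort = parse(column).get(8081);
        System.out.println("Flink Web UI: http://<node-internal-ip>:" + uiNodePort);
    }
}
```
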

Then deploy the TaskManagers as a Deployment with three replicas that register with the JobManager Service:

$ vi flink-streaming-on-aks-taskmanager.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ${JOB}-taskmanager
spec:
  selector:
    matchLabels:
      app: flink
      instance: ${JOB}-taskmanager
  replicas: 3
  template:
    metadata:
      labels:
        app: flink
        instance: ${JOB}-taskmanager
    spec:
      containers:
      - name: taskmanager
        image: aksacr001.azurecr.io/flink-on-kubernetes:v1
        command: ["/opt/flink/bin/taskmanager.sh"]
        args: ["start-foreground", "-Djobmanager.rpc.address=${JOB}-jobmanager"]
$ envsubst <flink-streaming-on-aks-taskmanager.yml | kubectl apply -f -
$ kubectl get pods
NAME                                                 READY   STATUS    RESTARTS   AGE
flink-streaming-on-aks-jobmanager-xhkkc              1/1     Running   0          19m
flink-streaming-on-aks-taskmanager-b9df9657d-2vnrt   0/1     Running   0          5s
flink-streaming-on-aks-taskmanager-b9df9657d-5nx95   0/1     Running   0          6s
flink-streaming-on-aks-taskmanager-b9df9657d-g2p7w   0/1     Running   0          5s

The JobManager and TaskManager pods are now running on AKS and provide the compute resources for the streaming job. Next, we test the real-time computation. The Flink Web UI is shown below:

The JobManager Web UI clearly shows the job and its full pipeline, and it also displays all logs and GC activity.

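Remember the Java code from 3.4.1? Its source reads from the socket 10.11.4.4:9999. On that host, the jump box, we now open port 9999 with nc and paste in lines from the system log file messages to simulate input: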

$ nc -lk 9999
Sep 18 05:17:16 localhost journal: Runtime journal is using 8.0M (max allowed 398.2M, trying to leave 597.3M free of 3.8G available → current limit 398.2M).

Sep 18 05:17:16 localhost kernel: Initializing cgroup subsys cpuset
Sep 18 05:17:16 localhost kernel: Initializing cgroup subsys cpu
Sep 18 05:17:16 localhost kernel: Initializing cgroup subsys cpuacct

Sep 18 05:17:16 localhost kernel: BIOS-e820: [mem 0x0000000000000000-0x000000000009fbff] usable
Sep 18 05:17:16 localhost kernel: BIOS-e820: [mem 0x000000000009fc00-0x000000000009ffff] reserved
Sep 18 05:17:16 localhost kernel: BIOS-e820: [mem 0x00000000000e0000-0x00000000000fffff] reserved
Sep 18 05:17:16 localhost kernel: BIOS-e820: [mem 0x0000000000100000-0x000000003ffeffff] usable

Sep 18 05:17:16 localhost kernel: DMI: Microsoft Corporation Virtual Machine/Virtual Machine, BIOS 090007 06/02/2017
Sep 18 05:17:16 localhost kernel: Hypervisor detected: Microsoft HyperV
Sep 18 05:17:16 localhost kernel: HyperV: features 0x2e7f, hints 0x44c2c
Sep 18 05:17:16 localhost kernel: Hyper-V Host Build:14393-10.0-0-0.299
Sep 18 05:17:16 localhost kernel: HyperV: LAPIC Timer Frequency: 0x30d40

The streaming job's real-time results, read from the TaskManager logs:

$ kubectl logs -f -l instance=$JOB-taskmanager
(,4)
(Sep,1)
(398.2M).,1)
(limit,1)
(current,1)
(→,1)
(available,1)
(3.8G,1)
(of,1)
(free,1)
(597.3M,1)
(leave,1)
(to,1)
(trying,1)
(398.2M,,1)
(allowed,1)
((max,1)
(8.0M,1)
(using,1)
(is,1)
(journal,1)
(Runtime,1)
(journal:,1)
(localhost,1)
(05:17:16,1)
(18,1)
(,2)
(cpuacct,1)
(cpu,1)
(cpuset,1)
(subsys,3)
(cgroup,3)
(Initializing,3)
(kernel:,3)
(localhost,3)
(05:17:16,3)
(18,3)
(Sep,3)
(Sep,4)
(0x0000000000100000-0x000000003ffeffff],1)
(0x00000000000e0000-0x00000000000fffff],1)
(reserved,2)
(0x000000000009fc00-0x000000000009ffff],1)
(usable,2)
(0x0000000000000000-0x000000000009fbff],1)
([mem,4)
(BIOS-e820:,4)
(kernel:,4)
(localhost,4)
(05:17:16,4)
(18,4)
(,1)
(Sep,5)
(0x30d40,1)
(Frequency:,1)
(Timer,1)
(LAPIC,1)
(Build:14393-10.0-0-0.299,1)
(Host,1)
(Hyper-V,1)
(0x44c2c,1)
(hints,1)
(0x2e7f,,1)
(features,1)
(HyperV:,2)
(HyperV,1)
(detected:,1)
(Hypervisor,1)
... (remaining output omitted) ...

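As input keeps arriving, the Flink streaming job stays in the Running state and never stops on its own. Each tuple is the count of one word within one 5-second window. That is why a word such as Sep reappears with counts of 1, 3, 4 and 5: each batch of pasted log lines landed in a different window. The tuples with an empty word come from empty tokens such as blank input lines, because split(" ") turns an empty line into a single empty string and Splitter emits it like any other word.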

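With streaming covered, let's look at batch jobs. Batch and streaming jobs are really the same kind of computation: a batch is just a special case of a stream that is bounded in time, and this is how Flink unifies stream and batch processing. Below we run the same WordCount logic as a batch job to try Flink batch on AKS. Unlike the job cluster above, this time we deploy a long-running Flink session cluster (jobmanager.sh and taskmanager.sh) and submit the job to it with the Flink CLI.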

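Download a suitable release from the Flink website. This example uses 1.9.1, the same version as the image. The distribution provides the flink command-line client that is used later to submit the job.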

wget https://archive.apache.org/dist/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.12.tgz
tar -zvxf flink-1.9.1-bin-scala_2.12.tgz
cd flink-1.9.1

3.5.1 Write the ConfigMap YAML file

$ vi flink-batch-on-aks-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-batch-on-aks-configmap
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-batch-on-aks-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.heap.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

Create and inspect the ConfigMap:

$ kubectl apply -f flink-batch-on-aks-configmap.yaml
$ kubectl get cm
NAME                           DATA   AGE
flink-batch-on-aks-configmap   2      25s

3.5.2 Write the JobManager YAML file

$ vi flink-batch-on-aks-jobmanager.yaml 
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-batch-on-aks-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: aksacr001.azurecr.io/flink-on-kubernetes:v1
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-batch-on-aks-configmap
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

Create and inspect the JobManager:

$ kubectl apply -f flink-batch-on-aks-jobmanager.yaml
$ kubectl get pods
NAME                                             READY   STATUS    RESTARTS   AGE
flink-batch-on-aks-jobmanager-7ddb5cf4bc-nh7b8   1/1     Running   0          7s

3.5.3 Write the TaskManager YAML file

$ vi flink-batch-on-aks-taskmanager.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-batch-on-aks-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: aksacr001.azurecr.io/flink-on-kubernetes:v1
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-batch-on-aks-configmap
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

Create and inspect the TaskManagers:

$ kubectl apply -f flink-batch-on-aks-taskmanager.yaml
$ kubectl get pods
NAME                                             READY   STATUS    RESTARTS   AGE
flink-batch-on-aks-jobmanager-7ddb5cf4bc-nh7b8   1/1     Running   0          2m28s
flink-batch-on-aks-taskmanager-cdc4498b9-6pctv   1/1     Running   0          11s
flink-batch-on-aks-taskmanager-cdc4498b9-kbkdf   1/1     Running   0          11s
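
Here is a quick capacity check for this session cluster. Total task slots equal the TaskManager replicas times taskmanager.numberOfTaskSlots, here 2 × 1 = 2. With Flink's default slot sharing, a job needs as many slots as its highest operator parallelism, so jobs with parallelism up to 2 can be scheduled. The sketch below, using the hypothetical name SlotCapacity, does this arithmetic.

```java
/**
 * Hypothetical back-of-the-envelope check for a Flink standalone/session cluster.
 * With default slot sharing, a job needs as many slots as its highest operator parallelism.
 */
public class SlotCapacity {

    static int totalSlots(int taskManagerReplicas, int slotsPerTaskManager) {
        return taskManagerReplicas * slotsPerTaskManager;
    }

    static boolean fits(int maxOperatorParallelism, int taskManagerReplicas, int slotsPerTaskManager) {
        return maxOperatorParallelism <= totalSlots(taskManagerReplicas, slotsPerTaskManager);
    }

    public static void main(String[] args) {
        int replicas = 2;    // spec.replicas of flink-batch-on-aks-taskmanager
        int slotsPerTm = 1;  // taskmanager.numberOfTaskSlots in the ConfigMap
        System.out.println("total slots: " + totalSlots(replicas, slotsPerTm));
        System.out.println("parallelism 2 fits: " + fits(2, replicas, slotsPerTm));
        System.out.println("parallelism 3 fits: " + fits(3, replicas, slotsPerTm));
    }
}
```

The ConfigMap does not set parallelism.default, so jobs run with Flink's default parallelism of 1 unless flink run -p requests more. Raising the replica count or numberOfTaskSlots adds capacity.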

3.5.4 Write the JobManager Service YAML file

$ vi flink-batch-on-aks-jobmanager-service.yaml 
apiVersion: v1
kind: Service
metadata:
  name: flink-batch-on-aks-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager

Create and inspect the JobManager Service:

$ kubectl apply -f flink-batch-on-aks-jobmanager-service.yaml 
$ kubectl get svc
NAME                            TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)                      AGE
flink-batch-on-aks-jobmanager   ClusterIP   10.0.23.214   <none>        6123/TCP,6124/TCP,8081/TCP   7s

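The Service is of type ClusterIP, so port 8081 is not reachable from outside the cluster. Forward the JobManager pod's port 8081 to the jump box, listening on all interfaces:
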
$ kubectl port-forward flink-batch-on-aks-jobmanager-7ddb5cf4bc-nh7b8 --address 0.0.0.0 8081:8081
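
With the port forward in place, the JobManager's REST API answers on the same port as the Web UI. The program below is a sketch that assumes Java 11+ for java.net.http. It calls GET /overview and reads taskmanagers and slots-total; with both TaskManagers registered, one would expect 2 and 2. ClusterOverview and intField are hypothetical names. The regex-based field extraction is a shortcut for flat numeric fields only, not a JSON parser.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Sketch (Java 11+): query the Flink REST endpoint GET /overview and print how many
 * TaskManagers and slots have registered. Regex extraction handles flat numeric fields only.
 */
public class ClusterOverview {

    static int intField(String json, String field) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*(\\d+)").matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("Field not found: " + field);
        }
        return Integer.parseInt(m.group(1));
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Default: the jump box address where kubectl port-forward listens in this article.
        String base = args.length > 0 ? args[0] : "http://10.11.4.4:8081";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(base + "/overview")).GET().build();
        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        System.out.println("taskmanagers = " + intField(body, "taskmanagers"));
        System.out.println("slots-total  = " + intField(body, "slots-total"));
    }
}
```
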

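In a second terminal on the jump box, from the flink-1.9.1 directory, submit the WordCount example that ships with the distribution. 10.11.4.4:8081 is the jump box address where the port forward listens. The example, examples/streaming/WordCount.jar, reads the bounded file ./input.txt, so this streaming program runs as a finite, batch-style job:
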
$ ./bin/flink run -m 10.11.4.4:8081 ./examples/streaming/WordCount.jar --input ./input.txt
Starting execution of program
Printing result to stdout. Use --output to specify output path.
Program execution finished
Job with JobID 7a9037a6c6573a6a7f01f36e6e9d7382 has finished.
Job Runtime: 138 ms

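The results can be viewed in the Flink Web UI. Because the example prints to stdout, they appear in the TaskManagers' Stdout view.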

4. Summary

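Flink's philosophy is to treat every job as a stream, which unifies stream and batch computation. This post walked through two scenarios showing how to run Flink streaming and batch jobs on AKS, and I hope it serves as a useful reference. Many other scenarios were not covered, such as JobManager HA, external checkpoints (which can be integrated with Azure Blob Storage), dynamic job scaling, and integration with Azure Event Hubs or Kafka. These are left for readers to explore and try out.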