Apache Beam的KafkaIO与Flink实战实例详解

Apache Beam简介

Apache Beam是大数据的编程模型,定义了数据处理的编程范式和接口,它并不涉及具体的执行引擎的实现,但是,基于Beam开发的数据处理程序可以执行在任意的分布式计算引擎上,目前Dataflow、SparkFlinkApex提供了对批处理和流处理的支持,GearPump提供了流处理的支持,Storm的支持也在开发中。

综上所述,Apache Beam的目标是:

提供统一批处理和流处理的编程范式
能运行在任何可执行的引擎之上

为无限、乱序、互联网级别的数据集处理提供简单灵活、功能丰富以及表达能力十分强大的SDK。

在本文章中,将详细介绍一个Apache Beam 程序通过 kafkaIO 读取 Kafka 集群的数据,进行数据格式转换。数据统计后,通过 KafkaIO写操作把消息写入 Kafka 集群。最后把程序运行在 Flink 的计算平台上。

开发环境安装

本文使用Centos安装开发需要的Kafka和Flink

系统版本:Centos 7

Kafka 版本:kafka_2.10-0.10.1.1.tgz (百度网盘,提取码: gp6e)

Flink 版本:flink-1.5.2-bin-hadoop27-scala_2.11.tgz (百度网盘,提取码: cqdi)

Kafka安装和运行

上传Kafka到服务器,例如:/home目录,并解压缩

tar xvf kafka_2.10-0.10.1.1.tgz

由于Kafka需要ZooKeeper,所以先启动ZooKeeper

cd /home/kafka_2.10-0.10.1.1
bin/zookeeper-server-start.sh config/zookeeper.properties &

启动Kafka

bin/kafka-server-start.sh config/server.properties &

Flink安装和运行

上传Flink到服务器,例如:/home目录,并解压缩

tar xvf flink-1.5.2-bin-hadoop27-scala_2.11.tgz

启动Flink

cd /home/flink-1.5.2/bin
./start-cluster.sh

在浏览器查看Flink运行情况(192.168.233.133为示例IP)

http://192.168.233.133:8081/

创建一个Maven项目

文章最后,附有完整Maven项目下载地址

在 pom 文件中添加依赖

beam版本:2.4.0

<!-- beam -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>{beam.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-core-java</artifactId>
    <version>{beam.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-flink_2.11</artifactId>
    <version>${beam.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- flink -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.5.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.5.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.5.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime_2.11</artifactId>
    <version>1.5.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.5.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-core</artifactId>
    <version>1.5.2</version>
    <scope>provided</scope>
</dependency>

<!-- kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.1</version>
</dependency>

在 pom 文件中添加打包插件

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.suyuening.beam.BeamFlinkKafka</mainClass>  <!--这里运行类!也就是入口类 -->
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

Beam完整示例代码

package com.suyuening.beam;

import org.apache.beam.runners.core.java.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;

/**
 * 类描述: Apache Beam 程序通过 kafkaIO 读取 Kafka 集群的数据,进行数据格式转换。数据统计后,通过 KafkaIO
 * 写操作把消息写入 Kafka 集群。最后把程序运行在 Flink 的计算平台上。
 * <p>
 * 系统版本 centos 7
 * </p>
 * <p>
 * Kafka 集群版本: kafka_2.10-0.10.1.1.tgz
 * </p>
 * <pFlink 版本:flink-1.5.2-bin-hadoop27-scala_2.11.tgz
 * </p>
 *
 * @author suyuening
 * @version 1.0
 * @date 2020/1/20 13:10
 */
public class BeamFlinkKafka {

	/**
	 * kafka 的服务器地址和端口,多个用逗号分隔。例如:
	 * 192.168.233.1:9092,192.168.233.2:9092,192.168.233.3:9092
	 */
	private static final String BOOTSTRAP_SERVERS = "192.168.233.133:9092";
	/** 要读取的 kafka 的 topic */
	private static final String KAFKA_READ_TOPIC = "testmsg";
	/** 要写入的 kafka 的 topic */
	private static final String KAFKA_WRITE_TOPIC = "senkafkamsg";

	public static void main(String[] args) {
		// 创建管道工厂
		PipelineOptions options = PipelineOptionsFactory.create();
		// 显式指定 PipelineRunner:FlinkRunner 必须指定如果不制定则为本地
		options.setRunner(FlinkRunner.class);
		// 设置相关管道
		Pipeline pipeline = Pipeline.create(options);
		// 这里 kV 后说明 kafka 中的 key 和 value 均为 String 类型
		PCollection<KafkaRecord<String, String>> lines = pipeline
				.apply(KafkaIO.<String, String>read().withBootstrapServers(BOOTSTRAP_SERVERS) // 必需设置 kafka 的服务器地址和端口
						.withTopic(KAFKA_READ_TOPIC)// 必需设置要读取的 kafka 的 topic
						.withKeyDeserializer(StringDeserializer.class)// 必需序列化 key
						.withValueDeserializer(StringDeserializer.class)// 必需序列化 value
						.updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest")));// 这个属性kafka最常见的
		// 为输出的消息类型。或者进行处理后返回的消息类型
		PCollection<String> kafkadata = lines.apply("Remove Kafka Metadata",
				ParDo.of(new DoFn<KafkaRecord<String, String>, String>() {
					private static final long serialVersionUID = 1L;

					@ProcessElement
					public void processElement(ProcessContext ctx) {
						System.out.println("输出的分区为 ----:" + ctx.element().getKV());
						ctx.output(ctx.element().getKV().getValue());// 其实我们这里是把"张海 涛在发送消息 ***"进行返回操作
					}
				}));
		PCollection<String> windowedEvents = kafkadata
				.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))));
		PCollection<KV<String, Long>> wordcount = windowedEvents.apply(Count.<String>perElement()); // 统计每一个 kafka 消息的
																									// Count
		PCollection<String> wordtj = wordcount.apply("ConcatResultKVs", MapElements.via( // 拼接最后的格式化输出(Key 为 Word,Value为
																							// Count)
				new SimpleFunction<KV<String, Long>, String>() {
					private static final long serialVersionUID = 1L;

					@Override
					public String apply(KV<String, Long> input) {
						System.out.println("进行统计:" + input.getKey() + ": " + input.getValue());
						return input.getKey() + ": " + input.getValue();
					}
				}));

		wordtj.apply(KafkaIO.<Void, String>write().withBootstrapServers(BOOTSTRAP_SERVERS)// 设置写会 kafka 的集群配置地址
				.withTopic(KAFKA_WRITE_TOPIC)// 设置返回 kafka 的消息主题
				// .withKeySerializer(StringSerializer.class)// 这里不用设置了,因为上面 Void
				.withValueSerializer(StringSerializer.class)
				// Dataflow runner and Spark 兼容, Flink 对 kafka0.11 才支持。我的版本是 0.10 不兼容
				// .withEOS(20, "eos-sink-group-id")
				.values() // 只需要在此写入默认的 key 就行了,默认为 null 值
		); // 输出结果
		pipeline.run().waitUntilFinish();
	}
}

执行mvn clean package打包

通过Apache Flink Dashboard 提交 job

测试Job

添加Kafka主题

cd /home/kafka_2.10-0.10.1.1
bin/kafka-topics.sh --create --zookeeper 192.168.233.133:2181 --replication-factor 1 --partitions 3 --topic testmsg
bin/kafka-topics.sh --create --zookeeper 192.168.233.133:2181 --replication-factor 1 --partitions 3 --topic senkafkamsg

启动Kafka生产者,生产消息到testmsg。在控制台输入测试字符串

cd /home/kafka_2.10-0.10.1.1
bin/kafka-console-producer.sh -broker-list 192.168.233.133:9092 -topic testmsg

启动Kafka消费者,消费senkafkamsg主题,查看Flink处理消息后的结果

bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.133:9092 --from-beginning --topic senkafkamsg

效果图

完整Maven项目下载地址:https://github.com/infoplat/KafkaFlink

参考文章

Apache Beam实战指南 | 手把手教你玩转KafkaIO与Flink