- 一、说明
- 二、编写程序
- 三、运行演示
- 四、打包部署到服务器
本案例实现的功能统计对Kafka中的消息单词出现的次数,即词频统计。其主要演示了Flink流式程序消费kafka中的消息,其目的想让初学者了解Flink如何编写消费Kafka中消息的程序以及通过程序的演示来进一步学习flink。
实验环境:
- Kafka版本是:kafka_2.11-2.3.1
- Flink版本是:flink-1.10.1
- Kafka单节点部署在slave01上(安装过程可以参考我的其他博文)
- 开发工具IDEA
- 创建maven项目
- 添加flink依赖,可参考Flink的流批WordCount入门案例
- 添加Kafka的依赖到pom.xml中,将如下内容添加至pom.xml中:
org.apache.flink flink-connector-kafka-0.11_2.12 1.10.1 - 编写程序,完成代码如下所示:
import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 import java.util.Properties object SourceFromKafka { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "slave01:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties)) stream3.flatMap(_.split(" ")) .map((_,1)) .keyBy(0) .sum(1) .print() env.execute("SourceFromKafka Job starting……") } }
- 启动Kafka,在Kafka安装目录下的config目录下执行如下命令即可:
kafka-server-start.sh ./server.properties &
- 创建Kafka主题sensor,执行如下命令:
kafka-topics.sh --create --zookeeper master:2181,slave01:2181,slave02:2181,--replication-factor 1 --partitions 2 --topic sensor
- 启动Kafka控制台生产者,用于生成数据,执行如下命令:
kafka-console-producer.sh --broker-list slave01:9092 --topic sensor
- 在IDEA中运行我们写好的程序,成功后如下图所示:
- Kafka生产者控制台界面中分别输入如下数据:
hello world
hello world
hello world
hello world
hello world - 观察IDEA中程序控制台中输出的结果,如下图所示:
- IDEA中右侧菜单:maven-clean-compile-package
- 将以下的Kafka依赖jar包上传至flink安装目录下的lib目录下:下载链接
- kafka-clients-2.3.1.jar
- flink-connector-kafka-base_2.12-1.10.1.jar
- flink-connector-kafka-0.11_2.12-1.10.1.jar
- 如果以上三个jar依赖不行,根据报错,可以将下列两个jar包也上传至flink安装目录下的lib目录:
flink-connector-kafka-0.10_2.12-1.10.1.jar
flink-connector-kafka-0.9_2.12-1.10.1.jar
完整下载
- 打开Flink的WebUI,并将程序包上传到服务器
- 查看程序运行结果