关山难越,谁悲失路之人;萍水相逢,尽是他乡之客。
百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程教程 > 技术文章 > 正文

Flink入门:读取Kafka实时数据流,实现WordCount

guanshanw 2023-09-17 17:24 34 浏览 0 评论

本文主要介绍Flink接收一个Kafka文本数据流,进行WordCount词频统计,然后输出到标准输出上。通过本文你可以了解如何编写和运行Flink程序。

代码拆解

首先要设置Flink的执行环境:

// 创建Flink执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

设置Kafka相关参数,连接对应的服务器和端口号,读取名为Shakespeare的Topic中的数据源,将数据源命名为stream:

// Kafka参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-group");
String inputTopic = "Shakespeare";
String outputTopic = "WordCount";

// Source
FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);

使用Flink算子处理这个数据流:

// Transformations
// 使用Flink算子对输入流的文本进行操作
// 按空格切词、计数、分区、设置时间窗口、聚合
DataStream<Tuple2<String, Integer>> wordCount = stream
    .flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
      String[] tokens = line.split("\\s");
      // 输出结果 (word, 1)
      for (String token : tokens) {
        if (token.length() > 0) {
          collector.collect(new Tuple2<>(token, 1));
        }
      }
    })
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1);

这里使用的是Flink提供的DataStream级别的API,主要包括转换、分组、窗口和聚合等操作。

将数据流打印:

// Sink
wordCount.print();

最后执行这个程序:

// execute
env.execute("kafka streaming word count");

env.execute 是启动Flink作业所必需的,只有在execute()被调用时,之前调用的各个操作才会在提交到集群上或本地计算机上执行。

完整代码如下:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class WordCountKafkaInStdOut {

    public static void main(String[] args) throws Exception {

        // 创建Flink执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka参数
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-group");
        String inputTopic = "Shakespeare";
        String outputTopic = "WordCount";

        // Source
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(consumer);

        // Transformations
        // 使用Flink算子对输入流的文本进行操作
        // 按空格切词、计数、分区、设置时间窗口、聚合
        DataStream<Tuple2<String, Integer>> wordCount = stream
            .flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
                String[] tokens = line.split("\\s");
                // 输出结果 (word, 1)
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<>(token, 1));
                    }
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);

        // Sink
        wordCount.print();

        // execute
        env.execute("kafka streaming word count");

    }
}

执行程序

我们在Kafka能做什么?十分钟构建你的实时数据流管道这篇文章中曾提到如何启动一个Kafka集群,并向某个Topic内发送数据流。在本次Flink作业启动之前,我们先要按照那篇文章中提到的方式启动一个Kafka集群,创建对应的Topic,并向Topic中写入数据。

Intellij Idea调试执行

在IntelliJ Idea中,点击绿色按钮,执行这个程序。下图中任意两个绿色按钮都可以启动程序。

IntelliJ Idea下方会显示程序中输出到标准输出上的内容,包括本次需要打印的结果。

恭喜你,你的第一个Flink程序运行成功!

在集群上提交作业

第一步中我们已经下载并搭建了本地集群,接着我们在模板的基础上添加了代码,并可以在IntelliJ Idea中调试运行。在生产环境,一般需要将代码编译打包,提交到集群上。

注意,这里涉及两个目录,一个是我们存放我们刚刚编写代码的工程目录,简称工程目录,另一个是从Flink官网下载解压的Flink主目录,主目录下的bin目录中有Flink提供好的命令行工具。

进入工程目录,使用Maven命令行编译打包:

# 使用Maven将自己的代码编译打包
# 打好的包一般放在工程目录的target子文件夹下
$ mvn clean package

回到刚刚下载解压的Flink主目录,使用Flink提供的命令行工具flink,将我们刚刚打包好的作业提交到集群上。命令行的参数--class用来指定哪个主类作为入口。我们之后会介绍命令行的具体使用方法。

$ bin/flink run --class com.flink.tutorials.java.api.projects.wordcount.WordCountKafkaInStdOut /Users/luweizheng/Projects/big-data/flink-tutorials/target/flink-tutorials-0.1.jar

这时,仪表盘上就多了一个Flink程序。

程序的输出会打到Flink主目录下面的log目录下的.out文件中,使用下面的命令查看结果:

$ tail -f log/flink-*-taskexecutor-*.out

停止本地集群:

$ ./bin/stop-cluster.sh

Flink开发和调试过程中,一般有几种方式执行程序:

  1. 使用IntelliJ Idea内置的运行按钮。这种方式主要在本地调试时使用。
  2. 使用Flink提供的标准命令行工具向集群提交作业,包括Java和Scala程序。这种方式更适合生产环境。
  3. 使用Flink提供的其他命令行工具,比如针对Scala、Python和SQL的交互式环境。这种方式也是在调试时使用。

相关推荐

七条简单命令让您玩转Git
七条简单命令让您玩转Git

凭借着出色的协作能力、快速部署效果与代码构建辅助作用,Git已经得到越来越多企业用户的青睐。除了用于开发商业及消费级应用之外,众多科学及政府机构也开始尝试使用这...

2023-10-07 12:14 guanshanw

基本完整的关于Git分支branch的操作
基本完整的关于Git分支branch的操作

Git使用背景项目中要用到dev或者其他分支开发完代码,需要将该分支合并到master的需求操作步骤下面以dev名称为lex为分支名为例来操作一遍客户端操作:...

2023-10-07 12:14 guanshanw

Git 进阶(合并与变基)
Git 进阶(合并与变基)

在Git中整合来自不同分支的修改主要有两种方法:合并(merge)以及变基(rebase)合并(merge)merge流程图merge的原理是找到这两个分...

2023-10-07 12:13 guanshanw

Git学习笔记 003 Git进阶功能 part5 合并(第一部分)

合并(merge)是很常用的操作。尤其是一个庞大的很多人参与开发的企业级应用。一般会设定一个主分支,和多个副分支。在副分支开发完成后,合并到主分支中。始终保持主分支是一个完整的,稳定的最新状态的分支。...

非标题党,三张图帮你理解git merge和git rebase的区别
非标题党,三张图帮你理解git merge和git rebase的区别

初始场景:基于正常的开发分支修改几个小bug,然后在合并到开发分支上。gitmergegitcheckoutfeaturegitmergeho...

2023-10-07 12:13 guanshanw

git 初次使用(01)
git 初次使用(01)

先从github上克隆代码下来:使用vscode克隆代码如下图,填写上github仓库地址:vscode有时候克隆代码速度比较慢,可以用命令行方式克隆gitc...

2023-10-07 12:12 guanshanw

Git 远程操作

4.Git远程操作命令说明gitremote远程版本库操作gitfetch从远程获取版本库gitpull下载远程代码并合并gitpush上传远程代码并合并4.1远程版本库操作gitre...

Git常用命令-总结
Git常用命令-总结

创建git用户$gitconfig--globaluser.name"YourName"$gitconfig--globaluser.em...

2023-10-07 12:12 guanshanw

git中删除从别人clone下来项目的git信息,并修改为自己的分支

如果你从别人的Git存储库中克隆了一个项目,并想要删除与该存储库相关的Git信息,并将其修改为你自己的分支,则可以执行以下步骤:使用gitclone命令克隆存储库:gitclone<u...

git系列-回滚和放弃本地修改

回滚历史提交就是reset的功能。这种情况是已经提交远程仓库,需要回滚到之前的提交。gitreset--hardcommitId//注:强制提交后,当前版本后面的提交版本将会删掉!gi...

GIT使用小技巧大全
GIT使用小技巧大全

在大型软件工程的开发过程中,版本控制是无法绕过去的;目前来说,最火的版本控制软件就是GIT了。早两年SVN比较火,不过被大神linus喷了几次后,就日落西山了,...

2023-10-07 12:11 guanshanw

git相关命令-上
git相关命令-上

这些命令都是看了文档后,个人觉得比较有用的一些,展示给大家。回到远程仓库的状态抛弃本地所有的修改,回到远程仓库的状态。gitfetch--all&...

2023-10-07 12:10 guanshanw

Git命令行接口:掌握Git的必备技能
Git命令行接口:掌握Git的必备技能

Git是一款强大的分布式版本控制工具,它支持命令行界面操作。熟练掌握Git命令行接口,是开发者使用Git的必备技能之一。在这篇文章中,我们将介绍Git命令行接口...

2023-10-07 12:10 guanshanw

Git命令详解
Git命令详解

相信各位小伙伴们应该都对git有一些了解,毕竟作为代码管理的神器,就算不是IT行业的小伙伴肯定也或多或少的听说过一些。今天就来和小伙伴们分享一下自己总结的常用命...

2023-10-07 12:10 guanshanw

工作7年收集到的git命令
工作7年收集到的git命令

概念git中的术语解释:仓库也叫版本库(repository)stage:暂存区,add后会存到暂存区,commit后提交到版本库git安装linux...

2023-10-07 12:10 guanshanw

取消回复欢迎 发表评论: