
Flink Stream Processing Engine: A Systematic Study (Part 2)


Preface

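This post walks through a first Flink example. Before starting, here is the approach our course instructor, Mr. Li, recommends for learning any new technology:
1. Go to the official website.
2. Find how to build a project.
3. Find the demo code among the examples.
4. Copy it into the project you built.
Let's go!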


I. Building a Flink Project

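Official websites usually provide the steps or a script for setting up a project. Flink provides a script, and the steps are as follows:
1. Open the official website: https://flink.apache.org/

2. Copy the script that creates a Flink project.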

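Alternatively, you can create the project in IntelliJ IDEA through New Project and add the Flink dependencies step by step.
The published script spans several lines. The \ at the end of each line is a line-continuation character, so combine the script into a single line.
The combined script: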

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-walkthrough-datastream-java -DarchetypeVersion=1.14.4 -DgroupId=frauddetection  -DartifactId=frauddetection -Dversion=0.1 -Dpackage=spendreport -DinteractiveMode=false

Note:
To use a custom project name, replace frauddetection (the -DgroupId and -DartifactId values) with your own name.


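OK, the demo project has been created.
Important:
In the generated demo project, change the scope of the dependencies marked provided to compile. Otherwise the project throws an error when you run it.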
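Why does provided scope break a local run? Dependencies marked provided are expected to come from the Flink cluster at run time. Many IDE run configurations therefore leave them off the classpath by default, and the run typically fails with a NoClassDefFoundError. The small check below is a hypothetical helper: the name ClasspathCheck is made up for illustration and is not part of the generated project. It uses only Class.forName to report whether a given Flink class is visible at run time.

```java
// Hypothetical diagnostic helper (not part of the Flink template).
// Reports whether a class can be loaded from the current runtime classpath.
public class ClasspathCheck {

    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate and load the class, do not run static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Missing class, or a class whose own dependencies cannot be resolved
            return false;
        }
    }

    public static void main(String[] args) {
        // With the Flink jars in "provided" scope and excluded from the run classpath,
        // this is expected to print false
        String flinkClass = "org.apache.flink.api.java.ExecutionEnvironment";
        System.out.println(flinkClass + " on classpath: " + isOnClasspath(flinkClass));
    }
}
```

After you switch the scope to compile, the same check should report true, because the Flink jars are then packaged onto the run classpath.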

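II. The First Example

1. Finding an example on GitHub

Open the GitHub home page, type flink into the search box, and search.

Let's start with a batch word count: WordCount.

2. Copying it into the demo project

I've tidied up the two classes here. Copy them as-is, change the package name to your own, and re-import the packages.
WordCount class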

package spendreport.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.Collector;


public class WordCount {
  // *************************************************************************
  //     PROGRAM
  // *************************************************************************

  public static void main(String[] args) throws Exception {

    // 1. Parse the command-line parameters (--input, --output)
    final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);

    // get input data
    DataSet<String> text = null;
    if (params.has("input")) {
      // union all the inputs from text files
      for (String input : params.getMultiParameterRequired("input")) {
        if (text == null) {
          text = env.readTextFile(input);
        } else {
          text = text.union(env.readTextFile(input));
        }
      }
      Preconditions.checkNotNull(text, "Input DataSet should not be null.");
    } else {
      // get default test text data
      System.out.println("Executing WordCount example with default input data set.");
      System.out.println("Use --input to specify file input.");
      text = WordCountData.getDefaultTextLineDataSet(env);
    }

    DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .groupBy(0)
            .sum(1);

    // emit result
    if (params.has("output")) {
      counts.writeAsCsv(params.get("output"), "\n", " ");
      // execute program
      env.execute("WordCount Example");
    } else {
      System.out.println("Printing result to stdout. Use --output to specify output path.");
      counts.print();
    }
  }

  // *************************************************************************
  //     USER FUNCTIONS
  // *************************************************************************

  
  public static final class Tokenizer
      implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
      // normalize and split the line
      String[] tokens = value.toLowerCase().split("\\W+");

      // emit the pairs
      for (String token : tokens) {
        if (token.length() > 0) {
          out.collect(new Tuple2<>(token, 1));
        }
      }
    }
  }
}
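You can check what Tokenizer emits for a single line without starting Flink. The plain-Java sketch below repeats the same steps: lowercase the line, split it on the regex "\\W+", and skip empty tokens. TokenizerSketch and tokenize are hypothetical names used only for illustration. The real job wraps each token in a Tuple2 (token, 1) instead of collecting plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch (no Flink needed) of the logic in Tokenizer.flatMap.
public class TokenizerSketch {

    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        // "\\W+" matches one or more characters outside [a-zA-Z_0-9]
        for (String token : line.toLowerCase().split("\\W+")) {
            // split() yields a leading "" when the line starts with a delimiter,
            // which is why Tokenizer checks token.length() > 0
            if (token.length() > 0) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        // Tokenizer would emit (token, 1) for each of these tokens
        System.out.println(tokenize("To be, or not to be,--that is the question:--"));
        // prints [to, be, or, not, to, be, that, is, the, question]
        System.out.println(tokenize("--'tis nobler"));
        // prints [tis, nobler]
    }
}
```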

WordCountData class

package spendreport.batch;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;


public class WordCountData {

    public static final String[] WORDS =
            new String[] {
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,",
                "And by opposing end them?--To die,--to sleep,--",
                "No more; and by a sleep to say we end",
                "The heartache, and the thousand natural shocks",
                "That flesh is heir to,--'tis a consummation",
                "Devoutly to be wish'd. To die,--to sleep;--",
                "To sleep! perchance to dream:--ay, there's the rub;",
                "For in that sleep of death what dreams may come,",
                "When we have shuffled off this mortal coil,",
                "Must give us pause: there's the respect",
                "That makes calamity of so long life;",
                "For who would bear the whips and scorns of time,",
                "The oppressor's wrong, the proud man's contumely,",
                "The pangs of despis'd love, the law's delay,",
                "The insolence of office, and the spurns",
                "That patient merit of the unworthy takes,",
                "When he himself might his quietus make",
                "With a bare bodkin? who would these fardels bear,",
                "To grunt and sweat under a weary life,",
                "But that the dread of something after death,--",
                "The undiscover'd country, from whose bourn",
                "No traveller returns,--puzzles the will,",
                "And makes us rather bear those ills we have",
                "Than fly to others that we know not of?",
                "Thus conscience does make cowards of us all;",
                "And thus the native hue of resolution",
                "Is sicklied o'er with the pale cast of thought;",
                "And enterprises of great pith and moment,",
                "With this regard, their currents turn awry,",
                "And lose the name of action.--Soft you now!",
                "The fair Ophelia!--Nymph, in thy orisons",
                "Be all my sins remember'd."
            };

    public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
        return env.fromElements(WORDS);
    }
}
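The job above ends with groupBy(0).sum(1) on a bounded DataSet, so each word's count is produced once, after all input has been read. On a DataStream in the default streaming execution mode, keyBy(...).sum(1) is a rolling aggregation: it emits an updated count for every incoming record. The stdlib-only sketch below contrasts the two output shapes. CountStyles and its methods are hypothetical names, and the sketch is not Flink code; it does not model distribution or state backends.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch contrasting two aggregation styles (requires Java 9+ for List.of).
public class CountStyles {

    // Batch style (like DataSet groupBy(0).sum(1)): one final count per word
    static Map<String, Integer> batchCounts(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String w : words) {
            counts.merge(w, 1, Integer::sum);
        }
        return counts; // produced once, after all input has been consumed
    }

    // Rolling style (like DataStream keyBy(...).sum(1)): an updated result per record
    static List<String> rollingCounts(List<String> words) {
        Map<String, Integer> state = new LinkedHashMap<>(); // running sum per key
        List<String> emitted = new ArrayList<>();
        for (String w : words) {
            int updated = state.merge(w, 1, Integer::sum);
            emitted.add("(" + w + "," + updated + ")"); // emitted for every record
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> words = List.of("to", "be", "or", "not", "to", "be");
        System.out.println(batchCounts(words));
        // prints {to=2, be=2, or=1, not=1}
        System.out.println(rollingCounts(words));
        // prints [(to,1), (be,1), (or,1), (not,1), (to,2), (be,2)]
    }
}
```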

[Figure: demo project structure]

[Figure: run output]


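Summary
  1. Flink is genuinely pleasant to use. In my experience its overhead was also lower than processing the same data with hand-written Java methods.
  2. DataSet vs. DataStream (this example uses DataSet)
  • Both represent a distributed collection of data in a Flink app
  • The data may contain duplicates and is immutable
  • A DataSet is bounded; a DataStream can be unbounded
  • Both can be created from a data source or derived from existing ones through transformations
  3. The common structure of a Flink program
  • Obtain an execution environment
  • Load or create the initial data
  • Apply transformations to the data (each one produces a new data set)
  • Specify where the results should go
  • Trigger execution of the app
  4. Lazy evaluation
  • Flink apps are executed lazily
  • Nothing actually runs until execution is triggered, normally by an explicit call to execute(). In the DataSet API, print(), collect() and count() trigger execution themselves, which is why WordCount calls env.execute() only on the --output path
  • Whether the app runs locally or on a cluster depends on the type of execution environment
  • Benefit: users can build complex applications, and Flink can optimize the program as a whole and generate an execution plan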
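The five-step structure and lazy evaluation summarized above can be illustrated with a toy, single-machine model. LazyPipelineSketch is a hypothetical class, not Flink's API. Its map() only records a step in a plan, and nothing touches the data until execute() is called. Because the whole plan is known at that point, the toy applies all steps to each record in a single pass. This is loosely analogous to how Flink can optimize a complete job before running it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model (NOT Flink's API): transformations are recorded, then run on execute().
public class LazyPipelineSketch {

    private final List<String> source;                        // "load/create initial data"
    private final List<Function<String, String>> plan = new ArrayList<>();
    private final List<String> log = new ArrayList<>();

    LazyPipelineSketch(List<String> source) {                 // stands in for the environment + source
        this.source = source;
    }

    // "Transformation": only records the step; returns this so calls can be chained
    LazyPipelineSketch map(String name, Function<String, String> fn) {
        plan.add(fn);
        log.add("planned " + name);
        return this;
    }

    // "Trigger execution": the recorded plan is applied to every record in one pass
    List<String> execute() {
        log.add("executing " + plan.size() + " step(s)");
        List<String> out = new ArrayList<>();
        for (String record : source) {
            String value = record;
            for (Function<String, String> fn : plan) {
                value = fn.apply(value);
            }
            out.add(value);
        }
        return out;                                          // the caller decides where results go
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        LazyPipelineSketch p = new LazyPipelineSketch(List.of("To", "Be"))
                .map("lowercase", String::toLowerCase)
                .map("exclaim", s -> s + "!");
        System.out.println(p.log());     // prints [planned lowercase, planned exclaim]
        System.out.println(p.execute()); // prints [to!, be!]
        System.out.println(p.log());     // prints [planned lowercase, planned exclaim, executing 2 step(s)]
    }
}
```

Before execute() is called, the log contains only planning entries. The transformed data appears only once execution is triggered.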
    That's all for this post. Let's keep improving together!