资讯 小学 初中 高中 语言 会计职称 学历提升 法考 计算机考试 医护考试 建工考试 教育百科
栏目分类:
子分类:
返回
空麓网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
空麓网 > 计算机考试 > 前沿技术 > 大数据 > 大数据系统

Confluent 源码学习 - SinkTask

大数据系统 更新时间: 发布时间: 计算机考试归档 最新发布

Confluent 源码学习 - SinkTask

SinkTask 的源码实际位于Kafka项目中,主要用在 Kafka Connect 模块,它是一个接收 Kafka 数据,输出到外部系统的 Task 抽象类。其父类 Task 是个接口,只有三个方法:

public interface Task {
    
    String version();

    
    void start(Map props);

    
    void stop();
}

简单明了,下面我们具体看下 SinkTask。

SinkTask is a Task that takes records loaded from Kafka and sends them to another system. Each task instance is assigned a set of partitions by the Connect framework and will handle all records received from those partitions. As records are fetched from Kafka, they will be passed to the sink task using the {@link #put(Collection)} API, which should either write them to the downstream system or batch them for later writing. Periodically, Connect will call {@link #flush(Map)} to ensure that batched records are actually pushed to the downstream system.

SinkTask是一个任务,它接收从Kafka加载的records,并将它们发送到另一个系统。Connect框架为每个 Task 实例分配一组分区,并将处理从这些分区收到的所有records。当从Kafka获取records时,它们将使用 `#put(Collection) ` API 把记录传递给 SinkTask,该API会将这些records写入下游系统或缓存下来(攒批)以供以后写出。Connect 框架会定期调用 `#flush(Map) `方法以确保攒批的records确实被推送到下游系统。

一个SinkTask的生命周期如下:

  1. 初始化:SinkTask 的初始化会调用两个方法。首先调用 `#initialize(SinkTaskContext)` 方法准备 task 的上下文(context),然后再调用 `#start(Map)` 接收配置,启动处理数据时用到的所有服务。
  2. 分区指派:初始化后,Connect 会通过 `#open(Collection)` 方法为每个task指派一组分区。这组分区只属于这个Task,直到 他们被`#close(Collection)` 方法关闭。
  3. 数据处理:一旦为写数据而打开了分区,Connect 将开始通过 `#put(Collection)` API  向Task传递数据。Connect 会周期性的使用 `#flush(Map)` 方法要求 task刷写数据。
  4. 分区再平衡:有时候,Connect 需要改该任务的分区分配。发生这种情况时,当前分配的分区将使用` #close(Collection)` API 关闭,新分配使用 `#open(Collection)`打开。
  5. 关闭:当task需要关闭时,Connect会关闭活动的分区(如果有的话),并使用`#stop()` 关闭task。

除了上面提到的7个方法外,SinkTask中还有 `#preCommit`(预提交) 以及另外两个过期方法:#onPartitionsAssigned 以及 #onPartitionsRevoked 两个方法,这两个方法已经分别被 open以及close取代。

public abstract class SinkTask implements Task {

    // 消费哪些 topic
    public static final String TOPICS_ConFIG = "topics";

    //  主题正则,用于过滤主题
    public static final String TOPICS_REGEX_ConFIG = "topics.regex";

    protected SinkTaskContext context;

    // 初始化,接收SinkTaskContext
    public void initialize(SinkTaskContext context) {
        this.context = context;
    }

    // 接收参数
    public abstract void start(Map props);

    // 处理数据
    public abstract void put(Collection records);

    // 刷写数据
    public void flush(Map currentOffsets) {
    }

    // commit之前的预提交
    public Map preCommit(Map currentOffsets) {
        flush(currentOffsets);
        return currentOffsets;
    }

    // 指派分区
    public void open(Collection partitions) {
        this.onPartitionsAssigned(partitions);
    }

    // 过期方法,作用同 open(Collection)
    @Deprecated
    public void onPartitionsAssigned(Collection partitions) {
    }

    // 关闭分区
    public void close(Collection partitions) {
        this.onPartitionsRevoked(partitions);
    }

    // 过期方法 作用同 close(Collection)
    @Deprecated
    public void onPartitionsRevoked(Collection partitions) {
    }

    // 关闭 task
    @Override
    public abstract void stop();
}
转载请注明:文章转载自 http://www.konglu.com/
本文地址:http://www.konglu.com/it/280035.html
免责声明:

我们致力于保护作者版权,注重分享,被刊用文章【Confluent 源码学习 - SinkTask】因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理,本文部分文字与图片资源来自于网络,转载此文是出于传递更多信息之目的,若有来源标注错误或侵犯了您的合法权益,请立即通知我们,情况属实,我们会第一时间予以删除,并同时向您表示歉意,谢谢!

我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2023 成都空麓科技有限公司

ICP备案号:蜀ICP备2023000828号-2