SinkTask 的源码实际位于Kafka项目中,主要用在 Kafka Connect 模块,它是一个接收 Kafka 数据,输出到外部系统的 Task 抽象类。其父类 Task 是个接口,只有三个方法:
public interface Task { String version(); void start(Mapprops); void stop(); }
简单明了,下面我们具体看下 SinkTask。
SinkTask is a Task that takes records loaded from Kafka and sends them to another system. Each task instance is assigned a set of partitions by the Connect framework and will handle all records received from those partitions. As records are fetched from Kafka, they will be passed to the sink task using the {@link #put(Collection)} API, which should either write them to the downstream system or batch them for later writing. Periodically, Connect will call {@link #flush(Map)} to ensure that batched records are actually pushed to the downstream system.
SinkTask是一个任务,它接收从Kafka加载的records,并将它们发送到另一个系统。Connect框架为每个 Task 实例分配一组分区,并将处理从这些分区收到的所有records。当从Kafka获取records时,它们将使用 `#put(Collection) ` API 把记录传递给 SinkTask,该API会将这些records写入下游系统或缓存下来(攒批)以供以后写出。Connect 框架会定期调用 `#flush(Map) `方法以确保攒批的records确实被推送到下游系统。
一个SinkTask的生命周期如下:
- 初始化:SinkTask 的初始化会调用两个方法。首先调用 `#initialize(SinkTaskContext)` 方法准备 task 的上下文(context),然后再调用 `#start(Map)` 接收配置,启动处理数据时用到的所有服务。
- 分区指派:初始化后,Connect 会通过 `#open(Collection)` 方法为每个task指派一组分区。这组分区只属于这个Task,直到 他们被`#close(Collection)` 方法关闭。
- 数据处理:一旦为写数据而打开了分区,Connect 将开始通过 `#put(Collection)` API 向Task传递数据。Connect 会周期性的使用 `#flush(Map)` 方法要求 task刷写数据。
- 分区再平衡:有时候,Connect 需要改该任务的分区分配。发生这种情况时,当前分配的分区将使用` #close(Collection)` API 关闭,新分配使用 `#open(Collection)`打开。
- 关闭:当task需要关闭时,Connect会关闭活动的分区(如果有的话),并使用`#stop()` 关闭task。
除了上面提到的7个方法外,SinkTask中还有 `#preCommit`(预提交) 以及另外两个过期方法:#onPartitionsAssigned 以及 #onPartitionsRevoked 两个方法,这两个方法已经分别被 open以及close取代。
public abstract class SinkTask implements Task { // 消费哪些 topic public static final String TOPICS_ConFIG = "topics"; // 主题正则,用于过滤主题 public static final String TOPICS_REGEX_ConFIG = "topics.regex"; protected SinkTaskContext context; // 初始化,接收SinkTaskContext public void initialize(SinkTaskContext context) { this.context = context; } // 接收参数 public abstract void start(Mapprops); // 处理数据 public abstract void put(Collection records); // 刷写数据 public void flush(Map currentOffsets) { } // commit之前的预提交 public Map preCommit(Map currentOffsets) { flush(currentOffsets); return currentOffsets; } // 指派分区 public void open(Collection partitions) { this.onPartitionsAssigned(partitions); } // 过期方法,作用同 open(Collection) @Deprecated public void onPartitionsAssigned(Collection partitions) { } // 关闭分区 public void close(Collection partitions) { this.onPartitionsRevoked(partitions); } // 过期方法 作用同 close(Collection) @Deprecated public void onPartitionsRevoked(Collection partitions) { } // 关闭 task @Override public abstract void stop(); }