xingbofeng.github.io icon indicating copy to clipboard operation
xingbofeng.github.io copied to clipboard

批处理和流处理

Open xingbofeng opened this issue 3 months ago • 0 comments

批处理模式

批处理是一种离线的数据处理方式,它将一批数据(通常是一组数据记录)作为输入,并在特定的时间或条件下进行处理。批处理通常用于对日志等数据进行分析、转换和汇总,以生成报告、进行数据挖掘、进行机器学习等。

在批处理中,数据被分为离散的块,批处理作业会按照一定的顺序和步骤对这些数据进行处理。这批处理作业可以定期运行,也可以在特定的触发条件下执行。批处理通常要求数据存储在一个集中的位置,以便进行批量处理。

传统的数据融合通常基于批模式。在批的模式下,我们会通过一些周期性运行的JOB,将数据从关系型数据库、文件存储向下游的目标数据库进行同步,中间可能有各种类型的转换。批处理通常用于处理大规模离线任务。“大规模”体现在:每次处理输入的数据量大;每次处理运行的时间长(可能几分钟~几天)。

MapReduce模式

常见的批处理模式是MapReduce模式,其主要思想是自动将一个大的计算拆解成 Map 和 Reduce ,MapReduce模式并行的读写分区,然后执行作业,再聚合。提供了两个纯函数,Map、Reduce。因为纯函数,保证了无副作用,所有流程和架构清晰,并且容易重试。如图所示:

MapReduce 可以在多台机器上并行执行计算,而无需编写代码来显示处理并行问题。Mapper 和 Reducer 一次只能处理一条记录;它们不需要知道它们的输入来自哪里,或者输出去往什么地方,所以框架可以处理在机器之间移动数据的复杂性:

Spark框架

Map 和 Reduce 中间夹杂着一步数据移动,也就是 shuffle。

Shuffle是MapReduce中的一个重要步骤,它是指将 Map 阶段输出的数据按照某种规则重新分配到 Reduce 阶段的各个节点上,以便 Reduce 节点能够对相同的key进行聚合操作。

具体来说,Shuffle过程包括三个步骤:

  • Map端的Partition:将Map输出的key-value对根据key进行分区,每个分区对应一个reduce任务。

  • Shuffle的数据传输:将Map端分区后的数据按照分区规则发送到对应的Reduce节点上。

  • Reduce端的Merge:将同一个key的数据进行合并,以便Reduce节点进行聚合操作。

Shuffle过程是MapReduce中非常耗费时间和网络带宽的一个步骤,所以优化Shuffle过程对于提高MapReduce的性能非常重要。常见的Shuffle优化方法包括增加Map端的本地聚合、增加Reduce端的并行度、使用压缩和序列化等。由于 MapReduce 的框架限制,一个 MapReduce 任务只能包含一次 Map 和一次 Reduce,计算完成之后,MapReduce 会将运算结果写回到磁盘中(更准确地说是分布式存储系统)供下次计算使用。如果所做的运算涉及大量循环,那么整个计算过程会不断重复地往磁盘里读写中间结果。这样的读写数据会引起大量的网络传输以及磁盘读写,极其耗时,而且它们都是没什么实际价值的废操作。因为上一次循环的结果会立马被下一次使用,完全没必要将其写入磁盘。

Spark的内存计算模型还支持数据的高速缓存和重复使用,这也有助于提高计算效率和速度。同时,Spark还采用了一种基于DAG的执行引擎,可以在数据处理过程中自动优化计算流程,提高计算效率和速度。Spark 延续了MapReduce 的设计思路:对数据的计算也分为 Map 和Reduce 两类。但不同的是,一个Spark 任务并不止包含一个 Map 和一个Reduce,而是由一系列的Map、Reduce构成。这样,计算的中间结果可以高效地转给下一个计算步骤,提高算法性能。虽然 Spark 的改进看似很小,但实验结果显示,它的算法性能相比MapReduce 提高了很多。

流处理模式

流处理是一种实时的数据处理方式,它将数据流作为输入,并在数据流中不断地进行处理和分析。流处理通常用于对实时数据进行分析、监控和决策,以便快速响应业务需求。

在流处理中,数据是连续不断地产生和处理的,而不是像批处理那样一次性处理一批数据。流处理系统通常需要实时处理数据,因此需要快速响应和高效处理数据。流处理通常要求数据存储在分布式系统中,以便进行实时处理和分析。流处理系统可以处理无限量的数据。显然,同批处理一样,在流处理过程中,也都需要维持中间状态。

流处理系统通常采用事件驱动的方式进行处理,即当新的事件到达时,系统会立即对其进行处理并产生相应的输出。流处理系统通常支持窗口化处理,即将数据流分割为固定大小的窗口,并对每个窗口内的数据进行处理和分析。

与批处理相比,流处理具有更低的延迟和更高的实时性,能够更快地响应业务需求。

流分析一般需要在一个时间窗口内做聚合分析,例如一段时间内的计算,平均值。一般窗口类型可分为以下三类。

  • 滚动窗口:滚动窗口下窗口之间不重叠,且窗口长度是固定的。
  • 滑动窗口:滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定。
  • 会话窗口:没有固定的持续时间,将一组用户在时间上紧密相关的所有事件分组在一起。一旦用户在一段时间内处于非活动状态,窗口结束。

举个例子:

  • 可以使用滑动窗口来计算每半小时的用户购买某件商品的总次数。假设当前时间是10:00,滑动窗口的大小为30分钟,步长为5分钟。则可以设置第一个窗口为10:00-10:30,第二个窗口为10:06-10:35,以此类推。在每个窗口内,可以将用户购买行为按照时间顺序进行排序,并计算购买某件商品的总次数。然后,在每个窗口结束时,可以将统计结果输出到报表中,供业务人员进行分析和决策。

  • 可以使用会话窗口来对每个用户的购买行为进行分析。在会话窗口中,可以将同一个用户在一段时间内的所有购买记录分组在一起进行分析。例如,可以将同一个用户在30分钟内的所有购买记录分组在一起,计算总浏览量。然后,在每个会话结束时,可以将统计结果输出到报表中,供业务人员进行分析和决策。

Flink

Flink是Apache软件基金会的一个顶级项目,是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架,并且可以同时支持实时计算和批量计算。

批处理是有界数据流处理的范例。在这种模式下,你可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。

流处理正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,因此程序必须持续不断地对到达的数据进行处理。

在 Flink 中,应用程序由用户自定义算子转换而来的流式 dataflows 所组成。这些流式 dataflows 形成了有向图,以一个或多个源(source)开始,并以一个或多个(sink)结束。

通常,程序代码中的 transformation 和 dataflow 中的算子(operator)之间是一一对应的。但有时也会出现一个 transformation 包含多个算子的情况,如上图所示。

Flink 应用程序可以消费来自消息队列或分布式日志这类流式数据源(例如 Apache Kafka 或 Kinesis)的实时数据,也可以从各种的数据源中消费有界的历史数据。同样,Flink 应用程序生成的结果流也可以发送到各种数据中。

Flink 程序本质上是分布式并行程序。在程序执行期间,一个流有一个或多个流分区(Stream Partition),每个算子有一个或多个算子子任务(Operator Subtask)。每个子任务彼此独立,并在不同的线程中运行,或在不同的计算机或容器中运行。

算子子任务数就是其对应算子的并行度。在同一程序中,不同算子也可能具有不同的并行度。

Flink 算子之间可以通过一对一(直传)模式或重新分发模式传输数据,如图所示:

一对一模式(例如上图中的 Source 和 map() 算子之间)可以保留元素的分区和顺序信息。这意味着 map() 算子的 subtask 输入的数据以及其顺序与 Source 算子的 subtask 输出的数据和顺序完全相同,即同一分区的数据只会进入到下游算子的同一分区。

重新分发模式(例如上图中的 map() 和 keyBy/window 之间,以及 keyBy/window 和 Sink 之间)则会更改数据所在的流分区。当你在程序中选择使用不同的 transformation,每个算子子任务也会根据不同的 transformation 将数据发送到不同的目标子任务。例如以下这几种 transformation 和其对应分发数据的模式:keyBy()(通过散列键重新分区)、broadcast()(广播)或 rebalance()(随机重新分发)。在重新分发数据的过程中,元素只有在每对输出和输入子任务之间才能保留其之间的顺序信息(例如,keyBy/window 的 subtask 接收到的 map() 的 subtask 中的元素都是有序的)。因此,上图所示的 keyBy/window 和 Sink 算子之间数据的重新分发时,不同键(key)的聚合结果到达 Sink 的顺序是不确定的。

Flink 应用程序的状态访问都在本地进行,因为这有助于其提高吞吐量和降低延迟。通常情况下 Flink 应用程序都是将状态存储在 JVM 堆上,但如果状态太大,我们也可以选择将其以结构化数据格式存储在高速磁盘中。

Flink的可靠性保证

Flink通过Checkpoint机制来保证消息的可靠性和一致性。Checkpoint机制是Flink中的一种容错机制,用于在任务执行过程中周期性地保存任务状态,并在任务发生故障或异常时恢复任务状态。通过Checkpoint机制,Flink可以保证消息的可靠性和一致性,从而避免数据丢失和处理不准确的问题。

状态(State):状态是对应的 操作者 严格在 Checkpoint 之前的所有事件,并且不包含在此 Checkpoint 后的任何事件后而生成的状态。

Flink通过Checkpoint机制实现消息的可靠性和一致性的步骤如下:

  • 触发Checkpoint:Flink会周期性地触发Checkpoint操作,将任务的状态保存到持久化存储介质中。Checkpoint的触发间隔可以通过配置文件进行设置。

  • 状态的保存:Checkpoint操作会将任务的状态保存到持久化存储介质中,如HDFS、S3、NFS等。保存的状态包括任务的算子状态、任务的输入/输出状态和任务的元数据信息等。

  • 状态的一致性:Checkpoint机制保证在同一时间只能有一个Checkpoint操作在进行,从而保证Checkpoint的一致性。当一个Checkpoint操作正在进行时,其他的Checkpoint操作会被阻塞,直到当前Checkpoint操作完成。

  • 状态的恢复:当任务发生故障或异常时,可以根据最近一次成功的Checkpoint来恢复任务状态,并从该状态继续处理数据。Flink提供了多种恢复策略,如从最近的Checkpoint开始恢复、从最早的Checkpoint开始恢复、从指定的Checkpoint开始恢复等。

需要注意的是,Checkpoint机制需要消耗一定的计算和存储资源,因此需要根据具体场景合理设置Checkpoint的触发间隔和保存位置。同时,Checkpoint机制也需要考虑到任务的性能和可靠性,以确保任务的高效执行和数据处理的准确性。

Flink的 Checkpoint机制确保了恰好一次(Exactly once):事件既不会丢失也不会被重复传递。但这个Exactly once 只能保证Flink内部自身,对于Flink和外界的数据传输的可靠性,如Kafka、HDFS等,要保证其数据可靠性,需要另行设置:

最多一次(NONE):事件可能会丢失但不会被重复传递 至少一次(AT_LEAST_ONCE):事件不会丢失但可能会被重复传递 恰好一次(EXACTLY_ONCE):事件既不会丢失也不会被重复传递

  • Flink内部:Flink 内部可以通过检查点机制保证状态和处理结果的 Exactly once 语义
  • 输入端:输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)
  • 输出端:
    • 两阶段(2PC)提交,开启DeliveryGuarantee.EXACTLY_ONCE
    • 事务超时配置:Flink配置的事务超时时间 transaction.timeout.ms 默认是1小时,而 Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是 15 分钟。在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。因此checkpoint 间隔 < 事务超时时间 < max的15分钟

示例

以下是一个从源Kafka读取数据,并经过一次Map处理后写入目标Kafka的示例程序。程序的主要逻辑如下:

  1. 创建StreamExecutionEnvironment对象,设置Checkpoint的间隔时间和模式,并设置Checkpoint的存储位置和删除策略。

  2. 使用KafkaSource读取Kafka数据,并通过map操作对数据进行处理。

  3. 使用KafkaSink将处理后的数据写出到Kafka中。在写入Kafka时,设置了精确一次(exactly-once)的数据可靠性保证,通过开启2PC(two-phase-commit)来实现数据的精确一次写入。具体来说,设置了以下参数:

  • producerConfig.setProperty("transaction.max.timeout.ms", Integer.toString(10 * 60 * 1000)):设置Kafka事务的超时时间为10分钟。
  • setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE):设置数据的可靠性保证为精确一次。
  • setTransactionalIdPrefix("th-data-trans"):设置事务前缀为th-data-trans。

需要注意的是,为了实现精确一次的数据可靠性保证,需要同时满足以下条件:

  • 开启Checkpoint机制,保证数据的一致性和可靠性。
  • 设置KafkaSink的数据可靠性保证为精确一次,并开启2PC。
  • 设置事务前缀和事务超时时间,保证事务的唯一性和超时处理。
package app;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class THDataTransFlink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置 Checkpoint 点,而且保证恰好一次,保证数据可靠性
        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // 设置 Checkpoint storage 的位置
        checkpointConfig.setCheckpointStorage("/usr/local/flink_check_point/");
        // 在任务取消时保留已经完成的Checkpoint,不进行删除
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 读取kafka数据
        KafkaSource<Req> source = KafkaSource
                .<Req>builder()
                .setBootstrapServers("source.example.com:9092")
                .setProperty("enable.auto.commit", "true")
                .setProperty("auto.commit.interval.ms", "1000")
                .setTopics("topic")
                .setGroupId("kafka_group_id")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new ReqDeserializationSchema())
                .build();

        // 一次map
        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "topic")
                .map(new KafkaProcess());

        // 写出到Kafka
        // 精准一次写入Kafka,必须满足以下条件:
        // 1、开启Checkpoint
        // 2、sink 保证级别是 精准一次
        // 3、sink 设置事务前缀
        // 4、sink 设置事务超时时间:Checkpoint 间隔 < 事务超时时间
        KafkaRecordSerializationSchema<String> recordSerializer = KafkaRecordSerializationSchema
                .builder()
                .setTopic("topic_sink")
                .setValueSerializationSchema(new SimpleStringSchema()).build();
        Properties producerConfig = new Properties();
        // 设置kafka事务超时时间
        producerConfig.setProperty("transaction.max.timeout.ms", Integer.toString(10 * 60 * 1000));
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setKafkaProducerConfig(producerConfig)
                .setBootstrapServers("target.example.com:9092")
                .setRecordSerializer(recordSerializer)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 设置数据可靠性保证恰好一次,开启2PC
                .setTransactionalIdPrefix("th-data-trans") // 设置事务前置
                .build();

        stream.sinkTo(sink);

        env.execute("task");
    }
}

总结

本文介绍了批处理和流处理模式的概念,通过 批处理的 Map Reduce 模式引入,到 Spark 的简单介绍,Spark由于使用分布式内存系统维护 Map Reduce 的中间状态,提升批处理的性能。之后通过流处理的概念引入,并简单介绍了Flink的概念及其如何保证其数据可靠性。最后通过一个Kafka2Kafka的demo演示了如何通过流的方式来处理数据。

参考: Apache Flink 《数据密集型应用系统设计》 实时流计算框架Flink 批处理计算与流处理计算的区别是什么?

xingbofeng avatar Mar 11 '24 11:03 xingbofeng