大规模数据处理

MapReduce的不足

需要的:

  1. 一种技术抽象让多步骤数据处理变得易于维护
  2. 不要复杂的配置,需要能自动进行性能优化
  3. 要能把数据处理的描述语言,与背后的运行引擎解耦合开来
  4. 要统一批处理和流处理的编程模型
  5. 要在架构层面提供异常处理和数据监控的能力

20221214142627

批处理与流处理

模式

Workflow

复制模式

flowchart
  数据集 --> 复制器
  复制器 --> 工作流1
  复制器 --> 工作流2
  复制器 --> 工作流3

过滤模式

flowchart
  数据集(1,2,3) --> 过滤器
  过滤器 --> 工作流(1,2)

分离模式

flowchart
  数据集(1,2,3) --> 分离器
  分离器 --> 工作流1(1,2)
  分离器 --> 工作流2(2,3)

合并模式

flowchart
  数据集1(1,2) --> 合并器
  数据集2(2,3) --> 合并器
  合并器 --> 工作流(1,2,3)

发布订阅

架构

Lambda

完整的数据集 = λ (实时数据) * λ (历史数据)

stateDiagram-v2
  数据 --> 批处理层
  数据 --> 速度层
  state 服务层 {
    批处理数据
    速度数据
  }
  速度层 --> 速度数据
  批处理层 --> 批处理数据

  应用 --> 批处理数据
  应用 --> 速度数据

Kappa

stateDiagram-v2
  state 速度层 {
    任务N
    任务N+1
  }
  数据 --> 速度层
  state 服务层 {
    数据N
    数据N+1
  }
  任务N --> 数据N
  任务N+1 --> 数据N+1
  应用 --> 数据N+1

Spark

Spark 比 MapReduce 快的原因:更为简单的 RDD 编程模型减少了作业调度次数,以及优先使用内存

各组件

  1. SparkContext 启动 DAGScheduler 构造执行的 DAG 图,拆分成计算任务
  2. Driver 向 Cluster Manager 请求计算资源,分配 Worker
  3. Worker 向 Driver 注册并下载代码执行

RDD

分区:同一个 RDD 包含的数据被存储在系统的不同节点中,需要读取时根据ID 和分区的 index 可以唯一确定对应数据块的编号,从而通过底层存储层的接口中提取到数据进行处理

不可变:一个 RDD 都是只读的,只可以对现有的 RDD 进行转换(Transformation)操作,得到新的 RDD 作为中间计算的结果

并行:由于上面两个特性,就可以并行对 RDD 进行操作

结构

SparkContext:所有 Spark 功能的入口,它代表了与 Spark 节点的连接,一个线程只有一个 SparkContext

SparkConf: 一些参数配置信息

Partitions:数据的逻辑结构,每个 Partition 会映射到某个节点内存或硬盘的一个数据块

Dependencies:每一步产生的 RDD 里都会存储它的依赖关系,即它是通过哪个 RDD 经过哪个转换操作得到的

窄依赖,父 RDD 的分区可以一一对应到子 RDD 的分区

宽依赖,父 RDD 的每个分区可以被多个子 RDD 的分区使用

窄依赖允许子 RDD 的每个分区可以被并行处理产生,而宽依赖则必须等父 RDD 的所有分区都被计算好之后才能开始处理

Checkpoint:对于一些计算过程比较耗时的 RDD,可以进行持久化,标记这个 RDD 有被检查点处理过,并且清空它的所有依赖关系,这样在进行崩溃恢复的时候就不用在向前向父 RDD 回溯

Storage Level:记录 RDD 持久化时的存储级别,内存或内存硬盘 或在分区节点上内存、内存硬盘

Iterator:迭代函数,Compute:计算函数 都是用来表示 RDD 怎样通过父 RDD 计算得到的

数据操作

大部分操作跟Stream差不多

Spark 的 Shuffle 操作跟 MapReduce 是一样的,其通过生产与消费 Shuffle 中间文件的方式,来完成集群范围内的数据交换

Shuffle 中间文件

调度系统

  1. DAGScheduler 以 Shuffle 为边界,将开发者设计的计算图 DAG 拆分为多个执行阶段 Stages,然后为每个 Stage 创建任务集 TaskSet
  2. SchedulerBackend 通过与 Executors 中的 ExecutorBackend 的交互来实时地获取集群中可用的计算资源,并将这些信息记录到 ExecutorDataMap 数据结构
  3. 与此同时,SchedulerBackend 根据 ExecutorDataMap 中可用资源创建 WorkerOffer,以 WorkerOffer 为粒度提供计算资源
  4. 对于给定 WorkerOffer,TaskScheduler 结合 TaskSet 中任务的本地性倾向,按照 PROCESS_LOCAL、NODE_LOCAL、RACK_LOCAL 和 ANY 的顺序,依次对 TaskSet 中的任务进行遍历,优先调度本地性倾向要求苛刻的 Task
  5. 被选中的 Task 由 TaskScheduler 传递给 SchedulerBackend,再由 SchedulerBackend 分发到 Executors 中的 ExecutorBackend。Executors 接收到 Task 之后,即调用本地线程池来执行分布式任务。

存储系统

SparkSQL

架构

DataSet

DataSet 所描述的数据都被组织到有名字的列中,就像关系型数据库中的表一样

20221218152850

DataFrame

可以被看作是一种特殊的 DataSet,但是它的每一列并不存储类型信息,所以在编译时并不能发现类型错误

SparkStreaming

Spark Streaming 用时间片拆分了无限的数据流,然后对每一个数据片用类似于批处理的方法进行处理,输出的数据也是一块一块的,通过提供了一个对于流数据的抽象 DStream 来描述数据流,底层 DStream 也是由很多个序列化的 RDD 构成,按时间片(比如一秒)切分成的每个数据单位都是一个 RDD

20221218153730

主要缺点是实时计算延迟较高,这是由于 Spark Streaming 不支持太小的批处理的时间间隔

StructuredStreaming

输入的数据流按照时间间隔(以一秒为例)划分成数据段。每一秒都会把新输入的数据添加到表中,Spark 也会每秒更新输出结果。输出结果也是表的形式,输出表可以写入硬盘或者 HDFS。

20221218154712

Structured Streaming 提供一个 level 更高的 API,这样的数据抽象可以让开发者用一套统一的方案去处理批处理和流处理

相比 SparkStreaming,StructuredStreaming可以支持更小的时间间隔,2.3 也引入了连续处理模式,同时也有对事件时间的支持

架构

20221219144426

核心模型

最核心的数据结构是 Stream,它代表一个运行在多个分区上的并行流

当一个 Flink 程序被执行的时候,它会被映射为 Streaming Dataflow:

2022121914389

程序天生是并行和分布式的。一个 Stream 可以包含多个分区(Stream Partitions),一个操作符可以被分成多个操作符子任务,每一个子任务是在不同的线程或者不同的机器节点中独立执行的:

20221219144240

Beam

一个适配流处理、批处理的中间层

编程模型

PCollection

特性:

Transform

stateDiagram-v2
  数据1 --> Transform
  数据2 --> Transform
  Transform --> 数据3
  Transform --> 数据4

常见的 Transform 接口:

Pipeline

stateDiagram-v2
  输入 --> PCollection1: Transform1
  PCollection1 --> PCollection2: Transform2
  PCollection2 --> PCollection3: Transform3
  PCollection3 --> 输出: Transform4

分布式环境下,整个数据流水线会启动 N 个 Workers 来同时处理 PCollection,在具体处理某一个特定 Transform 的时候,数据流水线会将这个 Transform 的输入数据集 PCollection 里面的元素分割成不同的 Bundle,将这些 Bundle 分发给不同的 Worker 来处理

在单个 Transfrom中,如果某一个 Bundle 里面的元素因为任意原因导致处理失败了,则这整个 Bundle 里的元素都必须重新处理

在多步骤的 Transform 上,如果处理的一个 Bundle 元素发生错误了,则这个元素所在的整个 Bundle 以及与这个 Bundle 有关联的所有 Bundle 都必须重新处理

IO

StreamingSQL

/* 窗口:最近10个温度的平均值 */
Select bid, avg(t) as T From BoilerStream WINDOW HOPPING (SIZE 10, ADVANCE BY 1);
/* join */
from TempStream[temp > 30.0]#window.time(1 min) as T
  join RegulatorStream[isOn == false]#window.length(1) as R
  on T.roomNo == R.roomNo
select T.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream; // Siddhi Streaming SQL
/*  某个模式有没有在特定的时间段内发生 */
from every( e1=TempStream ) -> e2=TempStream[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ]
    within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;