批处理

《设计数据密集型应用》一书中的第三章第一小节告诉我们:不是所有服务都是在线系统的方式存在的,也会存在离线系统这一概念。即一个批处理系统,执行很多个Job来完成一些任务。这些任务通常会有大量的输入数据,根据不同的业务Job对数据进行处理,最终它会输出一些数据。这个过程往往需要一段时间,无需用户等待这些Job完成。

在介绍批处理系统前,开篇先谈论到Unix工具的批处理逻辑,继而引出 Unix 设计思想。

Unix 管道的发明者道格・麦克罗伊(Doug McIlroy)在 1964 年首先描述了这种情况:“我们需要一种类似园艺胶管的方式来拼接程序 —— 当我们需要将消息从一个程序传递另一个程序时,直接接上去就行。I/O 应该也按照这种方式进行 ”。水管的类比仍然在生效,通过管道连接程序的想法成为了现在被称为 Unix 哲学 的一部分 —— 这一组设计原则在 Unix 用户与开发者之间流行起来,该哲学在 1978 年表述如下:

  1. 让每个程序都做好一件事。要做一件新的工作,写一个新程序,而不是通过添加 “功能” 让老程序复杂化。
  2. 期待每个程序的输出成为另一个程序的输入。不要将无关信息混入输出。避免使用严格的列数据或二进制输入格式。不要坚持交互式输入。
  3. 设计和构建软件时,即使是操作系统,也让它们能够尽早地被试用,最好在几周内完成。不要犹豫,扔掉笨拙的部分,重建它们。
  4. 优先使用工具来减轻编程任务,即使必须曲线救国编写工具,且在用完后很可能要扔掉大部分。

一般情况下,如果你希望一个程序的输出能够成为另一个程序的输入,那意味着这些程序必须使用相同的数据格式——或者是,一个兼容的接口。如果你希望能够将任何程序的输出连接到任何程序的输入,那意味着所有程序必须使用相同的 I/O 接口。

Linux下一切皆文件这句话也继承了Unix的哲学,在Unix中上述兼容的接口是一个文件,更准确来说是一个文件描述符。文件指示一串有序的字节序列。但是可以使用相同的接口来表示更多不同的东西:文件系统上的真实文件,到另一个进程的通信通道(Unix套接字等),驱动程序等。

大部分Unix程序会将这个字节序列(这个接口,这个文件)视为ASCII文本。但是不是所有的。

几十年后重新审视这个设计,尽管它还不够完美,但是已经足够出色了。即使具有相同数据模型的数据库,将数据从一种数据库导出再导入到另一种数据库也并不容易。

Unix 工具的另一个特点是使用标准输入和标准输出。Unix 方法在程序不关心输入是从哪里而来,也不关心输出到哪里。工程师们可以将输入/输出与程序逻辑分开,使用各个小工具组成更大的系统。因为只要实现了接口,你可以自由编写程序将它们和操作系统提供的工具组合在一起。

MapReduce 和分布式文件系统

MapReduce 像一个Unix工具,但是它分布在数千台机器上。作者评论它:相当的简单粗暴,但是令人惊异地管用。一个MapReduce作业和一个Unix进程类比:它接受一个或多个输入,产生一个或多个输出。

MapReduce 和 Unix 的区别在于,它在分布式文件系统上读写文件。

MapReduce 本质上是一个编程框架,我们可以使用它编写代码来处理分布式文件系统中的大型数据集。它有四个步骤:

1.读取一组输入文件,并将其分解成记录。(类似于服务器日志,一行日志就是一条记录)

2.调用Mapper函数,从每条输入记录中提取一对键值。(Mapper 函数可以由工程师们根据业务需求自由编写,例如你可以取日志中的某个信息当作键,并将值留空)

3.按键排序所有的键值对。

4.调用Reducer函数遍历排序后的键值对。如果同一个键出现多次,排列使它们在列表中相邻,所以很容易组合这些值而不必在内存中保留很多状态。

有经验的工程师看到上述四个步骤后很快就能明白,MapperReducer函数是编写自定义数据处理代码的地方。Mapper 的作用就是从每条记录中提取键值,然后将它们放入到一个适合排序的表单中,同时它并不保存输入记录到下一个记录中的任何状态,可以说每条记录它都是独立处理的。而 Reducer的作用就是处理已排序的数据。

MapReduce中的分布式执行

MapReduce 可以在多台机器上并行执行计算,而无需编写代码来显式处理并行问题。本身Mapper 和Reducer 一次只能处理一条记录,它们也不在乎输入源和输出到什么地方,它本质就是一个无状态处理数据的工具,所以框架可以处理在机器之间移动数据的复杂性。

在分布式计算中,并行化基于分区,这意味着分布式批处理中的输出文件或文件快被认为是一个单独的分区。MapReduce 调度器会在其中一台存储输入文件副本的机器上运行每个Mapper。这个行为被称为计算放在数据附近,其实是为了节约网络复制输入文件的开销,减少网络复制并增加局部性。

计算的Reduce端也会被分区。虽然 Map 任务的数量由输入文件块的数量决定,但 Reducer 的任务的数量是由作业作者配置的。为了确保具有相同键的所有键值对最终落在相同的 Reducer 处,框架使用键的散列值来确定哪个 Reduce 任务应该接收到特定的键值对。

当Mapper读取输入文件,并写完排序后的输出文件,MapReduce调度器会通知Reducer获取输出文件。然后Reducer会按分区,排序,从 Mapper 向 Reducer 复制分区数据,这一整个过程被称为 混洗(shuffle)。然后将从Mapper获取的文件合并在一起,并保留有序特性,如果不同的Mapper生成了相同的键,则将它们相邻。

MapReduce工作流

单个MapReduce作业解决的问题范围很有限,所以在实际实际使用中将MapReduce作业链接为**工作流(workflow)**,一个作业的输出变为下一个作业的输入。Hadoop MapReduce 框架没有工作流的配置或设置,这个工作流的链是通过目录名隐式实现的。

只有当作业成功完成后,批处理作业的输出才会被视为有效的。这意味着作业之间存在依赖,有很多针对Hadoop的工作流调度器被开发出来。这些调度程序还具有管理功能,在维护大量批处理作业时非常有用。

连接是如何实现的?

对于批处理,连接指的是在数据集中解析某种关联的全量存在。

实现某个业务需求的连接最简单的方法是:遍历事件,然后为其中的ID在远程服务器上查询数据库。但是它性能可能非常差,处理吞吐量将受限于数据库服务器的往返时间,本地缓存的有效性很大程度上取决于数据的分布,并行运行大量查询可能会轻易压垮数据库。

为了在批处理过程中实现良好的吞吐量,计算必须(尽可能)限于单台机器上进行。为待处理的每条记录发起随机访问的网络请求实在是太慢了。而且,查询远程数据库意味着批处理作业变为 非确定的(nondeterministic),因为远程数据库中的数据可能会改变。

更好的办法是直接获取连接的目标数据库副本,然后全部放入同一个分布式文件系统中,比如你可以将用户数据库存储在一组文件中,而目标数据库存储在另一组文件中,并用MapReduce将所有相关记录集中在同一个地方进行高效处理。

Mapper会从每个输入记录中提取键值,通过框架对输出进行分区,然后对键值对进行排序,相同Key键在Reducer 输入中彼此相邻。 Map-Reduce 作业甚至可以也让这些记录排序,然后按照时间戳排序。这被称为二次排序

Reducer 可以很容易执行实际的连接逻辑。它一次处理一个记录,所以只需要将记录保存在内存中就无需网络请求。这个算法被称为排序合并连接,因为 Mapper 的输出是按键排序的,然后 Reducer 将来自连接两侧的有序记录列表合并在一起。

Mapper和排序过程保证数据都必须放在同一个地方。使用MapReduce编程模型,能将计算的物理网络通信层面从应用逻辑中剥离出来。这种分离与数据库的用法形成了鲜明对比。Mapper处理了所有的网络通信,所以它也不用考虑一部分的分布式故障:例如另一个节点崩溃之类的。

分组的实现

除了连接,将相关数据放在一起的另外一种模式就是:分组。比如SQL中的Group By 按照某个键进行分组。

使用 MapReduce 实现这种分组操作的最简单方法是设置 Mapper,以便它们生成的键值对使用所需的分组键。然后分区和排序过程将所有具有相同分区键的记录导向同一个 Reducer。因此在 MapReduce 之上实现分组和连接看上去非常相似。

分组还有一个常见用途是整理特定用户会话的所有活动事件,用于找出用户进行的一系列操作。

如果你有多个 Web 服务器处理用户请求,则特定用户的活动事件很可能分散在各个不同的服务器的日志文件中。你可以通过使用会话 cookie,用户 ID 或类似的标识符作为分组键,以将特定用户的所有活动事件放在一起来实现会话化,与此同时,不同用户的事件仍然散布在不同的分区中。

处理偏斜

在现实生活中会存在与单个键关联的大量数据,比如社交平台中“关注”某个用户,这个用户往往存在大量的追随者。这种情况下“相同键的所有记录放在相同的位置”这种模式就被破坏了。这种不成比例的活动数据库记录被称为关键对象或热键

单个Reducer中收集追随者众多的键可能导致严重的偏斜,这代表某个Reducer必须比其他Reducer处理更多的记录。由于 MapReduce 作业只有在所有 Mapper 和 Reducer 都完成时才完成,所有后续作业必须等待最慢的 Reducer 才能启动。

如果连续的输入存在热键,可以使用算法进行补偿。其中一种处理方法:

1.运行一个抽样作业判断哪些键是热键

2.连接实际执行时,Mapper 会将热键的关联记录 随机发送到几个 Reducer 之一

3.对于另外一侧的连接输入,与热键相关的记录需要被复制到 所有 处理该键的 Reducer 上

这种方式将处理热键的工作分散到多个Reducer上,这样就可以更好地并行化,代价是需要将连接另一侧的输入记录复制到多个 Reducer 上。 Crunch 中的 分片连接(sharded join) 方法与之类似,但需要显式指定热键而不是使用抽样作业。

Map侧连接

连接算法在 Reducer 中执行实际的连接逻辑,因此被称为 Reduce 侧连接。Mapper 扮演着预处理输入数据的角色:从每个输入记录中提取键值,将键值对分配给 Reducer 分区,并按键排序。

Reduce 侧方法的优点是不需要对输入数据做任何假设:无论其属性和结构如何,Mapper 都可以对其预处理以备连接。然而不利的一面是,排序,复制至 Reducer,以及合并 Reducer 输入,所有这些操作可能开销巨大。当数据通过 MapReduce 阶段时,数据可能需要落盘好几次,取决于可用的内存缓冲区。

但是如果能对输入数据作出某种假设,则通过使用所谓的 Map 侧连接来加快连接速度是可行的。这种方法使用了一个裁减掉 Reducer 与排序的 MapReduce 作业,每个 Mapper 只是简单地从分布式文件系统中读取一个输入文件块,然后将输出文件写入文件系统,仅此而已。

广播散列连接

适用于执行 Map 端连接的最简单场景是大数据集与小数据集连接的情况。要点在于小数据集需要足够小,以便可以将其全部加载到每个 Mapper 的内存中。

如果用户数据库小到足以放进内存中,当Mapper启动时,它可以将用户数据库从分布式文件系统读取到内存中的散列表。Mapper 可以扫描用户活动事件,然后简单在散列表中查找每个事件的用户ID。

那么什么是广播散列连接

每个连接较大输入端分区的 Mapper 都会将较小输入端数据集整个读入内存中,这叫做广播。而散列意味着使用散列表。

另一种方法是将较小输入存储在本地磁盘上的只读索引中。索引中经常使用的部分将保留在操作系统的页面缓存中,因而这种方法可以提供与内存散列表几乎一样快的随机查找性能,但实际上并不需要数据集能放入内存中。

分区情况下

如果 Map 侧连接的输入以相同的方式进行分区,则散列连接方法可以独立应用于每个分区。

如果分区正确无误,可以确定的是,所有你可能需要连接的记录都落在同一个编号的分区中。因此每个 Mapper 只需要从输入两端各读取一个分区就足够了。好处是每个 Mapper 都可以在内存散列表中少放点数据。

这种方法只有当连接两端输入有相同的分区数,且两侧的记录都是使用相同的键与相同的哈希函数做分区时才适用。如果输入是由之前执行过这种分组的 MapReduce 作业生成的,那么这可能是一个合理的假设。

分区散列连接在 Hive 中称为 Map 侧桶连接(bucketed map joins)

有一些方案属于Map侧连接的变体,比如输入的数据集不仅以相同的方式进行分区,而且还基于相同的键进行排序。在这种情况下,数据集能不能放在内存中反倒不重要了因为这个情况下Mapper同样可以执行并归操作(按键递增的顺序依次读取两个输入文件,将具有相同键的记录配对)。

如果能进行Map侧合并连接,这意味着前一个MapReduce作业可能一开始就把输入数据做了分区并排序。

当下游作业使用 MapReduce 连接的输出时,选择 Map 侧连接或 Reduce 侧连接会影响输出的结构。

在优化连接策略时,了解分布式文件系统中数据集的物理布局变得非常重要:仅仅知道编码格式和数据存储目录的名称是不够的;你还必须知道数据是按哪些键做的分区和排序,以及分区的数量。

批处理的输出

批处理的输出通常不是报表,而是一些其他类型的结构。

比如建立搜索索引,谷歌最开始就是这样做的,使用MapReduce 为搜索引擎建立索引,由多个MapReduce作业组成的工作流。时至今日,使用MapReduce依旧是构建索引的优选方案。

但是要注意,索引文件一旦创建就是不可变的。如果索引的文档集合发生改变,这代表我们要定期重跑整个索引工作流,然后批量替换以前的索引文件。如果只有少量索引文件需要更改,这种方案就会让计算成本变得很高,且频繁。

在上述基础上,可以使用增量建立索引,如果要在索引中添加,删除或更新文档,Lucene 会写新的段文件,并在后台异步合并压缩段文件。

总而言之,批处理的输出应该如何回到应用可以查询的数据库中呢?

1.直接在Mapper或Reducer中直接写入数据库服务器,但是会造成几个问题:

  • 性能差:为每条记录,发起数据库网络请求要比正常处理任务要慢得多;
  • 并行处理任务时,数量级太大可能会压垮数据库;
  • 工作流任务可能存在执行失败,这需要工程师自己维护后续造成的影响

2.创建一个新的数据库,将这个数据库作为文件写入分布式文件系统中作业的输出目录。这些数据文件一旦写入就是不可变的,可以批量加载到处理只读查询的服务器中。不少键值存储都支持在 MapReduce 作业中构建数据库文件。

程序读取输入并写入输出。在这一过程中,输入保持不变,任何先前的输出都被新输出完全替换,且没有其他副作用。这意味着你可以随心所欲地重新运行一个命令,略做改动或进行调试,而不会搅乱系统的状态。

上述设计哲学在MapReduce作用输出的时候也同样体现:

1.作业可以随时重新运行,根据输出结果来矫正代码。但是要注意,如果部署了数据库,回滚代码也无法修复数据库中的数据。

2.由于回滚很容易,比起在错误意味着不可挽回的伤害的环境,功能开发进展能快很多。这种 最小化不可逆性(minimizing irreversibility) 的原则有利于敏捷软件开发

3.如果 Map 或 Reduce 任务失败,MapReduce 框架将自动重新调度,并在同样的输入上再次运行它。如果失败是由代码中的错误造成的,那么它会不断崩溃,并最终导致作业在几次尝试之后失败。但是如果故障是由于临时问题导致的,那么故障就会被容忍。因为输入不可变,这种自动重试是安全的,而失败任务的输出会被 MapReduce 框架丢弃。

4.同一组文件可用作各种不同作业的输入,包括计算指标的监控作业并且评估作业的输出是否具有预期的性质(例如,将其与前一次运行的输出进行比较并测量差异) 。

5.与 Unix 工具类似,MapReduce 作业将逻辑与布线(配置输入和输出目录)分离,这使得关注点分离,可以重用代码:一个团队可以专注实现一个做好一件事的作业;而其他团队可以决定何时何地运行这项作业。

Hadoop与分布式数据库的对比

数据库专注在一组机器上并行执行SQL查询,而 MapReduce 和分布式文件系统的组合则更像是一个可以运行任意程序的通用操作系统。

在存储方面

数据库会限制数据库模型(比如关系或文档)来构建数据,而分布式文件系统中的文件只是字节序列

将大型组织的各个部分的数据集中在一起是很有价值的,因为它可以跨越以前相互分离的数据集进行连接。 MPP 数据库所要求的谨慎模式设计拖慢了集中式数据收集速度;以原始形式收集数据,稍后再操心模式的设计,能使数据收集速度加快(有时被称为 “数据湖(data lake)” 或 “企业数据中心(enterprise data hub)”)

不加区分的数据转储转移了解释数据的负担:数据集的生产者不再需要强制将其转化为标准格式,数据的解释成为消费者的问题。

如果生产者和消费者是不同优先级的不同团队,这可能是一种优势。甚至可能不存在一个理想的数据模型,对于不同目的有不同的合适视角。以原始形式简单地转储数据,可以允许多种这样的转换。这种方法被称为 寿司原则(sushi principle):“原始数据更好”。

处理模型时的多样性:

数据库需要根据不同领域来进行数据模型的更替,比如推荐系统的模型或机器学习,同时又不是所有类型的处理都可以合理表达为SQL查询。这些类型的处理通常是特别针对特定应用的,所以必须要编写代码来处理,而不仅仅是查询这一项。

MapReduce 使工程师能够轻松地在大型数据集上运行自己的代码。如果你有 HDFS 和 MapReduce,那么你 可以 在它之上建立一个 SQL 查询执行引擎,事实上这正是 Hive 项目所做的。但是,你也可以编写许多其他形式的批处理,这些批处理不必非要用 SQL 查询表示。

随后,人们发现 MapReduce 对于某些类型的处理而言局限性很大,表现很差,因此在 Hadoop 之上其他各种处理模型也被开发出来(我们将在 “MapReduce 之后” 中看到其中一些)。只有两种处理模型,SQL 和 MapReduce,还不够,需要更多不同的模型!而且由于 Hadoop 平台的开放性,实施一整套方法是可行的,而这在单体 MPP 数据库的范畴内是不可能的。

至关重要的是,这些不同的处理模型都可以在共享的单个机器集群上运行,所有这些机器都可以访问分布式文件系统上的相同文件。在 Hadoop 方式中,不需要将数据导入到几个不同的专用系统中进行不同类型的处理:系统足够灵活,可以支持同一个集群内不同的工作负载。不需要移动数据,使得从数据中挖掘价值变得容易得多,也使采用新的处理模型容易的多。

Hadoop 生态系统包括随机访问的 OLTP 数据库,如 HBase(请参阅 “SSTables 和 LSM 树”)和 MPP 风格的分析型数据库,如 Impala 。 HBase 与 Impala 都不使用 MapReduce,但都使用 HDFS 进行存储。它们是迥异的数据访问与处理方法,但是它们可以共存,并被集成到同一个系统中。

处理故障:

批处理系统可能对处理故障不太敏感,因为就算失败了也不会影响某个用户,只需要再次执行一遍即可。

但是大部分数据库会中止,并且选择让用户来判断是否要重新提交执行,或通过预设自动重新运行它。由于查询通常最多运行几秒钟或几分钟,所以这种错误处理的方法是可以接受的,因为重试的代价不是太大。 MPP 数据库还倾向于在内存中保留尽可能多的数据以避免从磁盘读取的开销。

MapReduce 可以容忍单个 Map 或 Reduce 任务的失败,而不会影响作业的整体,通过以单个任务的粒度重试工作。它也会非常急切地将数据写入磁盘,一方面是为了容错,另一部分是因为假设数据集太大而不能适应内存。

MapReduce 方式更适用于较大的作业:要处理如此之多的数据并运行很长时间的作业,以至于在此过程中很可能至少遇到一个任务故障。在这种情况下,由于单个任务失败而重新运行整个作业将是非常浪费的。即使以单个任务的粒度进行恢复引入了使得无故障处理更慢的开销,但如果任务失败率足够高,这仍然是一种合理的权衡。

MapReduce 只是分布式系统的许多可能的编程模型之一。

物化中间状态

但在很多情况下,你知道一个作业的输出只能用作另一个作业的输入,这些作业由同一个团队维护。在这种情况下,分布式文件系统上的文件只是简单的 中间状态(intermediate state):一种将数据从一个作业传递到下一个作业的方式。在一个用于构建推荐系统的,由 50 或 100 个 MapReduce 作业组成的复杂工作流中,存在着很多这样的中间状态。

将这个中间状态写入文件的过程称为 物化(materialization)

MapReduce 完全物化中间状态的方法存在不足之处:

  • MapReduce 作业只有在前驱作业(生成其输入)中的所有任务都完成时才能启动,而由 Unix 管道连接的进程会同时启动,输出一旦生成就会被消费。不同机器上的数据偏斜或负载不均意味着一个作业往往会有一些掉队的任务,比其他任务要慢得多才能完成。必须等待至前驱作业的所有任务完成,拖慢了整个工作流程的执行。
  • Mapper 通常是多余的:它们仅仅是读取刚刚由 Reducer 写入的同样文件,为下一个阶段的分区和排序做准备。在许多情况下,Mapper 代码可能是前驱 Reducer 的一部分:如果 Reducer 和 Mapper 的输出有着相同的分区与排序方式,那么 Reducer 就可以直接串在一起,而不用与 Mapper 相互交织。
  • 将中间状态存储在分布式文件系统中意味着这些文件被复制到多个节点,对这些临时数据这么搞就比较过分了。

为了解决上述问题,几种用于分布式批处理的新执行引擎在设计时会有一个共同点:将整个工作流作为单个作业来处理,而不是把它分解为独立的子作业。

由于它们将工作流显式建模为数据从几个处理阶段穿过,所以这些系统被称为 数据流引擎(dataflow engines)。像 MapReduce 一样,它们在一条线上通过反复调用用户定义的函数来一次处理一条记录,它们通过输入分区来并行化载荷,它们通过网络将一个函数的输出复制到另一个函数的输入。

与 MapReduce 不同,这些函数不需要严格扮演交织的 Map 与 Reduce 的角色,而是可以以更灵活的方式进行组合。我们称这些函数为 算子(operators),数据流引擎提供了几种不同的选项来将一个算子的输出连接到另一个算子的输入。

1.对记录按照键重新分区并排序,用于实现排序合并连接和分组

2.接受多个输入,然后使用同一种方式进行分区,但是不排序。这种情况适用于分区重要但顺序无关紧要。

3.对于广播散列连接,可以将一个算子的输出,发送到连接算子的所有分区

这样做的优点是:

  • 排序这种昂贵的工作只需要在实际需要的地方执行,不需要默认在Map或者Reduce阶段中实现
  • Mapper的工作可以合并在上一个 Reduce 算子中
  • 方便调度,数据可以通过共享内存缓冲区传输,不需要通过网络复制数据
  • 算子可以在输入就绪后立即开始执行;后续阶段无需等待前驱阶段整个完成后再开始。

数据流引擎执行与 MapReduce 工作流同样的计算,根据上述优化,通常执行速度要明显快得多。

容错

完全物化中间状态至分布式文件系统的一个优点是,它具有持久性,这使得 MapReduce 中的容错相当容易:如果一个任务失败,它可以在另一台机器上重新启动,并从文件系统重新读取相同的输入。

如果一台机器上发生故障,并且该机器上的中间状态丢失,则它回从其他仍然可用的数据重新计算。但是要注意,这种处理方式要求“算子”是确定性的,输入同样的数据集那么它们经过算子处理后的数据一定是相同的。工程师需要消除算子中存在的不稳定性,比如禁止使用随机数,系统时钟等。

通过重算数据来从故障中恢复并不总是正确的答案:如果中间状态数据要比源数据小得多,或者如果计算量非常大,那么将中间数据物化为文件可能要比重新计算廉价的多。

MapReduce 就像是将每个命令的输出写入临时文件,而数据流引擎看起来更像是 Unix 管道。

手写MapReduce作业可能是个苦力活,随着技术发展,一些高级语言可以迁移到新的数据流执行引擎,而无需重写作业代码。但是本人没有这方面的编程经验,更适合的可能是通过数据流API来进行实践,通过某些通用的接口来完成这部分的拓展和工作,这倒也符合 Unix 的哲学。

MapReduce 是围绕着回调函数的概念建立的:对于每条记录或者一组记录,调用一个用户定义的函数(Mapper 或 Reducer),并且该函数可以自由地调用任意代码来决定输出什么。这种方法的优点是可以基于大量已有库的生态系统创作:解析、自然语言分析、图像分析以及运行数值或统计算法等。

自由运行任意代码,长期以来都是传统 MapReduce 批处理系统与 MPP 数据库的区别所在。

批处理系统在机器学习或推荐系统,分类器等领域可能更实用。

批处理引擎正被用于分布式执行日益广泛的各领域算法。随着批处理系统获得各种内置功能以及高级声明式算子,且随着 MPP 数据库变得更加灵活和易于编程,两者开始看起来相似了:最终,它们都只是存储和处理数据的系统。

本章小结

本章中讨论了批处理系统,从Unix工具开始延伸,再到MapReduce框架实现原理和过程,以及分布式中批处理面对分区,容错等方面的处理,最后到探讨批处理系统的发展。

得益于这个框架,在处理批处理作业时不需要担心实现容错机制,上述原理也向我们证明了批处理系统的可靠性。

分布式批处理引擎有一个刻意限制的编程模型:回调函数(比如 Mapper 和 Reducer)被假定是无状态的,而且除了指定的输出外,必须没有任何外部可见的副作用。这一限制允许框架在其抽象下隐藏一些困难的分布式系统问题:当遇到崩溃和网络问题时,任务可以安全地重试,任何失败任务的输出都被丢弃。如果某个分区的多个任务成功,则其中只有一个能使其输出实际可见。

批处理作业的显著特点是,它读取一些输入数据并产生一些输出数据,但不修改输入 —— 换句话说,输出是从输入衍生出的。最关键的是,输入数据是 有界的(bounded):它有一个已知的,固定的大小(例如,它包含一些时间点的日志文件或数据库内容的快照)。因为它是有界的,一个作业知道自己什么时候完成了整个输入的读取,所以一个工作在做完后,最终总是会完成的。

流处理

当事件发生时就立即进行处理,这就是流处理的思想。事件流视为一种数据管理机制:无界限,增量处理。

传递事件流

在流处理术语中,一个事件由生产者生成一次,然后可能有多个消费者进行处理。在文件系统中,文件名标识一组相关记录;在流式系统中,相关的事件通常被聚合为一个 主题(topic)流(stream)

文件或数据库可以连接生产者或消费者:生产者生成的每个事件写入数据存储,每个消费者定期轮询数据存储,检查自上次运行以来新出现的事件。

如果想低延迟的连续处理时,一旦数据存储不是为这种用途专门设计,那么就不得不使用轮询。轮询的越频繁,能返回新事件的请求比例就越低,额外开销就会越高。

关系型数据库通常有触发器,针对变化作出反应,但是功能有限且难用。

消息传递系统

消息传递系统:生产者发送包含事件的消息,然后将消息推送给消费者。

Unix管道也可以视为一个简易的消息传递系统,只不过Unix管道是一个生产者一个消费者。而消息传递系统允许多个生产者将消息发送到同一个主题,同时允许多个消费者节点接收主题中的消息。

如果生产者发送消息的速度比消费者能够处理的速度快会发生什么?

1.系统可以丢掉消息

2.将消息放入缓冲队列

3.使用流量控制。

如果节点崩溃或暂时脱机,会发生什么情况?会存在消息丢失的情况吗?

和数据库一样,持久性需要写入磁盘,这同时也代表如果你需要更高的吞吐量和更低的延迟,你可能要忍受消息丢失的可能。但是关于是否能接受这点,则取决于应用。假如对某个采集指标数据并不敏感,偶尔丢失数据可能并不重要。但是这里要特别警醒,如果大量消息被丢弃,可能应用来不及做出反应来意识到指标已经不正确了。如果涉及到更复杂一点的场景,比如金融领域,丢失消息是一定不能忍受的。

如果直接从生产者传递给消费者,又会发生什么呢?

比如很多消息传递系统使用生产者和消费者之间使用网络通信,而不通过中间节点。

  • UDP组播广泛应用于金融行业,因为低时延非常重要。然 UDP 本身是不可靠的,但应用层的协议可以恢复丢失的数据包。
  • 无代理的消息库,通过TCO或者IP多播实现发布/订阅消息传递。
  • 使用UDP消息传递收集到的所有机器的指标,并对其进行监控。只有接收到所有消息,才认为计数器指标是正确的。
  • 直接通过HTTP或RPC请求将消息推送给消费者。

消息直接传递系统始终要注意:它们的容错程度即为有限:即使协议检测到并重传在网络中丢失的数据包,它们也只能假设生产者和消费者始终在线。

如果消费者处于脱机状态,则可能会丢失其不可达时发送的消息。一些协议允许生产者重试失败的消息传递,但当生产者崩溃时,它可能会丢失消息缓冲区及其本应发送的消息,这种方法可能就没用了。

消息代理(消息队列)

消息代理实际上是一种针对处理消息流而优化的数据库。它作为服务器运行,生产和消费者作为客户端连接到服务器。生产者将消息写入代理,消费者用过代理读取消息。

一些消息代理只将消息保存在内存中,而另一些消息代理将消息写入磁盘,以便在代理崩溃的时候不会丢失消息。针对缓慢的消费者,它们通常会允许无上限的排队,而不是选择丢弃消息或使用流量控制。

排队的结果是,消费者通常是 异步(asynchronous) 的:当生产者发送消息时,通常只会等待代理确认消息已经被缓存,而不等待消息被消费者处理。向消费者递送消息将发生在未来某个未定的时间点 —— 通常在几分之一秒之内,但有时当消息堆积时会显著延迟。

消息代理和数据库的区别:

1.数据库会一直保留数据直至接收到显式的删除命令(在实际中一般是软删除),而大部分消息代理在消息成功传递给消费者后会自动删除消息。消息代理不适合长期存储数据。

2.大多数消息代理都认为它们的工作集相当小,指队列相对较短。如果代理需要缓存很多消息,每个消息需要很长时间处理,整体应用的吞吐量可能会恶化。

3.数据库支持索引,支持各种查询数据的方式。消息代理通常支持按照某种模式匹配主题,订阅子集。

4.数据库在查询时,会有快照这一概念。消息代理不支持任意查询,但是数据发生变化时,它们会主动通知客户端。

多个消费者情况:

当多个消费者从同一个主题中读取消息,会存在两种主要模式:

1.负载均衡,被多个消费者共享。如果希望能并行处理消息时,该模式非常有效。

2.扇出,每条消息都会被传递给所有消费者,这种模式下允许几个独立的消费者监听相同的订阅,而不会互相影响。

两种模式可以组合使用:例如,两个独立的消费者组可以每组各订阅同一个主题,每一组都共同收到所有消息,但在每一组内部,每条消息仅由单个节点处理。

确认与重新传递

消费者可能会随时崩溃,有一种情况是:代理传递消息给消费者,但是消费者没有处理,或者中途崩溃只处理了部分。为了确保每一个消息都能处理,消息代理使用确认:客户端必须明确告知消息处理的事件,以便代理能将消息从队列中移除。

如果与客户端的连接关闭,或者代理超出一段时间未收到确认,代理则认为消息没有被处理,因此它将消息再递送给另一个消费者。

即使消息代理试图保留消息的顺序(如 JMS 和 AMQP 标准所要求的),负载均衡与重传的组合也不可避免地导致消息被重新排序。为避免此问题,你可以让每个消费者使用单独的队列(即不使用负载均衡功能)。如果消息是完全独立的,则消息顺序重排并不是一个问题。如果消息之间存在因果依赖关系,这就是一个很重要的问题。

分区日志

基于日志的消息代理:既有数据库的持久存储方式,又有消息传递的低延迟通知。

日志只不过是磁盘上简单的仅追加记录序列。同样的结构也可以用于实现消息代理:生产者通过将消息追加到日志末尾来发送消息,而消费者通过依次读取日志来接收消息。如果消费者读到日志末尾,则会等待新消息追加的通知。 Unix 工具 tail -f 能监视文件被追加写入的数据,基本上就是这样工作的。

为了伸缩超出单个磁盘所能提供的更高吞吐量,可以对日志进行分区。不同分区可以托管在不同的机器上,使得每个分区都有一份能独立于其他分区进行读写的日志。一个主题可以定义为一组携带相同类型消息的分区。

在每个分区内,代理为每个消息分配一个单调递增的序列号或 偏移量。这种序列号是有意义的,因为分区是仅追加写入的,所以分区内的消息是完全有序的。没有跨不同分区的顺序保证。

日志与传统的消息传递相比

基于日志的的方法天然支持扇出式消息传递,因为多个消费者可以独立读取日志,而不会相互影响。为了在一组消费者之间实现负载均衡,代理可以将整个分区分配给消费者组所在的节点,而不是分配单条消息给消费者。

通常情况下,当一个用户被指派了一个日志分区时,它会以简单的单线程方式顺序地读取分区中的消息。这种粗粒度的负载均衡方法有一些缺点:

1.共享消费主题工作的节点数,最多为该主题中的日志分区数,因为同一个分区内的所有消息被递送到同一个节点

2.如果某条消息处理的和缓慢,则它会阻塞该分区中后续消息的处理

因此在消息处理代价高昂,希望逐条并行处理,以及消息的顺序并没有那么重要的情况下,JMS/AMQP 风格的消息代理是可取的。另一方面,在消息吞吐量很高,处理迅速,顺序很重要的情况下,基于日志的方法表现得非常好。

消费者偏移量

顺序消费一个分区使得判断消息是否已经被处理变得相当容易:所有偏移量小于消费者的当前偏移量的消息已经被处理,而具有更大偏移量的消息还没有被看到。因此,代理不需要跟踪确认每条消息,只需要定期记录消费者的偏移即可。这种方法减少了额外簿记开销,而且在批处理和流处理中采用这种方法有助于提高基于日志的系统的吞吐量。

消息代理表现的像单领导数据库复制中的主库,而消费者表现的像从库。

如果消费者节点失效,则将失效消费者的分区指派给其他节点,并从最后记录的偏移量开始消费消息。

当消费者跟不上生产者时

如果消费者远远落后,而所要求的信息比保留在磁盘上的信息还要旧,那么它将不能读取这些信息,所以代理实际上丢弃了比缓冲区容量更大的旧信息。你可以监控消费者落后日志头部的距离,如果落后太多就发出报警。由于缓冲区很大,因而有足够的时间让运维人员来修复慢消费者,并在消息开始丢失之前让其赶上。

即使消费者真的落后太多开始丢失消息,也只有那个消费者受到影响;它不会中断其他消费者的服务。这是一个巨大的运维优势:你可以实验性地消费生产日志,以进行开发,测试或调试,而不必担心会中断生产服务。当消费者关闭或崩溃时,会停止消耗资源,唯一剩下的只有消费者偏移量。

这种行为也与传统的消息代理形成了鲜明对比,在那种情况下,你需要小心地删除那些消费者已经关闭的队列 —— 否则那些队列就会累积不必要的消息,从其他仍活跃的消费者那里占走内存。

重播就消息

如果使用AMQP和JMS风格的消息代理,处理和确认消息会将消息删除。如果是在基于日志的消息代理中,则表现的更像是从文件中读取数据。

除了消费者的任何输出之外,处理的唯一副作用是消费者偏移量的前进。但偏移量是在消费者的控制之下的,所以如果需要的话可以很容易地操纵:例如你可以用昨天的偏移量跑一个消费者副本,并将输出写到不同的位置,以便重新处理最近一天的消息。你可以使用各种不同的处理代码重复任意次。

这样更像批处理,它允许重复执行多次。

数据库与流

事件是指某个时刻发生的事情的记录。某些写入数据库的事实是可以被捕捉,存储和处理的事件。

事实上,复制日志就是一个由数据库写入事件组成的流,由主库在处理事务时生成。

保持系统同步

事实上大部分重要应用都需要组合使用几种不同的技术来满足所有的需求。每一种技术都有自己的数据副本,并根据自己的目的进行存储方式的优化。

由于相同或相关的数据出现在了不同的地方,因此相互间需要保持同步:如果某个项目在数据库中被更新,它也应当在缓存、搜索索引和数据仓库中被更新。对于数据仓库,这种同步通常由 ETL 进程执行,通常是先取得数据库的完整副本,然后执行转换,并批量加载到数据仓库中 —— 换句话说,批处理。我们在 “批处理工作流的输出” 中同样看到了如何使用批处理创建搜索索引、推荐系统和其他衍生数据系统。

有一种代替方法是“双写”,用于解决周期性的完整数据库转存过于缓慢的问题,其中应用代码在数据库变更时明确写入每个系统:首先写入数据库,然后更新搜索索引,然后使缓存项失效(甚至同时执行这些写入)。

但是双写会造成竞争条件:一个值将简单地以无提示方式覆盖另一个值。

双重写入的另一个问题是,其中一个写入可能会失败,而另一个成功。这是一个容错问题,而不是一个并发问题,但也会造成两个系统互相不一致的结果。确保它们要么都成功要么都失败,是原子提交问题的一个例子,解决这个问题的代价是昂贵的。

如果你只有一个单领导者复制的数据库,那么这个领导者决定了写入顺序,而状态机复制方法可以在数据库副本上工作。然而,没有单个主库:数据库可能有一个领导者,搜索索引也可能有一个领导者,但是两者都不追随对方,所以可能会发生冲突。

变更数据捕获

变更数据捕获(change data capture, CDC)一种观察写入数据库的所有数据变更,并将其提取并转换为可以复制到其他系统中的形式的过程。

例如,你可以捕获数据库中的变更,并不断将相同的变更应用至搜索索引。如果变更日志以相同的顺序应用,则可以预期搜索索引中的数据与数据库中的数据是匹配的。搜索索引和任何其他衍生数据系统只是变更流的消费者。

日志消费可以被称为衍生数据系统,存储在搜索索引和数据仓库中的数据,只是 记录系统 数据的额外视图。变更数据捕获是一种机制,可确保对记录系统所做的所有更改都反映在衍生数据系统中,以便衍生系统具有数据的准确副本。

本质上,变更数据捕获更像让一个数据库成为领导者,而其他组件变为追随者。基于日志的消息代理非常适合从源数据库传输变更事件,因为它保留了消息的顺序。

数据库触发器可用来实现变更数据捕获,通过注册观察所有变更的初发期,并将相应的变更项写入变更日志表中。

类似于消息代理,变更数据捕获通常是异步的:记录数据库系统在提交变更之前不会等待消费者应用变更。这种设计具有的运维优势是,添加缓慢的消费者不会过度影响记录系统。不过,所有复制延迟可能有的问题在这里都可能出现。

初始快照

如果拥有所有对数据库进行变更的日志,我可以通过重播日志来重建数据库的完整状态。但是实际上这种方式会太占磁盘空间,且重播过于费时,所以一般情况下日志都是被截断的。

数据库的快照必须与变更日志中的已知位置或偏移量相对应,以便在处理完快照后知道从哪里开始应用变更。一些 CDC 工具集成了这种快照功能,而其他工具则把它留给你手动执行。

日志压缩

日志压缩可以是一个备选方案。存储引擎定期在日志中查找具有相同键的记录,丢掉所有重复的内容,并只保留每个键的最新更新。这个压缩与合并过程在后台运行。

在日志结构存储引擎中,具有特殊值 NULL(墓碑,即 tombstone)的更新表示该键被删除,并会在日志压缩过程中被移除。但只要键不被覆盖或删除,它就会永远留在日志中。这种压缩日志所需的磁盘空间仅取决于数据库的当前内容,而不取决于数据库中曾经发生的写入次数。如果相同的键经常被覆盖写入,则先前的值将最终将被垃圾回收,只有最新的值会保留下来。

在基于日志的消息代理与变更数据捕获的上下文中也适用相同的想法。如果 CDC 系统被配置为,每个变更都包含一个主键,且每个键的更新都替换了该键以前的值,那么只需要保留对键的最新写入就足够了。

现在,无论何时需要重建衍生数据系统(如搜索索引),你可以从压缩日志主题的零偏移量处启动新的消费者,然后依次扫描日志中的所有消息。日志能保证包含数据库中每个键的最新值(也可能是一些较旧的值)—— 换句话说,你可以使用它来获取数据库内容的完整副本,而无需从 CDC 源数据库取一个快照。

Apache Kafka 支持这种日志压缩功能。

事件溯源

事件溯源涉及到 将所有对应用状态的变更 存储为变更事件日志。

与变更数据捕获类似,事件溯源涉及到 将所有对应用状态的变更 存储为变更事件日志。最大的区别是事件溯源将这一想法应用到了一个不同的抽象层次上:

  • 在变更数据捕获中,应用以 可变方式(mutable way) 使用数据库,可以任意更新和删除记录。变更日志是从数据库的底层提取的(例如,通过解析复制日志),从而确保从数据库中提取的写入顺序与实际写入的顺序相匹配,从而避免竞态条件。写入数据库的应用不需要知道 CDC 的存在。
  • 在事件溯源中,应用逻辑显式构建在写入事件日志的不可变事件之上。在这种情况下,事件存储是仅追加写入的,更新与删除是不鼓励的或禁止的。事件被设计为旨在反映应用层面发生的事情,而不是底层的状态变更。

事件溯源是一种强大的数据建模技术:从应用的角度来看,将用户的行为记录为不可变的事件更有意义,而不是在可变数据库中记录这些行为的影响。事件溯源使得应用随时间演化更为容易,通过更容易理解事情发生的原因来帮助调试的进行,并有利于防止应用 Bug。

用户期望看到的是系统的当前状态,而不是变更历史。因此,使用时间溯源的应用需要拉取事件日志,并将其转换为适合向用户显示的应用状态。

重播事件日志允许重新构建系统的当前状态。不过日志压缩需要采用不同的方式处理:

  • 用于记录更新的 CDC 事件通常包含记录的 完整新版本,因此主键的当前值完全由该主键的最近事件确定,而日志压缩可以丢弃相同主键的先前事件。
  • 另一方面,事件溯源在更高层次进行建模:事件通常表示用户操作的意图,而不是因为操作而发生的状态更新机制。在这种情况下,后面的事件通常不会覆盖先前的事件,所以你需要完整的历史事件来重新构建最终状态。这里进行同样的日志压缩是不可能的。

使用事件溯源的应用通常有一些机制,用于存储从事件日志中导出的当前状态快照,因此它们不需要重复处理完整的日志。然而这只是一种性能优化,用来加速读取,提高从崩溃中恢复的速度;真正的目的是系统能够永久存储所有原始事件,并在需要时重新处理完整的事件日志。

命令和事件

事件溯源的哲学是仔细区分 事件(event)命令(command)。事件生成的时刻,它就成为了事实。一个请求刚刚到达的时,它一开始是一个命令:在这个时间点上它人可能失败。如果验证成功且被接受,那么它就会变味一个持久化不可变的事件。

事件流的消费者不允许拒绝事件:当消费者看到事件时,它已经成为日志中不可变的一部分,并且可能已经被其他消费者看到了。因此任何对命令的验证,都需要在它成为事件之前同步完成。例如,通过使用一个可以原子性地自动验证命令并发布事件的可串行事务。

并发控制

事件溯源和变更数据捕获的最大缺点是,事件日志的消费者通常是异步的,所以可能会出现这样的情况:用户会写入日志,然后从日志衍生视图中读取,结果发现他的写入还没有反映在读取视图中。

有一种解决方案是:将事件追加到日志时同步执行读取视图更新。如果要将这些写入操作合并成一个原子单元,则需要使用事务。所以将事件日志和读取试图保存在同一个存储系统中,要么就需要跨不同系统进行分布式事务。

事件日志导出当前状态也简化了并发控制的某些部分。许多对多对象事务的需求源于单个用户操作需要在多个不同的位置更改数据。

如果事件日志与应用状态以相同的方式分区,那么直接使用单线程日志消费者就不需要写入并发控制,它从设计上一次只处理一个事件。而日志通过分区中定义事件的序列顺序,消除了并发性的不确定性。

流处理

流的来源(用户活动事件,传感器和写入数据库),流传输(直接通过消息传送,通过消息代理,通过事件日志)

那么可以用流来做什么呢?

1.将事件中的数据写入数据库,缓存,搜索索引或类似的存储系统,然后能被其他客户端查询。

2.可以通过某种方式将事件推送给用户,例如发送报警邮件或推送通知,或将事件流数据传送到屏幕上。在这种情况下,用户则是最终消费者。

3.可以处理一个或多个输入流,然后产生一个或者多个输出流。流可以经过多个处理阶段组成的流水线,最后再进行输出。

复合事件处理(complex event processing, CEP):与正则表达式允许你在字符串中搜索特定字符模式的方式类似,CEP 允许你指定规则以在流中搜索某些事件模式。

流分析:分析往往对找出特定事件序列并不关心,而更关注大量事件上的聚合与统计指标,比如测量某种类型事件的速率(每个时间间隔内发生的频率)。

流分析系统有时使用概率算法。概率算法产出近似的结果,比起精确算法的优点是内存使用要少得多。事实上,流处理并没有任何内在的近似性,而概率算法只是一种优化。

流搜索:存在基于复杂标准来搜索单个事件的需求。

例如,媒体监测服务可以订阅新闻文章 Feed 与来自媒体的播客,搜索任何关于公司、产品或感兴趣的话题的新闻。这是通过预先构建一个搜索查询来完成的,然后不断地将新闻项的流与该查询进行匹配。在一些网站上也有类似的功能:例如,当市场上出现符合其搜索条件的新房产时,房地产网站的用户可以要求网站通知他们。Elasticsearch 的这种过滤器功能,是实现这种流搜索的一种选择.

如果是传统的搜索引擎会先索引文件,然后在索引上跑查询。搜索一个数据流则反过来,查询被存储,而文档从查询中流过,就像在CEP中。

消息传递系统可以作为RPC的代替方案,作为一种服务间的通讯机制。RPC 类系统与流处理之间有一些交叉领域,比如分布式RPC功能:它允许将用户查询分散到一系列也处理事件流到节点上;然后这些查询与来自输入流到事件交织,但是结果可以被汇总并发回给用户。

时间推理

流处理通常需要与时间打交道,尤其是用于分析目的。

在批处理中过程中,大量的历史事件被快速地处理。如果需要按时间来分析,批处理器需要检查每个事件中嵌入的时间戳。读取运行批处理机器的系统时钟没有任何意义,因为处理运行的时间与事件实际发生的时间无关。

批处理可以在几分钟内读取一年的历史事件;在大多数情况下,感兴趣的时间线是历史中的一年,而不是处理中的几分钟。而且使用事件中的时间戳,使得处理是 确定性 的:在相同的输入上再次运行相同的处理过程会得到相同的结果

另一方面,许多流处理框架使用处理机器上的本地系统时钟(处理时间,即 processing time)来确定 窗口(windowing)。这种方法的优点是简单,如果事件创建与事件处理之间的延迟可以忽略不计,那也是合理的。然而,如果存在任何显著的处理延迟 —— 即,事件处理显著地晚于事件实际发生的时间,这种处理方式就失效了。

书中例举了很多可能导致处理延迟的情况发生:排队,网络故障,性能原因导致消息代理出现争用,流消费者重启,从故障中恢复时重新处理过去的事件等。

比较棘手的事,消息延迟还可能导致无法预测消息顺序。将事件时间和处理时间搞混会导致错误的数据。

矫正不正确的设备时钟:

  • 事件发生的时间,取决于设备时钟
  • 事件发送往服务器的时间,取决于设备时钟
  • 事件被服务器接收的时间,取决于服务器时钟

用第三个时间戳减去第二个时间戳,可以估算设备时钟和服务器时钟之间的偏移。然后可以将该偏移应用于事件时间戳,从而估计事件实际发生的真实时间。

本章小结

书中在流处理一文中给我们展示了,消息代理,即:

AMQP/JMS 风格的消息代理:将单条消息分配给消费者,消费者在成功处理单条消息后确认消息。消息被确认后从代理中删除。

基于日志的消息代理:代理将一个分区中的所有消息分配给同一个消费者节点,并始终以相同的顺序传递消息。并行是通过分区实现的,消费者通过存档最近处理消息的偏移量来跟踪工作进度。消息代理将消息保留在磁盘上,因此如有必要的话,可以回跳并重新读取旧消息。

就流的来源而言,我们讨论了几种可能性:用户活动事件,定期读数的传感器,和 Feed 数据(例如,金融中的市场数据)能够自然地表示为流。我们发现将数据库写入视作流也是很有用的:我们可以捕获变更日志 —— 即对数据库所做的所有变更的历史记录 —— 隐式地通过变更数据捕获,或显式地通过事件溯源。日志压缩允许流也能保有数据库内容的完整副本。

将数据库表示为流为系统集成带来了很多强大机遇。通过消费变更日志并将其应用至衍生系统,你能使诸如搜索索引、缓存以及分析系统这类衍生数据系统不断保持更新。你甚至能从头开始,通过读取从创世至今的所有变更日志,为现有数据创建全新的视图。

像流一样维护状态以及消息重播的基础设施,是在各种流处理框架中实现流连接和容错的基础。我们讨论了流处理的几种目的,包括搜索事件模式(复杂事件处理),计算分窗聚合(流分析),以及保证衍生数据系统处于最新状态(物化视图)。

然后我们讨论了在流处理中对时间进行推理的困难,包括处理时间与事件时间戳之间的区别,以及当你认为窗口已经完事之后,如何处理到达的掉队事件的问题。

我们区分了流处理中可能出现的三种连接类型:

  • 流流连接

    两个输入流都由活动事件组成,而连接算子在某个时间窗口内搜索相关的事件。例如,它可能会将同一个用户 30 分钟内进行的两个活动联系在一起。如果你想要找出一个流内的相关事件,连接的两侧输入可能实际上都是同一个流(自连接,即 self-join)。

  • 流表连接

    一个输入流由活动事件组成,另一个输入流是数据库变更日志。变更日志保证了数据库的本地副本是最新的。对于每个活动事件,连接算子将查询数据库,并输出一个扩展的活动事件。

  • 表表连接

    两个输入流都是数据库变更日志。在这种情况下,一侧的每一个变化都与另一侧的最新状态相连接。结果是两表连接所得物化视图的变更流。

最后,我们讨论了在流处理中实现容错和恰好一次语义的技术。与批处理一样,我们需要放弃任何失败任务的部分输出。然而由于流处理长时间运行并持续产生输出,所以不能简单地丢弃所有的输出。相反,可以使用更细粒度的恢复机制,基于微批次、存档点、事务或幂等写入。

写在最后

作者认为(《设计数据密集型应用》作者),在愿意为分布式事务付出代价的有限场景中,已经被成功应用,作者相信位分布式事务设计一种更好的协议是可行的。但是这不是立竿见影的事,如果在没有广泛支持的良好分布式事务协议的情况下,作者认为基于日志的衍生数据是集成不同数据系统的最有前途的方法。

大多数共识算法都是针对单个节点的吞吐量足以处理整个事件流的情况而设计的,并且这些算法不提供多个节点共享事件排序工作的机制。设计可以伸缩至单个节点的吞吐量之上,且在地理位置分散的环境中仍然工作良好的的共识算法仍然是一个开放的研究问题。

批处理和流处理有许多共同的原则,主要的根本区别在于流处理器在无限数据集上运行,而批处理输入是已知的有限大小。处理引擎的实现方式也有很多细节上的差异,但是这些区别已经开始模糊。

Lambad 架构

Lambda 架构的核心思想是通过将不可变事件附加到不断增长的数据集来记录传入数据,这类似于事件溯源。为了从这些事件中衍生出读取优化的视图,Lambda 架构建议并行运行两个不同的系统:批处理系统(如 Hadoop MapReduce)和独立的流处理系统(如 Storm)。

传统的同步写入方法需要跨异构存储系统的分布式事务,作者认为这是错误的解决方案。单个存储或流处理系统内的事务是可行的,但是数据跨越不同技术之间的边界时,具有幂等写入的异步事件日志是一种更加健壮和实用的方法。

运行几种不同基础设施的复杂性可能是一个问题:每种软件都有一个学习曲线,配置问题和操作怪癖,因此部署尽可能少的移动部件是很有必要的。比起使用应用代码拼接多个工具而成的系统,单一集成软件产品也可以在其设计应对的工作负载类型上实现更好、更可预测的性能。

分拆的目标不是要针对个别数据库与特定工作负载的性能进行竞争;我们的目标是允许你结合多个不同的数据库,以便在比单个软件可能实现的更广泛的工作负载范围内实现更好的性能。

如果有一项技术可以满足你的所有需求,那么最好使用该产品,而不是试图用更低层级的组件重新实现它。

数据库这种有状态系统被设计的永远记住事物,如果出现问题不加以解决,那么问题将永远存在下去。基础设施级别的服务意味着它们更加底层,同时需要更仔细的思考。

原子性,隔离性和持久性等事务性是一直构建正确应用的首选工具。

事物在某些领域被完全抛弃,被更好性能与可伸缩模型取代。如果需要更强的正确性保证,那么可串行化与原子提交是个好办法,但是它们只能在单个数据中心工作,并限制了系统能够实现的规模与容错特性。

幂等:即确保它无论是执行一次还是执行多次都具有相同的效果。但是,将不是天生幂等的操作变为幂等的操作需要一些额外的努力与关注:你可能需要维护一些额外的元数据(例如更新了值的操作 ID 集合),并在从一个节点故障切换至另一个节点时做好防护。

事务是代价高昂的,当涉及异构存储技术时尤为甚。使用分布式事务是因为它开销太大,最后不得不在应用代码中重新实现容错机制。

唯一性约束需要达成共识:如果存在多个具有相同值的并非请求,则系统需要决定冲突操作中的哪一个被接受,并拒绝其他违背约束的操作。达成共识的最常见方式就是使单个节点作为领导,并使其负责所有决策。

无协调数据系统可以使用多领导着配置运维,跨越多个数据中心,在区域间异步复制。任何一个数据中心都可以持续独立运行,因为不需要同步的跨区域协调。但是及时性保证会很弱。

密码学审计与完整性检查通常依赖 默克尔树(Merkle tree),这是一颗散列值的树,能够用于高效地证明一条记录出现在一个数据集中。