Category Archives: 一致性哈希

分布式服务化系统一致性的“最佳实干”

1 背景

一致性是一个抽象的、具有多重含义的计算机术语,在不同应用场景下,有不同的定义和含义。在传统的IT时代,一致性通常指强一致性,强一致性通常体现在你中有我、我中有你、浑然一体;而在互联网时代,一致性的含义远远超出了它原有的含义,在我们讨论互联网时代的一致性之前,我们先了解一下互联网时代的特点,互联网时代信息量巨大、需要计算能力巨大,不但对用户响应速度要求快,而且吞吐量指标也要向外扩展(既:水平伸缩),于是单节点的服务器无法满足需求,服务节点开始池化,想想那个经典的故事,一只筷子一折就断,一把筷子怎么都折不断,可见人多力量大的思想是多么的重要,但是人多也不一定能解决所有事情,还得进行有序、合理的分配任务,进行有效的管理,于是互联网时代谈论最多的话题就是拆分,拆分一般分为“水平拆分”和“垂直拆分”(大家不要对应到数据库或者缓存拆分,这里主要表达一种逻辑)。这里,“水平拆分”指的是同一个功能由于单机节点无法满足性能需求,需要扩展成为多节点,多个节点具有一致的功能,组成一个服务池,一个节点服务一部分的请求量,团结起来共同处理大规模高并发的请求量。“垂直拆分”指的是按照功能拆分,秉着“专业的人干专业的事儿”的原则,把一个复杂的功能拆分到多个单一的简单的元功能,不同的元功能组合在一起,和未拆分前完成的功能是一致的,由于每个元功能职责单一、功能简单,让维护和变更都变得更简单、安全,更易于产品版本的迭代,在这样的一个互联网的时代和环境,一致性指分布式服务化系统之间的弱一致性,包括应用系统一致性和数据一致性。

无论是水平拆分还是垂直拆分,都解决了特定场景下的特定问题,凡事有好的一面,都会有坏的一面,拆分后的系统或者服务化的系统最大的问题就是一致性问题,这么多个具有元功能的模块,或者同一个功能池中的多个节点之间,如何保证他们的信息是一致的、工作步伐是一致的、状态是一致的、互相协调有序的工作呢?

本文根据作者在互联网企业的实际项目经验,对服务化系统中最难解决的一致性问题进行研究和探讨,试图从实践经验中找到规律,抽象出模式,分享给大家,希望对大家的项目实施有所帮助,在对实践的总结中也会对相关的一致性术语做最朴实的解释,希望能帮助大家彻底理解一致性的本质,并能将其应用到实践,解决读者现实中遇到的服务化系统的一致性问题,本文使用理论与实践相结合的方法,突出在实践中解决问题的模式,因此叫做《分布式服务化系统一致性的“最佳实干”》。

2 问题

本节列举不一致会导致的种种问题,这也包括一例生活中的问题。

案例1:买房

假如你想要享受生活的随意,只想买个两居,不想让房贷有太大压力,而你媳妇却想要买个三居,还得带花园的,那么你们就不一致了,不一致导致生活不愉快、不协调,严重情况下还会吵架,可见生活中的不一致问题影响很大。

案例2:转账

转账是经典的不一致案例,设想一下银行为你处理一笔转账,扣减你账户上的余额,然后增加别人账户的余额;如果扣减你的账户余额成功,增加别人账户余额失败,那么你就会损失这笔资金。反过来,如果扣减你的账户余额失败,增加别人账户余额成功,那么银行就会损失这笔资金,银行需要赔付。对于资金处理系统来说,上面任何一种场景都是不允许发生的,一旦发生就会有资金损失,后果是不堪设想的,严重情况会让一个公司瞬间倒闭,可参考案例

案例3:下订单和扣库存

电商系统中也有一个经典的案例,下订单和扣库存如何保持一致,如果先下订单,扣库存失败,那么将会导致超卖;如果下订单没有成功,扣库存成功,那么会导致少卖。两种情况都会导致运营成本的增加,严重情况下需要赔付。

案例4:同步超时

服务化的系统间调用常常因为网络问题导致系统间调用超时,即使是网络很好的机房,在亿次流量的基数下,同步调用超时也是家常便饭。系统A同步调用系统B超时,系统A可以明确得到超时反馈,但是无法确定系统B是否已经完成了预定的功能或者没有完成预定的功能。于是,系统A就迷茫了,不知道应该继续做什么,如何反馈给使用方。(曾经的一个B2B产品的客户要求接口超时重新通知他们,这个在技术上是难以实现的,因为服务器本身可能并不知道自己超时,可能会继续正常的返回数据,只是客户端并没有接受到结果罢了,因此这不是一个合理的解决方案)。

案例5:异步回调超时

此案例和上一个同步超时案例类似,不过这个场景使用了异步回调,系统A同步调用系统B发起指令,系统B采用受理模式,受理后则返回受理成功,然后系统B异步通知系统A。在这个过程中,如果系统A由于某种原因迟迟没有收到回调结果,那么两个系统间的状态就不一致,互相认知不同会导致系统间发生错误,严重情况下会影响核心事务,甚至会导致资金损失。

案例6:掉单

分布式系统中,两个系统协作处理一个流程,分别为对方的上下游,如果一个系统中存在一个请求,通常指订单,另外一个系统不存在,则导致掉单,掉单的后果很严重,有时候也会导致资金损失。

案例7:系统间状态不一致

这个案例与上面掉单案例类似,不同的是两个系统间都存在请求,但是请求的状态不一致。

案例8:缓存和数据库不一致

交易相关系统基本离不开关系型数据库,依赖关系型数据库提供的ACID特性(后面介绍),但是在大规模高并发的互联网系统里,一些特殊的场景对读的性能要求极高,服务于交易的数据库难以抗住大规模的读流量,通常需要在数据库前垫缓存,那么缓存和数据库之间的数据如何保持一致性?是要保持强一致呢还是弱一致性呢?

案例9:本地缓存节点间不一致

一个服务池上的多个节点为了满足较高的性能需求,需要使用本地缓存,使用了本地缓存,每个节点都会有一份缓存数据的拷贝,如果这些数据是静态的、不变的,那永远都不会有问题,但是如果这些数据是半静态的或者常被更新的,当被更新的时候,各个节点更新是有先后顺序的,在更新的瞬间,各个节点的数据是不一致的,如果这些数据是为某一个开关服务的,想象一下重复的请求走进了不同的节点(在failover或者补偿导致的场景下,重复请求是一定会发生的,也是服务化系统必须处理的),一个请求走了开关打开的逻辑,同时另外一个请求走了开关关闭的逻辑,这导致请求被处理两次,最坏的情况下会导致灾难性的后果,就是资金损失。

案例10:缓存数据结构不一致

这个案例会时有发生,某系统需要种某一数据结构的缓存,这一数据结构有多个数据元素组成,其中,某个数据元素都需要从数据库中或者服务中获取,如果一部分数据元素获取失败,由于程序处理不正确,仍然将不完全的数据结构存入缓存,那么缓存的消费者消费的时候很有可能因为没有合理处理异常情况而出错。

3 模式

3.1 生活中不一致问题的解决

大家回顾一下上一节列举的生活中的案例1-买房,如果置身事外来看,解决这种不一致的办法有两个,一个是避免不一致的发生,如果已经是媳妇了就不好办了:),还有一种方法就是慢慢的补偿,先买个两居,然后慢慢的等资金充裕了再换三居,买比特币赚了再换带花园的房子,于是问题最终被解决了,最终大家处于一致的状态,都开心了。这样可以解决案例1的问题,很自然由于有了过渡的方法,问题在不经意间就消失了,可见“过渡”也是解决一致性问题的一个模式。

从案例1的解决方案来看,我们要解决一致性问题,一个最直接最简单的方法就是保持强一致性,对于案例1的情况,尽量避免在结婚前两个人能够互相了解达成一致,避免不一致问题的发生;不过有些事情事已至此,发生了就是发生了,出现了不一致的问题,我们应该考虑去补偿,尽最大的努力从不一致状态修复到一致状态,避免损失全部或者一部分,也不失为一个好方法。

因此,避免不一致是上策,出现了不一致及时发现及时修复是中策,有问题不积极解决留给他人解决是下策。

3.2 酸碱平衡理论

ACID在英文中的意思是“酸”,BASE的意识是“碱”,这一段讲的是“酸碱平衡”的故事。

1. ACID(酸)

如何保证强一致性呢?计算机专业的童鞋在学习关系型数据库的时候都学习了ACID原理,这里对ACID做个简单的介绍。如果想全面的学习ACID原理,请参考ACID

关系型数据库天生就是解决具有复杂事务场景的问题,关系型数据库完全满足ACID的特性。

ACID指的是:

  • A: Atomicity,原子性
  • C: Consistency,一致性
  • I: Isolation,隔离性
  • D: Durability,持久性

具有ACID的特性的数据库支持强一致性,强一致性代表数据库本身不会出现不一致,每个事务是原子的,或者成功或者失败,事物间是隔离的,互相完全不影响,而且最终状态是持久落盘的,因此,数据库会从一个明确的状态到另外一个明确的状态,中间的临时状态是不会出现的,如果出现也会及时的自动的修复,因此是强一致的。

3个典型的关系型数据库Oracle、Mysql、Db2都能保证强一致性,Oracle和Mysql使用多版本控制协议实现,而DB2使用改进的两阶段提交协议来实现。

如果你在为交易相关系统做技术选型,交易的存储应该只考虑关系型数据库,对于核心系统,如果需要较好的性能,可以考虑使用更强悍的硬件,这种向上扩展(升级硬件)虽然成本较高,但是是最简单粗暴有效的方式,另外,Nosql完全不适合交易场景,Nosql主要用来做数据分析、ETL、报表、数据挖掘、推荐、日志处理等非交易场景。

前面提到的案例2-转账案例3-下订单和扣库存都可以利用关系型数据库的强一致性解决。

然而,前面提到,互联网项目多数具有大规模高并发的特性,必须应用拆分的理念,对高并发的压力采取“大而化小、小而化了”的方法,否则难以满足动辄亿级流量的需求,即使使用关系型数据库,单机也难以满足存储和TPS上的需求。为了保证案例2-转账可以利用关系型数据库的强一致性,在拆分的时候尽量的把转账相关的账户放入一个数据库分片,对于案例3,尽量的保证把订单和库存放入同一个数据库分片,这样通过关系型数据库自然就解决了不一致的问题。

然而,有些时候事与愿违,由于业务规则的限制,无法将相关的数据分到同一个数据库分片,这个时候我们就需要实现最终一致性。

对于案例2-转账场景,假设账户数量巨大,对账户存储进行了拆分,关系型数据库一共分了8个实例,每个实例8个库,每个库8个表,共512张表,假如要转账的两个账户正好落在了一个库里,那么可以依赖关系型数据库的事务保持强一致性。

如果要转账的两个账户正好落在了不同的库里,转账操作是无法封装在同一个数据库事务中的,这个时候会发生一个库的账户扣减余额成功,另外一个库的账户增加余额失败的情况。

对于这种情况,我们需要继续探讨解决之道,CAP原理和BASE原理,BASE原理通过记录事务的中间的临时状态,实现最终一致性。

2. CAP(帽子理论)

如果想深入的学习CAP理论,请参考CAP

由于对系统或者数据进行了拆分,我们的系统不再是单机系统,而是分布式系统,针对分布式系的帽子理论包含三个元素:

  • C:Consistency,一致性, 数据一致更新,所有数据变动都是同步的
  • A:Availability,可用性, 好的响应性能,完全的可用性指的是在任何故障模型下,服务都会在有限的时间处理响应
  • P:Partition tolerance,分区容错性,可靠性

帽子理论证明,任何分布式系统只可同时满足二点,没法三者兼顾。关系型数据库由于关系型数据库是单节点的,因此,不具有分区容错性,但是具有一致性和可用性,而分布式的服务化系统都需要满足分区容错性,那么我们必须在一致性和可用性中进行权衡,具体表现在服务化系统处理的异常请求在某一个时间段内可能是不完全的,但是经过自动的或者手工的补偿后,达到了最终的一致性。

3. BASE(碱)

BASE理论解决CAP理论提出了分布式系统的一致性和可用性不能兼得的问题,如果想全面的学习BASE原理,请参考Eventual consistency

BASE在英文中有“碱”的意思,对应本节开头的ACID在英文中“酸”的意思,基于这两个名词提出了酸碱平衡的结论,简单来说是在不同的场景下,可以分别利用ACID和BASE来解决分布式服务化系统的一致性问题。

BASE模型与ACID模型截然不同,满足CAP理论,通过牺牲强一致性,获得可用性,一般应用在服务化系统的应用层或者大数据处理系统,通过达到最终一致性来尽量满足业务的绝大部分需求。

BASE模型包含个三个元素:

  • BA:Basically Available,基本可用
  • S:Soft State,软状态,状态可以有一段时间不同步
  • E:Eventually Consistent,最终一致,最终数据是一致的就可以了,而不是时时保持强一致

BASE模型的软状态是实现BASE理论的方法,基本可用和最终一致是目标。按照BASE模型实现的系统,由于不保证强一致性,系统在处理请求的过程中,可以存在短暂的不一致,在短暂的不一致窗口请求处理处在临时状态中,系统在做每步操作的时候,通过记录每一个临时状态,在系统出现故障的时候,可以从这些中间状态继续未完成的请求处理或者退回到原始状态,最后达到一致的状态。

案例1-转账为例,我们把用户A给用户B转账分成四个阶段,第一个阶段用户A准备转账,第二个阶段从用户A账户扣减余额,第三个阶段对用户B增加余额,第四个阶段完成转账。系统需要记录操作过程中每一步骤的状态,一旦系统出现故障,系统能够自动发现没有完成的任务,然后,根据任务所处的状态,继续执行任务,最终完成任务,达到一致的最终状态。

在实际应用中,上面这个过程通常是通过持久化执行任务的状态和环境信息,一旦出现问题,定时任务会捞取未执行完的任务,继续未执行完的任务,直到执行完成为止,或者取消已经完成的部分操作回到原始状态。这种方法在任务完成每个阶段的时候,都要更新数据库中任务的状态,这在大规模高并发系统中不会有太好的性能,一个更好的办法是用Write-Ahead Log(写前日志),这和数据库的Bin Log(操作日志)相似,在做每一个操作步骤,都先写入日志,如果操作遇到问题而停止的时候,可以读取日志按照步骤进行恢复,并且继续执行未完成的工作,最后达到一致。写前日志可以利用机械硬盘的追加写而达到较好性能,因此,这是一种专业化的实现方式,多数业务系系统还是使用数据库记录的字段来记录任务的执行状态,也就是记录中间的“软状态”,一个任务的状态流转一般可以通过数据库的行级锁来实现,这比使用Write-Ahead Log实现更简单、更快速。

有了BASE理论作为基础,我们对复杂的分布式事务进行拆解,对其中的每一步骤都记录其状态,有问题的时候可以根据记录的状态来继续执行任务,达到最终的一致,通过这个方法我们可以解决案例2-转账案例3-下订单和扣库存中遇到的问题。

4. 酸碱平衡的总结

  1. 使用向上扩展(强悍的硬件)运行专业的关系型数据库(例如:Oracle或者DB2)能够保证强一致性,钱能解决的问题就不是问题
  2. 如果钱是问题,可以对廉价硬件运行的开源关系型数据库(例如:Mysql)进行分片,将相关的数据分到数据库的同一个片,仍然能够使用关系型数据库保证事务
  3. 如果业务规则限制,无法将相关的数据分到同一个片,就需要实现最终一致性,通过记录事务的软状态(中间状态、临时状态),一旦处于不一致,可以通过系统自动化或者人工干预来修复不一致的情况

3.3 分布式一致性协议

国际开放标准组织Open Group定义了DTS(分布式事务处理模型),模型中包含4个角色:应用程序、事务管理器、资源管理器、通信资源管理器四部分。事务处理器是统管全局的管理者,资源处理器和通信资源处理器是事务的参与者。

J2EE规范也包含此分布式事务处理模型的规范,并在所有的AppServer中进行实现,J2EE规范中定义了TX协议和XA协议,TX协议定义应用程序与事务管理器之间的接口,而XA协议定义了事务管理器与资源处理器之间的接口,在过去,大家使用AppServer,例如:Websphere、Weblogic、Jboss等配置数据源的时候会看见类似XADatasource的数据源,这就是实现了DTS的关系型数据库的数据源。企业级开发JEE中,关系型数据库、JMS服务扮演资源管理器的角色,而EJB容器则扮演事务管理器的角色。

下面我们就介绍两阶段提交协议三阶段提交协议以及阿里巴巴提出的TCC,它们都是根据DTS这一思想演变出来的。

1. 两阶段提交协议

上面描述的JEE的XA协议就是根据两阶段提交来保证事务的完整性,并实现分布式服务化的强一致性。

两阶段提交协议把分布式事务分成两个过程,一个是准备阶段,一个是提交阶段,准备阶段和提交阶段都是由事务管理器发起的,为了接下来讲解方便,我们把事务管理器称为协调者,把资管管理器称为参与者。

两阶段如下:

  1. 准备阶段:协调者向参与者发起指令,参与者评估自己的状态,如果参与者评估指令可以完成,参与者会写redo或者undo日志(这也是前面提起的Write-Ahead Log的一种),然后锁定资源,执行操作,但是并不提交
  2. 提交阶段:如果每个参与者明确返回准备成功,也就是预留资源和执行操作成功,协调者向参与者发起提交指令,参与者提交资源变更的事务,释放锁定的资源;如果任何一个参与者明确返回准备失败,也就是预留资源或者执行操作失败,协调者向参与者发起中止指令,参与者取消已经变更的事务,执行undo日志,释放锁定的资源

两阶段提交协议成功场景示意图如下:

两阶段提交协议

我们看到两阶段提交协议在准备阶段锁定资源,是一个重量级的操作,并能保证强一致性,但是实现起来复杂、成本较高,不够灵活,更重要的是它有如下致命的问题:

  1. 阻塞:从上面的描述来看,对于任何一次指令必须收到明确的响应,才会继续做下一步,否则处于阻塞状态,占用的资源被一直锁定,不会被释放
  2. 单点故障:如果协调者宕机,参与者没有了协调者指挥,会一直阻塞,尽管可以通过选举新的协调者替代原有协调者,但是如果之前协调者在发送一个提交指令后宕机,而提交指令仅仅被一个参与者接受,并且参与者接收后也宕机,新上任的协调者无法处理这种情况
  3. 脑裂:协调者发送提交指令,有的参与者接收到执行了事务,有的参与者没有接收到事务,就没有执行事务,多个参与者之间是不一致的

上面所有的这些问题,都是需要人工干预处理,没有自动化的解决方案,因此两阶段提交协议在正常情况下能保证系统的强一致性,但是在出现异常情况下,当前处理的操作处于错误状态,需要管理员人工干预解决,因此可用性不够好,这也符合CAP协议的一致性和可用性不能兼得的原理。

2. 三阶段提交协议

三阶段提交协议是两阶段提交协议的改进版本。它通过超时机制解决了阻塞的问题,并且把两个阶段增加为三个阶段:

  1. 询问阶段:协调者询问参与者是否可以完成指令,协调者只需要回答是还是不是,而不需要做真正的操作,这个阶段超时导致中止
  2. 准备阶段:如果在询问阶段所有的参与者都返回可以执行操作,协调者向参与者发送预执行请求,然后参与者写redo和undo日志,执行操作,但是不提交操作;如果在询问阶段任何参与者返回不能执行操作的结果,则协调者向参与者发送中止请求,这里的逻辑与两阶段提交协议的的准备阶段是相似的,这个阶段超时导致成功
  3. 提交阶段:如果每个参与者在准备阶段返回准备成功,也就是预留资源和执行操作成功,协调者向参与者发起提交指令,参与者提交资源变更的事务,释放锁定的资源;如果任何一个参与者返回准备失败,也就是预留资源或者执行操作失败,协调者向参与者发起中止指令,参与者取消已经变更的事务,执行undo日志,释放锁定的资源,这里的逻辑与两阶段提交协议的提交阶段一致

三阶段提交协议成功场景示意图如下:

三阶段提交协议

然而,这里与两阶段提交协议有两个主要的不同:

  1. 增加了一个询问阶段,询问阶段可以确保尽可能早的发现无法执行操作而需要中止的行为,但是它并不能发现所有的这种行为,只会减少这种情况的发生
  2. 在准备阶段以后,协调者和参与者执行的任务中都增加了超时,一旦超时,协调者和参与者都继续提交事务,默认为成功,这也是根据概率统计上超时后默认成功的正确性最大

三阶段提交协议与两阶段提交协议相比,具有如上的优点,但是一旦发生超时,系统仍然会发生不一致,只不过这种情况很少见罢了,好处就是至少不会阻塞和永远锁定资源。

3. TCC

上面两节讲解了两阶段提交协议和三阶段提交协议,实际上他们能解决案例2-转账案例3-下订单和扣库存中的分布式事务的问题,但是遇到极端情况,系统会发生阻塞或者不一致的问题,需要运营或者技术人工解决。无论两阶段还是三阶段方案中都包含多个参与者、多个阶段实现一个事务,实现复杂,性能也是一个很大的问题,因此,在互联网高并发系统中,鲜有使用两阶段提交和三阶段提交协议的场景。

阿里巴巴提出了新的TCC协议,TCC协议将一个任务拆分成Try、Confirm、Cancel,正常的流程会先执行Try,如果执行没有问题,再执行Confirm,如果执行过程中出了问题,则执行操作的逆操Cancel,从正常的流程上讲,这仍然是一个两阶段的提交协议,但是,在执行出现问题的时候,有一定的自我修复能力,如果任何一个参与者出现了问题,协调者通过执行操作的逆操作来取消之前的操作,达到最终的一致状态。

可以看出,从时序上,如果遇到极端情况下TCC会有很多问题的,例如,如果在Cancel的时候一些参与者收到指令,而一些参与者没有收到指令,整个系统仍然是不一致的,这种复杂的情况,系统首先会通过补偿的方式,尝试自动修复的,如果系统无法修复,必须由人工参与解决。

从TCC的逻辑上看,可以说TCC是简化版的三阶段提交协议,解决了两阶段提交协议的阻塞问题,但是没有解决极端情况下会出现不一致和脑裂的问题。然而,TCC通过自动化补偿手段,会把需要人工处理的不一致情况降到到最少,也是一种非常有用的解决方案,根据线人,阿里在内部的一些中间件上实现了TCC模式。

我们给出一个使用TCC的实际案例,在秒杀的场景,用户发起下单请求,应用层先查询库存,确认商品库存还有余量,则锁定库存,此时订单状态为待支付,然后指引用户去支付,由于某种原因用户支付失败,或者支付超时,系统会自动将锁定的库存解锁供其他用户秒杀。

TCC协议使用场景示意图如下:

TCC

总结一下,两阶段提交协议、三阶段提交协议、TCC协议都能保证分布式事务的一致性,他们保证的分布式系统的一致性从强到弱,TCC达到的目标是最终一致性,其中任何一种方法都可以不同程度的解决案例2:转账、案例3:下订单和扣库存的问题,只是实现的一致性的级别不一样而已,对于案例4:同步超时可以通过TCC的理念解决,如果同步调用超时,调用方可以使用fastfail策略,返回调用方的使用方失败的结果,同时调用服务的逆向cancel操作,保证服务的最终一致性。

3.4 保证最终一致性的模式

在大规模高并发服务化系统中,一个功能被拆分成多个具有单一功能的元功能,一个流程会有多个系统的多个元功能组合实现,如果使用两阶段提交协议和三阶段提交协议,确实能解决系统间一致性问题,除了这两个协议带来的自身的问题,这些协议的实现比较复杂、成本比较高,最重要的是性能并不好,相比来看,TCC协议更简单、容易实现,但是TCC协议由于每个事务都需要执行Try,再执行Confirm,略微显得臃肿,因此,在现实的系统中,底线要求仅仅需要能达到最终一致性,而不需要实现专业的、复杂的一致性协议,实现最终一致性有一些非常有效的、简单粗暴的模式,下面就介绍这些模式及其应用场景。

1. 查询模式

任何一个服务操作都需要提供一个查询接口,用来向外部输出操作执行的状态。服务操作的使用方可以通过查询接口,得知服务操作执行的状态,然后根据不同状态来做不同的处理操作。

为了能够实现查询,每个服务操作都需要有唯一的流水号标识,也可使用此次服务操作对应的资源ID来标志,例如:请求流水号、订单号等。

首先,单笔查询操作是必须提供的,我们也鼓励使用单笔订单查询,这是因为每次调用需要占用的负载是可控的,批量查询则根据需要来提供,如果使用了批量查询,需要有合理的分页机制,并且必须限制分页的大小,以及对批量查询的QPS需要有容量评估和流控等。

查询模式的示意图如下:

查询模式

对于案例4:同步超时、案例5:异步回调超时、案例6:掉单、案例7:系统间状态不一致,我们都需要使用查询模式来了解被调用服务的处理情况,来决定下一步做什么:补偿未完成的操作还是回滚已经完成的操作。

2. 补偿模式

有了上面的查询模式,在任何情况下,我们都能得知具体的操作所处的状态,如果整个操作处于不正常的状态,我们需要修正操作中有问题的子操作,这可能需要重新执行未完成的子操作,后者取消已经完成的子操作,通过修复使整个分布式系统达到一致,为了让系统最终一致而做的努力都叫做补偿。

对于服务化系统中同步调用的操作,业务操作发起的主动方在还没有得到业务操作执行方的明确返回或者调用超时,场景可参考案例4:同步超时,这个时候业务发起的主动方需要及时的调用业务执行方获得操作执行的状态,这里使用查询模式,获得业务操作的执行方的状态后,如果业务执行方已经完预设的工作,则业务发起方给业务的使用方返回成功,如果业务操作的执行方的状态为失败或者未知,则会立即告诉业务的使用方失败,然后调用业务操作的逆向操作,保证操作不被执行或者回滚已经执行的操作,让业务的使用方、业务发起的主动方、业务的操作方最终达成一致的状态。

补偿模式的示意图如下:

补偿模式

补偿操作根据发起形式分为:

  1. 自动恢复:程序根据发生不一致的环境,通过继续未完成的操作,或者回滚已经完成的操作,自动来达到一致
  2. 通知运营:如果程序无法自动恢复,并且设计时考虑到了不一致的场景,可以提供运营功能,通过运营手工进行补偿
  3. 通知技术:如果很不巧,系统无法自动回复,又没有运营功能,那必须通过技术手段来解决,技术手段包括走数据库变更或者代码变更来解决,这是最糟的一种场景

3. 异步确保模式

异步确保模式是补偿模式的一个典型案例,经常应用到使用方对响应时间要求并不太高,我们通常把这类操作从主流程中摘除,通过异步的方式进行处理,处理后把结果通过通知系统通知给使用方,这个方案最大的好处能够对高并发流量进行消峰,例如:电商系统中的物流、配送,以及支付系统中的计费、入账等。

实践中,将要执行的异步操作封装后持久入库,然后通过定时捞取未完成的任务进行补偿操作来实现异步确保模式,只要定时系统足够健壮,任何一个任务最终会被成功执行。

异步确保模式的示意图如下:

异步确保模式

对于案例5:异步回调超时,使用的就是异步确保模式,这种情况下对于某个操作,如果迟迟没有收到响应,我们通过查询模式和补偿模式来继续未完成的操作。

4. 定期校对模式

既然我们在系统中实现最终一致性,系统在没有达到一致之前,系统间的状态是不一致的,甚至是混乱的,需要补偿操作来达到一致的目的,但是我们如何来发现需要补偿的操作呢?

在操作的主流程中的系统间执行校对操作,我们可以事后异步的批量校对操作的状态,如果发现不一致的操作,则进行补偿,补偿操作与补偿模式中的补偿操作是一致的。

另外,实现定期校对的一个关键就是分布式系统中需要有一个自始至终唯一的ID,ID的生成请参考SnowFlake

在分布式系统中,全局唯一ID的示意图如下:

唯一ID

一般情况下,生成全局唯一ID有两种方法:

  1. 持久型:使用数据库表自增字段或者Sequence生成,为了提高效率,每个应用节点可以缓存一批次的ID,如果机器重启可能会损失一部分ID,但是这并不会产生任何问题
  2. 时间型:一般由机器号、业务号、时间、单节点内自增ID组成,由于时间一般精确到秒或者毫秒,因此不需要持久就能保证在分布式系统中全局唯一、粗略递增能特点

实践中,为了能在分布式系统中迅速的定位问题,一般的分布式系统都有技术支持系统,它能够跟踪一个请求的调用链,调用链是在二维的维度跟踪一个调用请求,最后形成一个调用树,原理可参考谷歌的论文Dapper, a Large-Scale Distributed Systems Tracing Infrastructure,一个开源的参考实现为pinpoint

在分布式系统中,调用链的示意图如下:

调用链

全局的唯一流水ID可以把一个请求在分布式系统中的流转的路径聚合,而调用链中的spanid可以把聚合的请求路径通过树形结构进行展示,让技术支持人员轻松的发现系统出现的问题,能够快速定位出现问题的服务节点,提高应急效率。

关于订单跟踪、调用链跟踪、业务链跟踪,我们会在后续文章中详细介绍。

在分布式系统中构建了唯一ID,调用链等基础设施,我们很容易对系统间的不一致进行核对,通常我们需要构建第三方的定期核对系统,以第三方的角度来监控服务执行的健康程度。

定期核对系统示意图如下:

定期核对模式

对于案例6:掉单、案例7:系统间状态不一致通常通过定期校对模式发现问题,并通过补偿模式来修复,最后完成系统间的最终一致性。

定期校对模式多应用在金融系统,金融系统由于涉及到资金安全,需要保证百分之百的准确性,所以,需要多重的一致性保证机制,包括:系统间的一致性对账、现金对账、账务对账、手续费对账等等,这些都属于定期校对模式,顺便说一下,金融系统与社交应用在技术上本质的区别在于社交应用在于量大,而金融系统在于数据的准确性。

到现在为止,我们看到通过查询模式、补偿模式、定期核对模式可以解决案例4到案例7的所有问题,对于案例4:同步超时,如果同步超时,我们需要查询状态进行补偿,对于案例5:异步回调超时,如果迟迟没有收到回调响应,我们也会通过查询状态进行补偿,对于案例6:掉单、案例7:系统间状态不一致,我们通过定期核对模式可以保证系统间操作的一致性,避免掉单和状态不一致导致问题。

5. 可靠消息模式

在分布式系统中,对于主流程中优先级比较低的操作,大多采用异步的方式执行,也就是前面提到的异步确保型,为了让异步操作的调用方和被调用方充分的解耦,也由于专业的消息队列本身具有可伸缩、可分片、可持久等功能,我们通常通过消息队列实现异步化,对于消息队列,我们需要建立特殊的设施保证可靠的消息发送以及处理机的幂等等。

消息的可靠发送

消息的可靠发送可以认为是尽最大努力发送消息通知,有两种实现方法:

第一种,发送消息之前,把消息持久到数据库,状态标记为待发送,然后发送消息,如果发送成功,将消息改为发送成功。定时任务定时从数据库捞取一定时间内未发送的消息,将消息发送。

消息发送模式1

第二种,实现方式与第一种类似,不同的是持久消息的数据库是独立的,并不耦合在业务系统中。发送消息之前,先发送一个预消息给某一个第三方的消息管理器,消息管理器将其持久到数据库,并标记状态为待发送,发送成功后,标记消息为发送成功。定时任务定时从数据库捞取一定时间内未发送的消息,回查业务系统是否要继续发送,根据查询结果来确定消息的状态。

消息发送模式2

一些公司把消息的可靠发送实现在了中间件里,通过Spring的注入,在消息发送的时候自动持久消息记录,如果有消息记录没有发送成功,定时会补偿发送。

消息处理器的幂等性

如果我们要保证消息可靠的发送,简单来说,要保证消息一定要发送出去,那么就需要有重试机制,有了重试机制,消息一定会重复,那么我们需要对重复做处理。

处理重复的最佳方式为保证操作的幂等性,幂等性的数学公式为:

f(f(x)) = f(x)

保证操作的幂等性常用的几个方法:

  1. 使用数据库表的唯一键进行滤重,拒绝重复的请求
  2. 使用分布式表对请求进行滤重
  3. 使用状态流转的方向性来滤重,通常使用行级锁来实现(后续在锁相关的文章中详细说明)
  4. 根据业务的特点,操作本身就是幂等的,例如:删除一个资源、增加一个资源、获得一个资源等

6. 缓存一致性模型

大规模高并发系统中一个常见的核心需求就是亿级的读需求,显然,关系型数据库并不是解决高并发读需求的最佳方案,互联网的经典做法就是使用缓存抗读需求,下面有一些使用缓存的保证一致性的最佳实践:

  1. 如果性能要求不是非常的高,尽量使用分布式缓存,而不要使用本地缓存
  2. 种缓存的时候一定种完全,如果缓存数据的一部分有效,一部分无效,宁可放弃种缓存,也不要把部分数据种入缓存
  3. 数据库与缓存只需要保持弱一致性,而不需要强一致性,读的顺序要先缓存,后数据库,写的顺序要先数据库,后缓存

这里的最佳实践能够解决案例8:缓存和数据库不一致、案例9:本地缓存节点间不一致、案例10:缓存数据结构不一致的问题,对于数据存储层、缓存与数据库、Nosql等的一致性是更深入的存储一致性技术,将会在后续文章单独介绍,这里的数据一致性主要是处理应用层与缓存、应用层与数据库、一部分的缓存与数据库的一致性。

3.5 专题模式

这一节介绍特殊场景下的一致性问题和解决方案。

迁移开关的设计

在大多数企业里,新项目和老项目一般会共存,大家都在努力的下掉老项目,但是由于种种原因总是下不掉,如果要彻底的下掉老项目,就必须要有非常完善的迁移方案,迁移是一项非常复杂而艰巨的任务,我会在将来的文章中详细探讨迁移方案、流程和技术,这里我们只对迁移中使用的开关进行描述。

迁移过程必须使用开关,开关一般都会基于多个维度来设计,例如:全局的、用户的、角色的、商户的、产品的等等,如果迁移过程中遇到问题,我们需要关闭开关,迁移回老的系统,这需要我们的新系统兼容老的数据,老的系统也兼容新的数据,从某种意义上来讲,迁移比实现新系统更加困难。

曾经看过很多简单的开关设计,有的开关设计在应用层次,通过一个curl语句调用,没有权限控制,这样的开关在服务池的每个节点都是不同步的、不一致的;还有的系统把开关配置放在中心化的配置系统、数据库或者缓存等,处理的每个请求都通过统一的开关来判断是否迁移等等,这样的开关有一个致命的缺点,服务请求在处理过程中,开关可能会变化,各个节点之间开关可能不同步、不一致,导致重复的请求可能走到新的逻辑又走了老的逻辑,如果新的逻辑和老的逻辑没有保证幂等性,这个请求就被重复处理了,如果是金融行业的应用,可能会导致资金损失,电商系统可能会导致发货并退款等问题。

这里面我们推荐使用订单开关,不管我们在什么维度上设计了开关,接收到服务请求后,我们在请求创建的关联实体(例如:订单)上标记开关,以后的任何处理流程,包括同步的和异步的处理流程,都通过订单上的开关来判断,而不是通过全局的或者基于配置的开关,这样在订单创建的时候,开关已经确定,不再变更,一旦一份数据不再发生变化,那么它永远是线程安全的,并且不会有不一致的问题。

这个模式在生产中使用比较频繁,建议每个企业都把这个模式作为设计评审的一项,如果不检查这一项,很多开发童鞋都会偷懒,直接在配置中或者数据库中做个开关就上线了。

4 总结

本文从一致性问题的实践出发,从大规模高并发服务化系统的实践经验中进行总结,列举导致不一致的具体问题,围绕着具体问题,总结出解决不一致的方法,并且抽象成模式,供大家在开发服务化系统的过程中参考。

另外,由于篇幅有限,还有一些关于分布式一致性的技术无法在一篇文章中与大家分享,包括:paxos算法、raft算法、zab算法、nwr算法、一致性哈希等,我会在后续文章中详细介绍。

5 反馈与建议

from:http://www.jianshu.com/p/1156151e20c8

保证分布式系统数据一致性的6种方案

编者按:本文由「高可用架构后花园」群讨论整理而成,后花园是一个面向架构师的增值服务,如需了解,请关注「高可用架构」后回复 VIP

有人的地方,就有江湖

有江湖的地方,就有纷争

问题的起源

在电商等业务中,系统一般由多个独立的服务组成,如何解决分布式调用时候数据的一致性?

具体业务场景如下,比如一个业务操作,如果同时调用服务 A、B、C,需要满足要么同时成功;要么同时失败。A、B、C 可能是多个不同部门开发、部署在不同服务器上的远程服务。

在分布式系统来说,如果不想牺牲一致性,CAP 理论告诉我们只能放弃可用性,这显然不能接受。为了便于讨论问题,先简单介绍下数据一致性的基础理论。

强一致
当更新操作完成之后,任何多个后续进程或者线程的访问都会返回最新的更新过的值。这种是对用户最友好的,就是用户上一次写什么,下一次就保证能读到什么。根据 CAP 理论,这种实现需要牺牲可用性。
弱一致性
系统并不保证续进程或者线程的访问都会返回最新的更新过的值。系统在数据写入成功之后,不承诺立即可以读到最新写入的值,也不会具体的承诺多久之后可以读到。
最终一致性
弱一致性的特定形式。系统保证在没有后续更新的前提下,系统最终返回上一次更新操作的值。在没有故障发生的前提下,不一致窗口的时间主要受通信延迟,系统负载和复制副本的个数影响。DNS 是一个典型的最终一致性系统。
在工程实践上,为了保障系统的可用性,互联网系统大多将强一致性需求转换成最终一致性的需求,并通过系统执行幂等性的保证,保证数据的最终一致性。但在电商等场景中,对于数据一致性的解决方法和常见的互联网系统(如 MySQL 主从同步)又有一定区别,群友的讨论分成以下 6 种解决方案。

1. 规避分布式事务——业务整合

业务整合方案主要采用将接口整合到本地执行的方法。拿问题场景来说,则可以将服务 A、B、C 整合为一个服务 D 给业务,这个服务 D 再通过转换为本地事务的方式,比如服务 D 包含本地服务和服务 E,而服务 E 是本地服务 A ~ C 的整合。

优点:解决(规避)了分布式事务。

缺点:显而易见,把本来规划拆分好的业务,又耦合到了一起,业务职责不清晰,不利于维护。

由于这个方法存在明显缺点,通常不建议使用。

2. 经典方案 – eBay 模式

此方案的核心是将需要分布式处理的任务通过消息日志的方式来异步执行。消息日志可以存储到本地文本、数据库或消息队列,再通过业务规则自动或人工发起重试。人工重试更多的是应用于支付场景,通过对账系统对事后问题的处理。

消息日志方案的核心是保证服务接口的幂等性。

考虑到网络通讯失败、数据丢包等原因,如果接口不能保证幂等性,数据的唯一性将很难保证。

eBay 方式的主要思路如下。

Base:一种 Acid 的替代方案

此方案是 eBay 的架构师 Dan Pritchett 在 2008 年发表给 ACM 的文章,是一篇解释 BASE 原则,或者说最终一致性的经典文章。文中讨论了 BASE 与 ACID 原则在保证数据一致性的基本差异。

如果 ACID 为分区的数据库提供一致性的选择,那么如何实现可用性呢?答案是

BASE (basically available, soft state, eventually consistent)

BASE 的可用性是通过支持局部故障而不是系统全局故障来实现的。下面是一个简单的例子:如果将用户分区在 5 个数据库服务器上,BASE 设计鼓励类似的处理方式,一个用户数据库的故障只影响这台特定主机那 20% 的用户。这里不涉及任何魔法,不过它确实可以带来更高的可感知的系统可用性。

文章中描述了一个最常见的场景,如果产生了一笔交易,需要在交易表增加记录,同时还要修改用户表的金额。这两个表属于不同的远程服务,所以就涉及到分布式事务一致性的问题。

文中提出了一个经典的解决方法,将主要修改操作以及更新用户表的消息放在一个本地事务来完成。同时为了避免重复消费用户表消息带来的问题,达到多次重试的幂等性,增加一个更新记录表 updates_applied来记录已经处理过的消息。

5ad0007faf14d9018dc

系统的执行伪代码如下

5bd0002b66b92973061

(点击可全屏缩放图片)

基于以上方法,在第一阶段,通过本地的数据库的事务保障,增加了 transaction 表及消息队列 。

在第二阶段,分别读出消息队列(但不删除),通过判断更新记录表 updates_applied 来检测相关记录是否被执行,未被执行的记录会修改 user 表,然后增加一条操作记录到 updates_applied,事务执行成功之后再删除队列。

通过以上方法,达到了分布式系统的最终一致性。进一步了解 eBay 的方案可以参考文末链接。

3. 去哪儿网分布式事务方案

随着业务规模不断地扩大,电商网站一般都要面临拆分之路。就是将原来一个单体应用拆分成多个不同职责的子系统。比如以前可能将面向用户、客户和运营的功能都放在一个系统里,现在拆分为订单中心、代理商管理、运营系统、报价中心、库存管理等多个子系统。

拆分首先要面临的是什么呢?

最开始的单体应用所有功能都在一起,存储也在一起。比如运营要取消某个订单,那直接去更新订单表状态,然后更新库存表就 ok 了。因为是单体应用,库在一起,这些都可以在一个事务里,由关系数据库来保证一致性。

但拆分之后就不同了,不同的子系统都有自己的存储。比如订单中心就只管理自己的订单库,而库存管理也有自己的库。那么运营系统取消订单的时候就是通过接口调用等方式来调用订单中心和库存管理的服务了,而不是直接去操作库。这就涉及一个『分布式事务』的问题。

5bd0002b66dc70ed910

分布式事务有两种解决方式

1. 优先使用异步消息。

上文已经说过,使用异步消息 Consumer 端需要实现幂等。

幂等有两种方式,一种方式是业务逻辑保证幂等。比如接到支付成功的消息订单状态变成支付完成,如果当前状态是支付完成,则再收到一个支付成功的消息则说明消息重复了,直接作为消息成功处理。

另外一种方式如果业务逻辑无法保证幂等,则要增加一个去重表或者类似的实现。对于 producer 端在业务数据库的同实例上放一个消息库,发消息和业务操作在同一个本地事务里。发消息的时候消息并不立即发出,而是向消息库插入一条消息记录,然后在事务提交的时候再异步将消息发出,发送消息如果成功则将消息库里的消息删除,如果遇到消息队列服务异常或网络问题,消息没有成功发出那么消息就留在这里了,会有另外一个服务不断地将这些消息扫出重新发送。

2. 有的业务不适合异步消息的方式,事务的各个参与方都需要同步的得到结果。这种情况的实现方式其实和上面类似,每个参与方的本地业务库的同实例上面放一个事务记录库。

比如 A 同步调用 B,C。A 本地事务成功的时候更新本地事务记录状态,B 和 C 同样。如果有一次 A 调用 B 失败了,这个失败可能是 B 真的失败了,也可能是调用超时,实际 B 成功。则由一个中心服务对比三方的事务记录表,做一个最终决定。假设现在三方的事务记录是 A 成功,B 失败,C 成功。那么最终决定有两种方式,根据具体场景:

  1. 重试 B,直到 B 成功,事务记录表里记录了各项调用参数等信息;
  2. 执行 A 和 B 的补偿操作(一种可行的补偿方式是回滚)。

对 b 场景做一个特殊说明:比如 B 是扣库存服务,在第一次调用的时候因为某种原因失败了,但是重试的时候库存已经变为 0,无法重试成功,这个时候只有回滚 A 和 C 了。

那么可能有人觉得在业务库的同实例里放消息库或事务记录库,会对业务侵入,业务还要关心这个库,是否一个合理的设计?

实际上可以依靠运维的手段来简化开发的侵入,我们的方法是让 DBA 在公司所有 MySQL 实例上预初始化这个库,通过框架层(消息的客户端或事务 RPC 框架)透明的在背后操作这个库,业务开发人员只需要关心自己的业务逻辑,不需要直接访问这个库。

总结起来,其实两种方式的根本原理是类似的,也就是将分布式事务转换为多个本地事务,然后依靠重试等方式达到最终一致性

4. 蘑菇街交易创建过程中的分布式一致性方案

交易创建的一般性流程

我们把交易创建流程抽象出一系列可扩展的功能点,每个功能点都可以有多个实现(具体的实现之间有组合/互斥关系)。把各个功能点按照一定流程串起来,就完成了交易创建的过程。

5ad0007faf4df6af7f3

面临的问题

每个功能点的实现都可能会依赖外部服务。那么如何保证各个服务之间的数据是一致的呢?比如锁定优惠券服务调用超时了,不能确定到底有没有锁券成功,该如何处理?再比如锁券成功了,但是扣减库存失败了,该如何处理?

方案选型

服务依赖过多,会带来管理复杂性增加和稳定性风险增大的问题。试想如果我们强依赖 10 个服务,9 个都执行成功了,最后一个执行失败了,那么是不是前面 9 个都要回滚掉?这个成本还是非常高的。

所以在拆分大的流程为多个小的本地事务的前提下,对于非实时、非强一致性的关联业务写入,在本地事务执行成功后,我们选择发消息通知、关联事务异步化执行的方案。

消息通知往往不能保证 100% 成功;且消息通知后,接收方业务是否能执行成功还是未知数。前者问题可以通过重试解决;后者可以选用事务消息来保证。

但是事务消息框架本身会给业务代码带来侵入性和复杂性,所以我们选择基于 DB 事件变化通知到 MQ 的方式做系统间解耦,通过订阅方消费 MQ 消息时的 ACK 机制,保证消息一定消费成功,达到最终一致性。由于消息可能会被重发,消息订阅方业务逻辑处理要做好幂等保证。

所以目前只剩下需要实时同步做、有强一致性要求的业务场景了。在交易创建过程中,锁券和扣减库存是这样的两个典型场景。

要保证多个系统间数据一致,乍一看,必须要引入分布式事务框架才能解决。但引入非常重的类似二阶段提交分布式事务框架会带来复杂性的急剧上升;在电商领域,绝对的强一致是过于理想化的,我们可以选择准实时的最终一致性。

我们在交易创建流程中,首先创建一个不可见订单,然后在同步调用锁券和扣减库存时,针对调用异常(失败或者超时),发出废单消息到MQ。如果消息发送失败,本地会做时间阶梯式的异步重试;优惠券系统和库存系统收到消息后,会进行判断是否需要做业务回滚,这样就准实时地保证了多个本地事务的最终一致性。

5b30008086e9f8d9583

5. 支付宝及蚂蚁金融云的分布式服务 DTS 方案

业界常用的还有支付宝的一种 xts 方案,由支付宝在 2PC 的基础上改进而来。主要思路如下,大部分信息引用自官方网站。

分布式事务服务简介

分布式事务服务 (Distributed Transaction Service, DTS) 是一个分布式事务框架,用来保障在大规模分布式环境下事务的最终一致性。DTS 从架构上分为 xts-client 和 xts-server 两部分,前者是一个嵌入客户端应用的 JAR 包,主要负责事务数据的写入和处理;后者是一个独立的系统,主要负责异常事务的恢复。

核心特性

传统关系型数据库的事务模型必须遵守 ACID 原则。在单数据库模式下,ACID 模型能有效保障数据的完整性,但是在大规模分布式环境下,一个业务往往会跨越多个数据库,如何保证这多个数据库之间的数据一致性,需要其他行之有效的策略。在 JavaEE 规范中使用 2PC (2 Phase Commit, 两阶段提交) 来处理跨 DB 环境下的事务问题,但是 2PC 是反可伸缩模式,也就是说,在事务处理过程中,参与者需要一直持有资源直到整个分布式事务结束。这样,当业务规模达到千万级以上时,2PC 的局限性就越来越明显,系统可伸缩性会变得很差。基于此,我们采用 BASE 的思想实现了一套类似 2PC 的分布式事务方案,这就是 DTS。DTS在充分保障分布式环境下高可用性、高可靠性的同时兼顾数据一致性的要求,其最大的特点是保证数据最终一致 (Eventually consistent)。

简单的说,DTS 框架有如下特性:

  • 最终一致:事务处理过程中,会有短暂不一致的情况,但通过恢复系统,可以让事务的数据达到最终一致的目标。
  • 协议简单:DTS 定义了类似 2PC 的标准两阶段接口,业务系统只需要实现对应的接口就可以使用 DTS 的事务功能。
  • 与 RPC 服务协议无关:在 SOA 架构下,一个或多个 DB 操作往往被包装成一个一个的 Service,Service 与 Service 之间通过 RPC 协议通信。DTS 框架构建在 SOA 架构上,与底层协议无关。
  • 与底层事务实现无关: DTS 是一个抽象的基于 Service 层的概念,与底层事务实现无关,也就是说在 DTS 的范围内,无论是关系型数据库 MySQL,Oracle,还是 KV 存储 MemCache,或者列存数据库 HBase,只要将对其的操作包装成 DTS 的参与者,就可以接入到 DTS 事务范围内。

以下是分布式事务框架的流程图

5c000029fd4a5102725

实现

  1. 一个完整的业务活动由一个主业务服务与若干从业务服务组成。
  2. 主业务服务负责发起并完成整个业务活动。
  3. 从业务服务提供 TCC 型业务操作。
  4. 业务活动管理器控制业务活动的一致性,它登记业务活动中的操作,并在活动提交时确认所有的两阶段事务的 confirm 操作,在业务活动取消时调用所有两阶段事务的 cancel 操作。”

与 2PC 协议比较

  1. 没有单独的 Prepare 阶段,降低协议成本
  2. 系统故障容忍度高,恢复简单

6. 农信网数据一致性方案

1. 电商业务

公司的支付部门,通过接入其它第三方支付系统来提供支付服务给业务部门,支付服务是一个基于 Dubbo 的 RPC 服务。

对于业务部门来说,电商部门的订单支付,需要调用

  1. 支付平台的支付接口来处理订单;
  2. 同时需要调用积分中心的接口,按照业务规则,给用户增加积分。

从业务规则上需要同时保证业务数据的实时性和一致性,也就是支付成功必须加积分。

我们采用的方式是同步调用,首先处理本地事务业务。考虑到积分业务比较单一且业务影响低于支付,由积分平台提供增加与回撤接口。

具体的流程是先调用积分平台增加用户积分,再调用支付平台进行支付处理,如果处理失败,catch 方法调用积分平台的回撤方法,将本次处理的积分订单回撤。

5bd0002b671c5f212f4

2. 用户信息变更

公司的用户信息,统一由用户中心维护,而用户信息的变更需要同步给各业务子系统,业务子系统再根据变更内容,处理各自业务。用户中心作为 MQ 的 producer,添加通知给 MQ。APP Server 订阅该消息,同步本地数据信息,再处理相关业务比如 APP 退出下线等。

我们采用异步消息通知机制,目前主要使用 ActiveMQ,基于 Virtual Topic 的订阅方式,保证单个业务集群订阅的单次消费。

5ad0007faf7a576a640

总结

分布式服务对衍生的配套系统要求比较多,特别是我们基于消息、日志的最终一致性方案,需要考虑消息的积压、消费情况、监控、报警等。

参考资料

  • Base: An Acid Alternative (eBay 方案)

In partitioned databases, trading some consistency for availability can lead to dramatic improvements in scalability.

英文版 : http://queue.acm.org/detail.cfm?id=1394128 

中文版: http://article.yeeyan.org/view/167444/125572

 

感谢李玉福、余昭辉、蘑菇街七公提供方案,其他多位群成员对本文内容亦有贡献。对于上述方案表述不完善之处,欢迎读者留言指出。

from:http://www.cnblogs.com/soundcode/p/5590710.html

一致性Hash算法(Java实现)

一致性Hash算法

关于一致性Hash算法,在我之前的博文中已经有多次提到了,MemCache超详细解读一文中”一致性Hash算法”部分,对于为什么要使用一致性Hash算法、一致性Hash算法的算法原理做了详细的解读。

算法的具体原理这里再次贴上:

先构造一个长度为232的整数环(这个环被称为一致性Hash环),根据节点名称的Hash值(其分布为[0, 232-1])将服务器节点放置在这个Hash环上,然后根据数据的Key值计算得到其Hash值(其分布也为[0, 232-1]),接着在Hash环上顺时针查找距离这个Key值的Hash值最近的服务器节点,完成Key到服务器的映射查找。

这种算法解决了普通余数Hash算法伸缩性差的问题,可以保证在上线、下线服务器的情况下尽量有多的请求命中原来路由到的服务器。

当然,万事不可能十全十美,一致性Hash算法比普通的余数Hash算法更具有伸缩性,但是同时其算法实现也更为复杂,本文就来研究一下,如何利用Java代码实现一致性Hash算法。在开始之前,先对一致性Hash算法中的几个核心问题进行一些探究。

 

数据结构的选取

一致性Hash算法最先要考虑的一个问题是:构造出一个长度为232的整数环,根据节点名称的Hash值将服务器节点放置在这个Hash环上。

那么,整数环应该使用何种数据结构,才能使得运行时的时间复杂度最低?首先说明一点,关于时间复杂度,常见的时间复杂度与时间效率的关系有如下的经验规则:

O(1) < O(log2N) < O(n) < O(N * log2N) < O(N2) < O(N3) < 2N < 3N < N!

一般来说,前四个效率比较高,中间两个差强人意,后三个比较差(只要N比较大,这个算法就动不了了)。OK,继续前面的话题,应该如何选取数据结构,我认为有以下几种可行的解决方案。

1、解决方案一:排序+List

我想到的第一种思路是:算出所有待加入数据结构的节点名称的Hash值放入一个数组中,然后使用某种排序算法将其从小到大进行排序,最后将排序后的数据放入List中,采用List而不是数组是为了结点的扩展考虑。

之后,待路由的结点,只需要在List中找到第一个Hash值比它大的服务器节点就可以了,比如服务器节点的Hash值是[0,2,4,6,8,10],带路由的结点是7,只需要找到第一个比7大的整数,也就是8,就是我们最终需要路由过去的服务器节点。

如果暂时不考虑前面的排序,那么这种解决方案的时间复杂度:

(1)最好的情况是第一次就找到,时间复杂度为O(1)

(2)最坏的情况是最后一次才找到,时间复杂度为O(N)

平均下来时间复杂度为O(0.5N+0.5),忽略首项系数和常数,时间复杂度为O(N)。

但是如果考虑到之前的排序,我在网上找了张图,提供了各种排序算法的时间复杂度:

看得出来,排序算法要么稳定但是时间复杂度高、要么时间复杂度低但不稳定,看起来最好的归并排序法的时间复杂度仍然有O(N * logN),稍微耗费性能了一些。

2、解决方案二:遍历+List

既然排序操作比较耗性能,那么能不能不排序?可以的,所以进一步的,有了第二种解决方案。

解决方案使用List不变,不过可以采用遍历的方式:

(1)服务器节点不排序,其Hash值全部直接放入一个List中

(2)带路由的节点,算出其Hash值,由于指明了”顺时针”,因此遍历List,比待路由的节点Hash值大的算出差值并记录,比待路由节点Hash值小的忽略

(3)算出所有的差值之后,最小的那个,就是最终需要路由过去的节点

在这个算法中,看一下时间复杂度:

1、最好情况是只有一个服务器节点的Hash值大于带路由结点的Hash值,其时间复杂度是O(N)+O(1)=O(N+1),忽略常数项,即O(N)

2、最坏情况是所有服务器节点的Hash值都大于带路由结点的Hash值,其时间复杂度是O(N)+O(N)=O(2N),忽略首项系数,即O(N)

所以,总的时间复杂度就是O(N)。其实算法还能更改进一些:给一个位置变量X,如果新的差值比原差值小,X替换为新的位置,否则X不变。这样遍历就减少了一轮,不过经过改进后的算法时间复杂度仍为O(N)。

总而言之,这个解决方案和解决方案一相比,总体来看,似乎更好了一些。

3、解决方案三:二叉查找树

抛开List这种数据结构,另一种数据结构则是使用二叉查找树。对于树不是很清楚的朋友可以简单看一下这篇文章树形结构

当然我们不能简单地使用二叉查找树,因为可能出现不平衡的情况。平衡二叉查找树有AVL树、红黑树等,这里使用红黑树,选用红黑树的原因有两点:

1、红黑树主要的作用是用于存储有序的数据,这其实和第一种解决方案的思路又不谋而合了,但是它的效率非常高

2、JDK里面提供了红黑树的代码实现TreeMap和TreeSet

另外,以TreeMap为例,TreeMap本身提供了一个tailMap(K fromKey)方法,支持从红黑树中查找比fromKey大的值的集合,但并不需要遍历整个数据结构。

使用红黑树,可以使得查找的时间复杂度降低为O(logN),比上面两种解决方案,效率大大提升。

为了验证这个说法,我做了一次测试,从大量数据中查找第一个大于其中间值的那个数据,比如10000数据就找第一个大于5000的数据(模拟平均的情况)。看一下O(N)时间复杂度和O(logN)时间复杂度运行效率的对比:

50000 100000 500000 1000000 4000000
ArrayList 1ms 1ms 4ms 4ms 5ms
LinkedList 4ms 7ms 11ms 13ms 17ms
TreeMap 0ms 0ms 0ms 0ms 0ms

因为再大就内存溢出了,所以只测试到4000000数据。可以看到,数据查找的效率,TreeMap是完胜的,其实再增大数据测试也是一样的,红黑树的数据结构决定了任何一个大于N的最小数据,它都只需要几次至几十次查找就可以查到。

当然,明确一点,有利必有弊,根据我另外一次测试得到的结论是,为了维护红黑树,数据插入效率TreeMap在三种数据结构里面是最差的,且插入要慢上5~10倍

 

Hash值重新计算

服务器节点我们肯定用字符串来表示,比如”192.168.1.1″、”192.168.1.2″,根据字符串得到其Hash值,那么另外一个重要的问题就是Hash值要重新计算,这个问题是我在测试String的hashCode()方法的时候发现的,不妨来看一下为什么要重新计算Hash值:

/**
 * String的hashCode()方法运算结果查看
 * @author 五月的仓颉 http://www.cnblogs.com/xrq730/
 *
 */
public class StringHashCodeTest
{
    public static void main(String[] args)
    {
        System.out.println("192.168.0.0:111的哈希值:" + "192.168.0.0:1111".hashCode());
        System.out.println("192.168.0.1:111的哈希值:" + "192.168.0.1:1111".hashCode());
        System.out.println("192.168.0.2:111的哈希值:" + "192.168.0.2:1111".hashCode());
        System.out.println("192.168.0.3:111的哈希值:" + "192.168.0.3:1111".hashCode());
        System.out.println("192.168.0.4:111的哈希值:" + "192.168.0.4:1111".hashCode());
    }
}

我们在做集群的时候,集群点的IP以这种连续的形式存在是很正常的。看一下运行结果为:

192.168.0.0:111的哈希值:1845870087
192.168.0.1:111的哈希值:1874499238
192.168.0.2:111的哈希值:1903128389
192.168.0.3:111的哈希值:1931757540
192.168.0.4:111的哈希值:1960386691

这个就问题大了,[0,232-1]的区间之中,5个HashCode值却只分布在这么小小的一个区间,什么概念?[0,232-1]中有4294967296个数字,而我们的区间只有114516604,从概率学上讲这将导致97%待路由的服务器都被路由到”192.168.0.0″这个集群点上,简直是糟糕透了!

另外还有一个不好的地方:规定的区间是非负数,String的hashCode()方法却会产生负数(不信用”192.168.1.0:1111″试试看就知道了)。不过这个问题好解决,取绝对值就是一种解决的办法。

综上,String重写的hashCode()方法在一致性Hash算法中没有任何实用价值,得找个算法重新计算HashCode。这种重新计算Hash值的算法有很多,比如CRC32_HASH、FNV1_32_HASH、KETAMA_HASH等,其中KETAMA_HASH是默认的MemCache推荐的一致性Hash算法,用别的Hash算法也可以,比如FNV1_32_HASH算法的计算效率就会高一些。

一致性Hash算法实现版本1:不带虚拟节点

使用一致性Hash算法,尽管增强了系统的伸缩性,但是也有可能导致负载分布不均匀,解决办法就是使用虚拟节点代替真实节点,第一个代码版本,先来个简单的,不带虚拟节点。

下面来看一下不带虚拟节点的一致性Hash算法的Java代码实现:

 1 /**
 2  * 不带虚拟节点的一致性Hash算法
 3  * @author 五月的仓颉http://www.cnblogs.com/xrq730/
 4  *
 5  */
 6 public class ConsistentHashingWithoutVirtualNode
 7 {
 8     /**
 9      * 待添加入Hash环的服务器列表
10      */
11     private static String[] servers = {"192.168.0.0:111", "192.168.0.1:111", "192.168.0.2:111",
12             "192.168.0.3:111", "192.168.0.4:111"};
13     
14     /**
15      * key表示服务器的hash值,value表示服务器的名称
16      */
17     private static SortedMap<Integer, String> sortedMap = 
18             new TreeMap<Integer, String>();
19     
20     /**
21      * 程序初始化,将所有的服务器放入sortedMap中
22      */
23     static
24     {
25         for (int i = 0; i < servers.length; i++)
26         {
27             int hash = getHash(servers[i]);
28             System.out.println("[" + servers[i] + "]加入集合中, 其Hash值为" + hash);
29             sortedMap.put(hash, servers[i]);
30         }
31         System.out.println();
32     }
33     
34     /**
35      * 使用FNV1_32_HASH算法计算服务器的Hash值,这里不使用重写hashCode的方法,最终效果没区别 
36      */
37     private static int getHash(String str)
38     {
39         final int p = 16777619;
40         int hash = (int)2166136261L;
41         for (int i = 0; i < str.length(); i++)
42             hash = (hash ^ str.charAt(i)) * p;
43         hash += hash << 13;
44         hash ^= hash >> 7;
45         hash += hash << 3;
46         hash ^= hash >> 17;
47         hash += hash << 5;
48         
49         // 如果算出来的值为负数则取其绝对值
50         if (hash < 0)
51             hash = Math.abs(hash);
52         return hash;
53     }
54     
55     /**
56      * 得到应当路由到的结点
57      */
58     private static String getServer(String node)
59     {
60         // 得到带路由的结点的Hash值
61         int hash = getHash(node);
62         // 得到大于该Hash值的所有Map
63         SortedMap<Integer, String> subMap = 
64                 sortedMap.tailMap(hash);
65         // 第一个Key就是顺时针过去离node最近的那个结点
66         Integer i = subMap.firstKey();
67         // 返回对应的服务器名称
68         return subMap.get(i);
69     }
70     
71     public static void main(String[] args)
72     {
73         String[] nodes = {"127.0.0.1:1111", "221.226.0.1:2222", "10.211.0.1:3333"};
74         for (int i = 0; i < nodes.length; i++)
75             System.out.println("[" + nodes[i] + "]的hash值为" + 
76                     getHash(nodes[i]) + ", 被路由到结点[" + getServer(nodes[i]) + "]");
77     }
78 }

可以运行一下看一下结果:

[192.168.0.0:111]加入集合中, 其Hash值为575774686
[192.168.0.1:111]加入集合中, 其Hash值为8518713
[192.168.0.2:111]加入集合中, 其Hash值为1361847097
[192.168.0.3:111]加入集合中, 其Hash值为1171828661
[192.168.0.4:111]加入集合中, 其Hash值为1764547046

[127.0.0.1:1111]的hash值为380278925, 被路由到结点[192.168.0.0:111]
[221.226.0.1:2222]的hash值为1493545632, 被路由到结点[192.168.0.4:111]
[10.211.0.1:3333]的hash值为1393836017, 被路由到结点[192.168.0.4:111]

看到经过FNV1_32_HASH算法重新计算过后的Hash值,就比原来String的hashCode()方法好多了。从运行结果来看,也没有问题,三个点路由到的都是顺时针离他们Hash值最近的那台服务器上。

使用虚拟节点来改善一致性Hash算法

上面的一致性Hash算法实现,可以在很大程度上解决很多分布式环境下不好的路由算法导致系统伸缩性差的问题,但是会带来另外一个问题:负载不均。

比如说有Hash环上有A、B、C三个服务器节点,分别有100个请求会被路由到相应服务器上。现在在A与B之间增加了一个节点D,这导致了原来会路由到B上的部分节点被路由到了D上,这样A、C上被路由到的请求明显多于B、D上的,原来三个服务器节点上均衡的负载被打破了。某种程度上来说,这失去了负载均衡的意义,因为负载均衡的目的本身就是为了使得目标服务器均分所有的请求

解决这个问题的办法是引入虚拟节点,其工作原理是:将一个物理节点拆分为多个虚拟节点,并且同一个物理节点的虚拟节点尽量均匀分布在Hash环上。采取这样的方式,就可以有效地解决增加或减少节点时候的负载不均衡的问题。

至于一个物理节点应该拆分为多少虚拟节点,下面可以先看一张图:

横轴表示需要为每台福利服务器扩展的虚拟节点倍数,纵轴表示的是实际物理服务器数。可以看出,物理服务器很少,需要更大的虚拟节点;反之物理服务器比较多,虚拟节点就可以少一些。比如有10台物理服务器,那么差不多需要为每台服务器增加100~200个虚拟节点才可以达到真正的负载均衡。

一致性Hash算法实现版本2:带虚拟节点

在理解了使用虚拟节点来改善一致性Hash算法的理论基础之后,就可以尝试开发代码了。编程方面需要考虑的问题是:

1、一个真实结点如何对应成为多个虚拟节点?

2、虚拟节点找到后如何还原为真实结点?

这两个问题其实有很多解决办法,我这里使用了一种简单的办法,给每个真实结点后面根据虚拟节点加上后缀再取Hash值,比如”192.168.0.0:111″就把它变成”192.168.0.0:111&&VN0″到”192.168.0.0:111&&VN4″,VN就是Virtual Node的缩写,还原的时候只需要从头截取字符串到”&&”的位置就可以了。

下面来看一下带虚拟节点的一致性Hash算法的Java代码实现:

 1 /**
 2  * 带虚拟节点的一致性Hash算法
 3  * @author 五月的仓颉 http://www.cnblogs.com/xrq730/
 4  */
 5 public class ConsistentHashingWithVirtualNode
 6 {
 7     /**
 8      * 待添加入Hash环的服务器列表
 9      */
10     private static String[] servers = {"192.168.0.0:111", "192.168.0.1:111", "192.168.0.2:111",
11             "192.168.0.3:111", "192.168.0.4:111"};
12     
13     /**
14      * 真实结点列表,考虑到服务器上线、下线的场景,即添加、删除的场景会比较频繁,这里使用LinkedList会更好
15      */
16     private static List<String> realNodes = new LinkedList<String>();
17     
18     /**
19      * 虚拟节点,key表示虚拟节点的hash值,value表示虚拟节点的名称
20      */
21     private static SortedMap<Integer, String> virtualNodes = 
22             new TreeMap<Integer, String>();
23     
24     /**
25      * 虚拟节点的数目,这里写死,为了演示需要,一个真实结点对应5个虚拟节点
26      */
27     private static final int VIRTUAL_NODES = 5;
28     
29     static
30     {
31         // 先把原始的服务器添加到真实结点列表中
32         for (int i = 0; i < servers.length; i++)
33             realNodes.add(servers[i]);
34         
35         // 再添加虚拟节点,遍历LinkedList使用foreach循环效率会比较高
36         for (String str : realNodes)
37         {
38             for (int i = 0; i < VIRTUAL_NODES; i++)
39             {
40                 String virtualNodeName = str + "&&VN" + String.valueOf(i);
41                 int hash = getHash(virtualNodeName);
42                 System.out.println("虚拟节点[" + virtualNodeName + "]被添加, hash值为" + hash);
43                 virtualNodes.put(hash, virtualNodeName);
44             }
45         }
46         System.out.println();
47     }
48     
49     /**
50      * 使用FNV1_32_HASH算法计算服务器的Hash值,这里不使用重写hashCode的方法,最终效果没区别 
51      */
52     private static int getHash(String str)
53     {
54         final int p = 16777619;
55         int hash = (int)2166136261L;
56         for (int i = 0; i < str.length(); i++)
57             hash = (hash ^ str.charAt(i)) * p;
58         hash += hash << 13;
59         hash ^= hash >> 7;
60         hash += hash << 3;
61         hash ^= hash >> 17;
62         hash += hash << 5;
63         
64         // 如果算出来的值为负数则取其绝对值
65         if (hash < 0)
66             hash = Math.abs(hash);
67         return hash;
68     }
69     
70     /**
71      * 得到应当路由到的结点
72      */
73     private static String getServer(String node)
74     {
75         // 得到带路由的结点的Hash值
76         int hash = getHash(node);
77         // 得到大于该Hash值的所有Map
78         SortedMap<Integer, String> subMap = 
79                 virtualNodes.tailMap(hash);
80         // 第一个Key就是顺时针过去离node最近的那个结点
81         Integer i = subMap.firstKey();
82         // 返回对应的虚拟节点名称,这里字符串稍微截取一下
83         String virtualNode = subMap.get(i);
84         return virtualNode.substring(0, virtualNode.indexOf("&&"));
85     }
86     
87     public static void main(String[] args)
88     {
89         String[] nodes = {"127.0.0.1:1111", "221.226.0.1:2222", "10.211.0.1:3333"};
90         for (int i = 0; i < nodes.length; i++)
91             System.out.println("[" + nodes[i] + "]的hash值为" + 
92                     getHash(nodes[i]) + ", 被路由到结点[" + getServer(nodes[i]) + "]");
93     }
94 }

关注一下运行结果:

虚拟节点[192.168.0.0:111&&VN0]被添加, hash值为1686427075
虚拟节点[192.168.0.0:111&&VN1]被添加, hash值为354859081
虚拟节点[192.168.0.0:111&&VN2]被添加, hash值为1306497370
虚拟节点[192.168.0.0:111&&VN3]被添加, hash值为817889914
虚拟节点[192.168.0.0:111&&VN4]被添加, hash值为396663629
虚拟节点[192.168.0.1:111&&VN0]被添加, hash值为1032739288
虚拟节点[192.168.0.1:111&&VN1]被添加, hash值为707592309
虚拟节点[192.168.0.1:111&&VN2]被添加, hash值为302114528
虚拟节点[192.168.0.1:111&&VN3]被添加, hash值为36526861
虚拟节点[192.168.0.1:111&&VN4]被添加, hash值为848442551
虚拟节点[192.168.0.2:111&&VN0]被添加, hash值为1452694222
虚拟节点[192.168.0.2:111&&VN1]被添加, hash值为2023612840
虚拟节点[192.168.0.2:111&&VN2]被添加, hash值为697907480
虚拟节点[192.168.0.2:111&&VN3]被添加, hash值为790847074
虚拟节点[192.168.0.2:111&&VN4]被添加, hash值为2010506136
虚拟节点[192.168.0.3:111&&VN0]被添加, hash值为891084251
虚拟节点[192.168.0.3:111&&VN1]被添加, hash值为1725031739
虚拟节点[192.168.0.3:111&&VN2]被添加, hash值为1127720370
虚拟节点[192.168.0.3:111&&VN3]被添加, hash值为676720500
虚拟节点[192.168.0.3:111&&VN4]被添加, hash值为2050578780
虚拟节点[192.168.0.4:111&&VN0]被添加, hash值为586921010
虚拟节点[192.168.0.4:111&&VN1]被添加, hash值为184078390
虚拟节点[192.168.0.4:111&&VN2]被添加, hash值为1331645117
虚拟节点[192.168.0.4:111&&VN3]被添加, hash值为918790803
虚拟节点[192.168.0.4:111&&VN4]被添加, hash值为1232193678

[127.0.0.1:1111]的hash值为380278925, 被路由到结点[192.168.0.0:111]
[221.226.0.1:2222]的hash值为1493545632, 被路由到结点[192.168.0.0:111]
[10.211.0.1:3333]的hash值为1393836017, 被路由到结点[192.168.0.2:111]

从代码运行结果看,每个点路由到的服务器都是Hash值顺时针离它最近的那个服务器节点,没有任何问题。

通过采取虚拟节点的方法,一个真实结点不再固定在Hash换上的某个点,而是大量地分布在整个Hash环上,这样即使上线、下线服务器,也不会造成整体的负载不均衡。

后记

在写本文的时候,很多知识我也是边写边学,难免有很多写得不好、理解得不透彻的地方,而且代码整体也比较糙,未有考虑到可能的各种情况。抛砖引玉,一方面,写得不对的地方,还望网友朋友们指正;另一方面,后续我也将通过自己的工作、学习不断完善上面的代码。

from:http://www.cnblogs.com/xrq730/p/5186728.html