Category Archives: 分布式

理解HTTP幂等性

基于HTTP协议的Web API是时下最为流行的一种分布式服务提供方式。无论是在大型互联网应用还是企业级架构中,我们都见到了越来越多的SOA或RESTful的Web API。为什么Web API如此流行呢?我认为很大程度上应归功于简单有效的HTTP协议。HTTP协议是一种分布式的面向资源的网络应用层协议,无论是服务器端提供Web服务,还是客户端消费Web服务都非常简单。再加上浏览器、Javascript、AJAX、JSON以及HTML5等技术和工具的发展,互联网应用架构设计表现出了从传统的PHP、JSP、ASP.NET等服务器端动态网页向Web API + RIA(富互联网应用)过渡的趋势。Web API专注于提供业务服务,RIA专注于用户界面和交互设计,从此两个领域的分工更加明晰。在这种趋势下,Web API设计将成为服务器端程序员的必修课。然而,正如简单的Java语言并不意味着高质量的Java程序,简单的HTTP协议也不意味着高质量的Web API。要想设计出高质量的Web API,还需要深入理解分布式系统及HTTP协议的特性。

幂等性定义

本文所要探讨的正是HTTP协议涉及到的一种重要性质:幂等性(Idempotence)。在HTTP/1.1规范中幂等性的定义是:

Methods can also have the property of “idempotence” in that (aside from error or expiration issues) the side-effects of N > 0 identical requests is the same as for a single request.

从定义上看,HTTP方法的幂等性是指一次和多次请求某一个资源应该具有同样的副作用。幂等性属于语义范畴,正如编译器只能帮助检查语法错误一样,HTTP规范也没有办法通过消息格式等语法手段来定义它,这可能是它不太受到重视的原因之一。但实际上,幂等性是分布式系统设计中十分重要的概念,而HTTP的分布式本质也决定了它在HTTP中具有重要地位。

分布式事务 vs 幂等设计

为什么需要幂等性呢?我们先从一个例子说起,假设有一个从账户取钱的远程API(可以是HTTP的,也可以不是),我们暂时用类函数的方式记为:

bool withdraw(account_id, amount)

withdraw的语义是从account_id对应的账户中扣除amount数额的钱;如果扣除成功则返回true,账户余额减少amount;如果扣除失败则返回false,账户余额不变。值得注意的是:和本地环境相比,我们不能轻易假设分布式环境的可靠性。一种典型的情况是withdraw请求已经被服务器端正确处理,但服务器端的返回结果由于网络等原因被掉丢了,导致客户端无法得知处理结果。如果是在网页上,一些不恰当的设计可能会使用户认为上一次操作失败了,然后刷新页面,这就导致了withdraw被调用两次,账户也被多扣了一次钱。如图1所示:

20110810171503575

图1

这个问题的解决方案一是采用分布式事务,通过引入支持分布式事务的中间件来保证withdraw功能的事务性。分布式事务的优点是对于调用者很简单,复杂性都交给了中间件来管理。缺点则是一方面架构太重量级,容易被绑在特定的中间件上,不利于异构系统的集成;另一方面分布式事务虽然能保证事务的ACID性质,而但却无法提供性能和可用性的保证。

另一种更轻量级的解决方案是幂等设计。我们可以通过一些技巧把withdraw变成幂等的,比如:

int create_ticket() 
bool idempotent_withdraw(ticket_id, account_id, amount)

create_ticket的语义是获取一个服务器端生成的唯一的处理号ticket_id,它将用于标识后续的操作。idempotent_withdraw和withdraw的区别在于关联了一个ticket_id,一个ticket_id表示的操作至多只会被处理一次,每次调用都将返回第一次调用时的处理结果。这样,idempotent_withdraw就符合幂等性了,客户端就可以放心地多次调用。

基于幂等性的解决方案中一个完整的取钱流程被分解成了两个步骤:1.调用create_ticket()获取ticket_id;2.调用idempotent_withdraw(ticket_id, account_id, amount)。虽然create_ticket不是幂等的,但在这种设计下,它对系统状态的影响可以忽略,加上idempotent_withdraw是幂等的,所以任何一步由于网络等原因失败或超时,客户端都可以重试,直到获得结果。如图2所示:

201106042051069339

图2

和分布式事务相比,幂等设计的优势在于它的轻量级,容易适应异构环境,以及性能和可用性方面。在某些性能要求比较高的应用,幂等设计往往是唯一的选择。

HTTP的幂等性

HTTP协议本身是一种面向资源的应用层协议,但对HTTP协议的使用实际上存在着两种不同的方式:一种是RESTful的,它把HTTP当成应用层协议,比较忠实地遵守了HTTP协议的各种规定;另一种是SOA的,它并没有完全把HTTP当成应用层协议,而是把HTTP协议作为了传输层协议,然后在HTTP之上建立了自己的应用层协议。本文所讨论的HTTP幂等性主要针对RESTful风格的,不过正如上一节所看到的那样,幂等性并不属于特定的协议,它是分布式系统的一种特性;所以,不论是SOA还是RESTful的Web API设计都应该考虑幂等性。下面将介绍HTTP GET、DELETE、PUT、POST四种主要方法的语义和幂等性。

HTTP GET方法用于获取资源,不应有副作用,所以是幂等的。比如:GET http://www.bank.com/account/123456,不会改变资源的状态,不论调用一次还是N次都没有副作用。请注意,这里强调的是一次和N次具有相同的副作用,而不是每次GET的结果相同。GET http://www.news.com/latest-news这个HTTP请求可能会每次得到不同的结果,但它本身并没有产生任何副作用,因而是满足幂等性的。

HTTP DELETE方法用于删除资源,有副作用,但它应该满足幂等性。比如:DELETE http://www.forum.com/article/4231,调用一次和N次对系统产生的副作用是相同的,即删掉id为4231的帖子;因此,调用者可以多次调用或刷新页面而不必担心引起错误。

比较容易混淆的是HTTP POST和PUT。POST和PUT的区别容易被简单地误认为“POST表示创建资源,PUT表示更新资源”;而实际上,二者均可用于创建资源,更为本质的差别是在幂等性方面。在HTTP规范中对POST和PUT是这样定义的:
The POST method is used to request that the origin server accept the entity enclosed in the request as a new subordinate of the resource identified by the Request-URI in the Request-Line …… If a resource has been created on the origin server, the response SHOULD be 201 (Created) and contain an entity which describes the status of the request and refers to the new resource, and a Location header.

The PUT method requests that the enclosed entity be stored under the supplied Request-URI. If the Request-URI refers to an already existing resource, the enclosed entity SHOULD be considered as a modified version of the one residing on the origin server. If the Request-URI does not point to an existing resource, and that URI is capable of being defined as a new resource by the requesting user agent, the origin server can create the resource with that URI.
POST所对应的URI并非创建的资源本身,而是资源的接收者。比如:POST http://www.forum.com/articles的语义是在http://www.forum.com/articles下创建一篇帖子,HTTP响应中应包含帖子的创建状态以及帖子的URI。两次相同的POST请求会在服务器端创建两份资源,它们具有不同的URI;所以,POST方法不具备幂等性。而PUT所对应的URI是要创建或更新的资源本身。比如:PUT http://www.forum/articles/4231的语义是创建或更新ID为4231的帖子。对同一URI进行多次PUT的副作用和一次PUT是相同的;因此,PUT方法具有幂等性。

在介绍了几种操作的语义和幂等性之后,我们来看看如何通过Web API的形式实现前面所提到的取款功能。很简单,用POST /tickets来实现create_ticket;用PUT /accounts/account_id/ticket_id&amount=xxx来实现idempotent_withdraw。值得注意的是严格来讲amount参数不应该作为URI的一部分,真正的URI应该是/accounts/account_id/ticket_id,而amount应该放在请求的body中。这种模式可以应用于很多场合,比如:论坛网站中防止意外的重复发帖。

总结

上面简单介绍了幂等性的概念,用幂等设计取代分布式事务的方法,以及HTTP主要方法的语义和幂等性特征。其实,如果要追根溯源,幂等性是数学中的一个概念,表达的是N次变换与1次变换的结果相同,有兴趣的读者可以从Wikipedia上进一步了解。

参考

RFC 2616, Hypertext Transfer Protocol — HTTP/1.1, Method Definitions
The Importance of Idempotence
Stackoverflow – PUT vs POST in REST

from:http://www.cnblogs.com/weidagang2046/archive/2011/06/04/2063696.html

保证分布式系统数据一致性的6种方案

编者按:本文由「高可用架构后花园」群讨论整理而成,后花园是一个面向架构师的增值服务,如需了解,请关注「高可用架构」后回复 VIP

有人的地方,就有江湖

有江湖的地方,就有纷争

问题的起源

在电商等业务中,系统一般由多个独立的服务组成,如何解决分布式调用时候数据的一致性?

具体业务场景如下,比如一个业务操作,如果同时调用服务 A、B、C,需要满足要么同时成功;要么同时失败。A、B、C 可能是多个不同部门开发、部署在不同服务器上的远程服务。

在分布式系统来说,如果不想牺牲一致性,CAP 理论告诉我们只能放弃可用性,这显然不能接受。为了便于讨论问题,先简单介绍下数据一致性的基础理论。

强一致
当更新操作完成之后,任何多个后续进程或者线程的访问都会返回最新的更新过的值。这种是对用户最友好的,就是用户上一次写什么,下一次就保证能读到什么。根据 CAP 理论,这种实现需要牺牲可用性。
弱一致性
系统并不保证续进程或者线程的访问都会返回最新的更新过的值。系统在数据写入成功之后,不承诺立即可以读到最新写入的值,也不会具体的承诺多久之后可以读到。
最终一致性
弱一致性的特定形式。系统保证在没有后续更新的前提下,系统最终返回上一次更新操作的值。在没有故障发生的前提下,不一致窗口的时间主要受通信延迟,系统负载和复制副本的个数影响。DNS 是一个典型的最终一致性系统。
在工程实践上,为了保障系统的可用性,互联网系统大多将强一致性需求转换成最终一致性的需求,并通过系统执行幂等性的保证,保证数据的最终一致性。但在电商等场景中,对于数据一致性的解决方法和常见的互联网系统(如 MySQL 主从同步)又有一定区别,群友的讨论分成以下 6 种解决方案。

1. 规避分布式事务——业务整合

业务整合方案主要采用将接口整合到本地执行的方法。拿问题场景来说,则可以将服务 A、B、C 整合为一个服务 D 给业务,这个服务 D 再通过转换为本地事务的方式,比如服务 D 包含本地服务和服务 E,而服务 E 是本地服务 A ~ C 的整合。

优点:解决(规避)了分布式事务。

缺点:显而易见,把本来规划拆分好的业务,又耦合到了一起,业务职责不清晰,不利于维护。

由于这个方法存在明显缺点,通常不建议使用。

2. 经典方案 – eBay 模式

此方案的核心是将需要分布式处理的任务通过消息日志的方式来异步执行。消息日志可以存储到本地文本、数据库或消息队列,再通过业务规则自动或人工发起重试。人工重试更多的是应用于支付场景,通过对账系统对事后问题的处理。

消息日志方案的核心是保证服务接口的幂等性。

考虑到网络通讯失败、数据丢包等原因,如果接口不能保证幂等性,数据的唯一性将很难保证。

eBay 方式的主要思路如下。

Base:一种 Acid 的替代方案

此方案是 eBay 的架构师 Dan Pritchett 在 2008 年发表给 ACM 的文章,是一篇解释 BASE 原则,或者说最终一致性的经典文章。文中讨论了 BASE 与 ACID 原则在保证数据一致性的基本差异。

如果 ACID 为分区的数据库提供一致性的选择,那么如何实现可用性呢?答案是

BASE (basically available, soft state, eventually consistent)

BASE 的可用性是通过支持局部故障而不是系统全局故障来实现的。下面是一个简单的例子:如果将用户分区在 5 个数据库服务器上,BASE 设计鼓励类似的处理方式,一个用户数据库的故障只影响这台特定主机那 20% 的用户。这里不涉及任何魔法,不过它确实可以带来更高的可感知的系统可用性。

文章中描述了一个最常见的场景,如果产生了一笔交易,需要在交易表增加记录,同时还要修改用户表的金额。这两个表属于不同的远程服务,所以就涉及到分布式事务一致性的问题。

文中提出了一个经典的解决方法,将主要修改操作以及更新用户表的消息放在一个本地事务来完成。同时为了避免重复消费用户表消息带来的问题,达到多次重试的幂等性,增加一个更新记录表 updates_applied来记录已经处理过的消息。

5ad0007faf14d9018dc

系统的执行伪代码如下

5bd0002b66b92973061

(点击可全屏缩放图片)

基于以上方法,在第一阶段,通过本地的数据库的事务保障,增加了 transaction 表及消息队列 。

在第二阶段,分别读出消息队列(但不删除),通过判断更新记录表 updates_applied 来检测相关记录是否被执行,未被执行的记录会修改 user 表,然后增加一条操作记录到 updates_applied,事务执行成功之后再删除队列。

通过以上方法,达到了分布式系统的最终一致性。进一步了解 eBay 的方案可以参考文末链接。

3. 去哪儿网分布式事务方案

随着业务规模不断地扩大,电商网站一般都要面临拆分之路。就是将原来一个单体应用拆分成多个不同职责的子系统。比如以前可能将面向用户、客户和运营的功能都放在一个系统里,现在拆分为订单中心、代理商管理、运营系统、报价中心、库存管理等多个子系统。

拆分首先要面临的是什么呢?

最开始的单体应用所有功能都在一起,存储也在一起。比如运营要取消某个订单,那直接去更新订单表状态,然后更新库存表就 ok 了。因为是单体应用,库在一起,这些都可以在一个事务里,由关系数据库来保证一致性。

但拆分之后就不同了,不同的子系统都有自己的存储。比如订单中心就只管理自己的订单库,而库存管理也有自己的库。那么运营系统取消订单的时候就是通过接口调用等方式来调用订单中心和库存管理的服务了,而不是直接去操作库。这就涉及一个『分布式事务』的问题。

5bd0002b66dc70ed910

分布式事务有两种解决方式

1. 优先使用异步消息。

上文已经说过,使用异步消息 Consumer 端需要实现幂等。

幂等有两种方式,一种方式是业务逻辑保证幂等。比如接到支付成功的消息订单状态变成支付完成,如果当前状态是支付完成,则再收到一个支付成功的消息则说明消息重复了,直接作为消息成功处理。

另外一种方式如果业务逻辑无法保证幂等,则要增加一个去重表或者类似的实现。对于 producer 端在业务数据库的同实例上放一个消息库,发消息和业务操作在同一个本地事务里。发消息的时候消息并不立即发出,而是向消息库插入一条消息记录,然后在事务提交的时候再异步将消息发出,发送消息如果成功则将消息库里的消息删除,如果遇到消息队列服务异常或网络问题,消息没有成功发出那么消息就留在这里了,会有另外一个服务不断地将这些消息扫出重新发送。

2. 有的业务不适合异步消息的方式,事务的各个参与方都需要同步的得到结果。这种情况的实现方式其实和上面类似,每个参与方的本地业务库的同实例上面放一个事务记录库。

比如 A 同步调用 B,C。A 本地事务成功的时候更新本地事务记录状态,B 和 C 同样。如果有一次 A 调用 B 失败了,这个失败可能是 B 真的失败了,也可能是调用超时,实际 B 成功。则由一个中心服务对比三方的事务记录表,做一个最终决定。假设现在三方的事务记录是 A 成功,B 失败,C 成功。那么最终决定有两种方式,根据具体场景:

  1. 重试 B,直到 B 成功,事务记录表里记录了各项调用参数等信息;
  2. 执行 A 和 B 的补偿操作(一种可行的补偿方式是回滚)。

对 b 场景做一个特殊说明:比如 B 是扣库存服务,在第一次调用的时候因为某种原因失败了,但是重试的时候库存已经变为 0,无法重试成功,这个时候只有回滚 A 和 C 了。

那么可能有人觉得在业务库的同实例里放消息库或事务记录库,会对业务侵入,业务还要关心这个库,是否一个合理的设计?

实际上可以依靠运维的手段来简化开发的侵入,我们的方法是让 DBA 在公司所有 MySQL 实例上预初始化这个库,通过框架层(消息的客户端或事务 RPC 框架)透明的在背后操作这个库,业务开发人员只需要关心自己的业务逻辑,不需要直接访问这个库。

总结起来,其实两种方式的根本原理是类似的,也就是将分布式事务转换为多个本地事务,然后依靠重试等方式达到最终一致性

4. 蘑菇街交易创建过程中的分布式一致性方案

交易创建的一般性流程

我们把交易创建流程抽象出一系列可扩展的功能点,每个功能点都可以有多个实现(具体的实现之间有组合/互斥关系)。把各个功能点按照一定流程串起来,就完成了交易创建的过程。

5ad0007faf4df6af7f3

面临的问题

每个功能点的实现都可能会依赖外部服务。那么如何保证各个服务之间的数据是一致的呢?比如锁定优惠券服务调用超时了,不能确定到底有没有锁券成功,该如何处理?再比如锁券成功了,但是扣减库存失败了,该如何处理?

方案选型

服务依赖过多,会带来管理复杂性增加和稳定性风险增大的问题。试想如果我们强依赖 10 个服务,9 个都执行成功了,最后一个执行失败了,那么是不是前面 9 个都要回滚掉?这个成本还是非常高的。

所以在拆分大的流程为多个小的本地事务的前提下,对于非实时、非强一致性的关联业务写入,在本地事务执行成功后,我们选择发消息通知、关联事务异步化执行的方案。

消息通知往往不能保证 100% 成功;且消息通知后,接收方业务是否能执行成功还是未知数。前者问题可以通过重试解决;后者可以选用事务消息来保证。

但是事务消息框架本身会给业务代码带来侵入性和复杂性,所以我们选择基于 DB 事件变化通知到 MQ 的方式做系统间解耦,通过订阅方消费 MQ 消息时的 ACK 机制,保证消息一定消费成功,达到最终一致性。由于消息可能会被重发,消息订阅方业务逻辑处理要做好幂等保证。

所以目前只剩下需要实时同步做、有强一致性要求的业务场景了。在交易创建过程中,锁券和扣减库存是这样的两个典型场景。

要保证多个系统间数据一致,乍一看,必须要引入分布式事务框架才能解决。但引入非常重的类似二阶段提交分布式事务框架会带来复杂性的急剧上升;在电商领域,绝对的强一致是过于理想化的,我们可以选择准实时的最终一致性。

我们在交易创建流程中,首先创建一个不可见订单,然后在同步调用锁券和扣减库存时,针对调用异常(失败或者超时),发出废单消息到MQ。如果消息发送失败,本地会做时间阶梯式的异步重试;优惠券系统和库存系统收到消息后,会进行判断是否需要做业务回滚,这样就准实时地保证了多个本地事务的最终一致性。

5b30008086e9f8d9583

5. 支付宝及蚂蚁金融云的分布式服务 DTS 方案

业界常用的还有支付宝的一种 xts 方案,由支付宝在 2PC 的基础上改进而来。主要思路如下,大部分信息引用自官方网站。

分布式事务服务简介

分布式事务服务 (Distributed Transaction Service, DTS) 是一个分布式事务框架,用来保障在大规模分布式环境下事务的最终一致性。DTS 从架构上分为 xts-client 和 xts-server 两部分,前者是一个嵌入客户端应用的 JAR 包,主要负责事务数据的写入和处理;后者是一个独立的系统,主要负责异常事务的恢复。

核心特性

传统关系型数据库的事务模型必须遵守 ACID 原则。在单数据库模式下,ACID 模型能有效保障数据的完整性,但是在大规模分布式环境下,一个业务往往会跨越多个数据库,如何保证这多个数据库之间的数据一致性,需要其他行之有效的策略。在 JavaEE 规范中使用 2PC (2 Phase Commit, 两阶段提交) 来处理跨 DB 环境下的事务问题,但是 2PC 是反可伸缩模式,也就是说,在事务处理过程中,参与者需要一直持有资源直到整个分布式事务结束。这样,当业务规模达到千万级以上时,2PC 的局限性就越来越明显,系统可伸缩性会变得很差。基于此,我们采用 BASE 的思想实现了一套类似 2PC 的分布式事务方案,这就是 DTS。DTS在充分保障分布式环境下高可用性、高可靠性的同时兼顾数据一致性的要求,其最大的特点是保证数据最终一致 (Eventually consistent)。

简单的说,DTS 框架有如下特性:

  • 最终一致:事务处理过程中,会有短暂不一致的情况,但通过恢复系统,可以让事务的数据达到最终一致的目标。
  • 协议简单:DTS 定义了类似 2PC 的标准两阶段接口,业务系统只需要实现对应的接口就可以使用 DTS 的事务功能。
  • 与 RPC 服务协议无关:在 SOA 架构下,一个或多个 DB 操作往往被包装成一个一个的 Service,Service 与 Service 之间通过 RPC 协议通信。DTS 框架构建在 SOA 架构上,与底层协议无关。
  • 与底层事务实现无关: DTS 是一个抽象的基于 Service 层的概念,与底层事务实现无关,也就是说在 DTS 的范围内,无论是关系型数据库 MySQL,Oracle,还是 KV 存储 MemCache,或者列存数据库 HBase,只要将对其的操作包装成 DTS 的参与者,就可以接入到 DTS 事务范围内。

以下是分布式事务框架的流程图

5c000029fd4a5102725

实现

  1. 一个完整的业务活动由一个主业务服务与若干从业务服务组成。
  2. 主业务服务负责发起并完成整个业务活动。
  3. 从业务服务提供 TCC 型业务操作。
  4. 业务活动管理器控制业务活动的一致性,它登记业务活动中的操作,并在活动提交时确认所有的两阶段事务的 confirm 操作,在业务活动取消时调用所有两阶段事务的 cancel 操作。”

与 2PC 协议比较

  1. 没有单独的 Prepare 阶段,降低协议成本
  2. 系统故障容忍度高,恢复简单

6. 农信网数据一致性方案

1. 电商业务

公司的支付部门,通过接入其它第三方支付系统来提供支付服务给业务部门,支付服务是一个基于 Dubbo 的 RPC 服务。

对于业务部门来说,电商部门的订单支付,需要调用

  1. 支付平台的支付接口来处理订单;
  2. 同时需要调用积分中心的接口,按照业务规则,给用户增加积分。

从业务规则上需要同时保证业务数据的实时性和一致性,也就是支付成功必须加积分。

我们采用的方式是同步调用,首先处理本地事务业务。考虑到积分业务比较单一且业务影响低于支付,由积分平台提供增加与回撤接口。

具体的流程是先调用积分平台增加用户积分,再调用支付平台进行支付处理,如果处理失败,catch 方法调用积分平台的回撤方法,将本次处理的积分订单回撤。

5bd0002b671c5f212f4

2. 用户信息变更

公司的用户信息,统一由用户中心维护,而用户信息的变更需要同步给各业务子系统,业务子系统再根据变更内容,处理各自业务。用户中心作为 MQ 的 producer,添加通知给 MQ。APP Server 订阅该消息,同步本地数据信息,再处理相关业务比如 APP 退出下线等。

我们采用异步消息通知机制,目前主要使用 ActiveMQ,基于 Virtual Topic 的订阅方式,保证单个业务集群订阅的单次消费。

5ad0007faf7a576a640

总结

分布式服务对衍生的配套系统要求比较多,特别是我们基于消息、日志的最终一致性方案,需要考虑消息的积压、消费情况、监控、报警等。

参考资料

  • Base: An Acid Alternative (eBay 方案)

In partitioned databases, trading some consistency for availability can lead to dramatic improvements in scalability.

英文版 : http://queue.acm.org/detail.cfm?id=1394128 

中文版: http://article.yeeyan.org/view/167444/125572

 

感谢李玉福、余昭辉、蘑菇街七公提供方案,其他多位群成员对本文内容亦有贡献。对于上述方案表述不完善之处,欢迎读者留言指出。

from:http://www.cnblogs.com/soundcode/p/5590710.html

一致性Hash算法(Java实现)

一致性Hash算法

关于一致性Hash算法,在我之前的博文中已经有多次提到了,MemCache超详细解读一文中”一致性Hash算法”部分,对于为什么要使用一致性Hash算法、一致性Hash算法的算法原理做了详细的解读。

算法的具体原理这里再次贴上:

先构造一个长度为232的整数环(这个环被称为一致性Hash环),根据节点名称的Hash值(其分布为[0, 232-1])将服务器节点放置在这个Hash环上,然后根据数据的Key值计算得到其Hash值(其分布也为[0, 232-1]),接着在Hash环上顺时针查找距离这个Key值的Hash值最近的服务器节点,完成Key到服务器的映射查找。

这种算法解决了普通余数Hash算法伸缩性差的问题,可以保证在上线、下线服务器的情况下尽量有多的请求命中原来路由到的服务器。

当然,万事不可能十全十美,一致性Hash算法比普通的余数Hash算法更具有伸缩性,但是同时其算法实现也更为复杂,本文就来研究一下,如何利用Java代码实现一致性Hash算法。在开始之前,先对一致性Hash算法中的几个核心问题进行一些探究。

 

数据结构的选取

一致性Hash算法最先要考虑的一个问题是:构造出一个长度为232的整数环,根据节点名称的Hash值将服务器节点放置在这个Hash环上。

那么,整数环应该使用何种数据结构,才能使得运行时的时间复杂度最低?首先说明一点,关于时间复杂度,常见的时间复杂度与时间效率的关系有如下的经验规则:

O(1) < O(log2N) < O(n) < O(N * log2N) < O(N2) < O(N3) < 2N < 3N < N!

一般来说,前四个效率比较高,中间两个差强人意,后三个比较差(只要N比较大,这个算法就动不了了)。OK,继续前面的话题,应该如何选取数据结构,我认为有以下几种可行的解决方案。

1、解决方案一:排序+List

我想到的第一种思路是:算出所有待加入数据结构的节点名称的Hash值放入一个数组中,然后使用某种排序算法将其从小到大进行排序,最后将排序后的数据放入List中,采用List而不是数组是为了结点的扩展考虑。

之后,待路由的结点,只需要在List中找到第一个Hash值比它大的服务器节点就可以了,比如服务器节点的Hash值是[0,2,4,6,8,10],带路由的结点是7,只需要找到第一个比7大的整数,也就是8,就是我们最终需要路由过去的服务器节点。

如果暂时不考虑前面的排序,那么这种解决方案的时间复杂度:

(1)最好的情况是第一次就找到,时间复杂度为O(1)

(2)最坏的情况是最后一次才找到,时间复杂度为O(N)

平均下来时间复杂度为O(0.5N+0.5),忽略首项系数和常数,时间复杂度为O(N)。

但是如果考虑到之前的排序,我在网上找了张图,提供了各种排序算法的时间复杂度:

看得出来,排序算法要么稳定但是时间复杂度高、要么时间复杂度低但不稳定,看起来最好的归并排序法的时间复杂度仍然有O(N * logN),稍微耗费性能了一些。

2、解决方案二:遍历+List

既然排序操作比较耗性能,那么能不能不排序?可以的,所以进一步的,有了第二种解决方案。

解决方案使用List不变,不过可以采用遍历的方式:

(1)服务器节点不排序,其Hash值全部直接放入一个List中

(2)带路由的节点,算出其Hash值,由于指明了”顺时针”,因此遍历List,比待路由的节点Hash值大的算出差值并记录,比待路由节点Hash值小的忽略

(3)算出所有的差值之后,最小的那个,就是最终需要路由过去的节点

在这个算法中,看一下时间复杂度:

1、最好情况是只有一个服务器节点的Hash值大于带路由结点的Hash值,其时间复杂度是O(N)+O(1)=O(N+1),忽略常数项,即O(N)

2、最坏情况是所有服务器节点的Hash值都大于带路由结点的Hash值,其时间复杂度是O(N)+O(N)=O(2N),忽略首项系数,即O(N)

所以,总的时间复杂度就是O(N)。其实算法还能更改进一些:给一个位置变量X,如果新的差值比原差值小,X替换为新的位置,否则X不变。这样遍历就减少了一轮,不过经过改进后的算法时间复杂度仍为O(N)。

总而言之,这个解决方案和解决方案一相比,总体来看,似乎更好了一些。

3、解决方案三:二叉查找树

抛开List这种数据结构,另一种数据结构则是使用二叉查找树。对于树不是很清楚的朋友可以简单看一下这篇文章树形结构

当然我们不能简单地使用二叉查找树,因为可能出现不平衡的情况。平衡二叉查找树有AVL树、红黑树等,这里使用红黑树,选用红黑树的原因有两点:

1、红黑树主要的作用是用于存储有序的数据,这其实和第一种解决方案的思路又不谋而合了,但是它的效率非常高

2、JDK里面提供了红黑树的代码实现TreeMap和TreeSet

另外,以TreeMap为例,TreeMap本身提供了一个tailMap(K fromKey)方法,支持从红黑树中查找比fromKey大的值的集合,但并不需要遍历整个数据结构。

使用红黑树,可以使得查找的时间复杂度降低为O(logN),比上面两种解决方案,效率大大提升。

为了验证这个说法,我做了一次测试,从大量数据中查找第一个大于其中间值的那个数据,比如10000数据就找第一个大于5000的数据(模拟平均的情况)。看一下O(N)时间复杂度和O(logN)时间复杂度运行效率的对比:

50000 100000 500000 1000000 4000000
ArrayList 1ms 1ms 4ms 4ms 5ms
LinkedList 4ms 7ms 11ms 13ms 17ms
TreeMap 0ms 0ms 0ms 0ms 0ms

因为再大就内存溢出了,所以只测试到4000000数据。可以看到,数据查找的效率,TreeMap是完胜的,其实再增大数据测试也是一样的,红黑树的数据结构决定了任何一个大于N的最小数据,它都只需要几次至几十次查找就可以查到。

当然,明确一点,有利必有弊,根据我另外一次测试得到的结论是,为了维护红黑树,数据插入效率TreeMap在三种数据结构里面是最差的,且插入要慢上5~10倍

 

Hash值重新计算

服务器节点我们肯定用字符串来表示,比如”192.168.1.1″、”192.168.1.2″,根据字符串得到其Hash值,那么另外一个重要的问题就是Hash值要重新计算,这个问题是我在测试String的hashCode()方法的时候发现的,不妨来看一下为什么要重新计算Hash值:

/**
 * String的hashCode()方法运算结果查看
 * @author 五月的仓颉 http://www.cnblogs.com/xrq730/
 *
 */
public class StringHashCodeTest
{
    public static void main(String[] args)
    {
        System.out.println("192.168.0.0:111的哈希值:" + "192.168.0.0:1111".hashCode());
        System.out.println("192.168.0.1:111的哈希值:" + "192.168.0.1:1111".hashCode());
        System.out.println("192.168.0.2:111的哈希值:" + "192.168.0.2:1111".hashCode());
        System.out.println("192.168.0.3:111的哈希值:" + "192.168.0.3:1111".hashCode());
        System.out.println("192.168.0.4:111的哈希值:" + "192.168.0.4:1111".hashCode());
    }
}

我们在做集群的时候,集群点的IP以这种连续的形式存在是很正常的。看一下运行结果为:

192.168.0.0:111的哈希值:1845870087
192.168.0.1:111的哈希值:1874499238
192.168.0.2:111的哈希值:1903128389
192.168.0.3:111的哈希值:1931757540
192.168.0.4:111的哈希值:1960386691

这个就问题大了,[0,232-1]的区间之中,5个HashCode值却只分布在这么小小的一个区间,什么概念?[0,232-1]中有4294967296个数字,而我们的区间只有114516604,从概率学上讲这将导致97%待路由的服务器都被路由到”192.168.0.0″这个集群点上,简直是糟糕透了!

另外还有一个不好的地方:规定的区间是非负数,String的hashCode()方法却会产生负数(不信用”192.168.1.0:1111″试试看就知道了)。不过这个问题好解决,取绝对值就是一种解决的办法。

综上,String重写的hashCode()方法在一致性Hash算法中没有任何实用价值,得找个算法重新计算HashCode。这种重新计算Hash值的算法有很多,比如CRC32_HASH、FNV1_32_HASH、KETAMA_HASH等,其中KETAMA_HASH是默认的MemCache推荐的一致性Hash算法,用别的Hash算法也可以,比如FNV1_32_HASH算法的计算效率就会高一些。

一致性Hash算法实现版本1:不带虚拟节点

使用一致性Hash算法,尽管增强了系统的伸缩性,但是也有可能导致负载分布不均匀,解决办法就是使用虚拟节点代替真实节点,第一个代码版本,先来个简单的,不带虚拟节点。

下面来看一下不带虚拟节点的一致性Hash算法的Java代码实现:

 1 /**
 2  * 不带虚拟节点的一致性Hash算法
 3  * @author 五月的仓颉http://www.cnblogs.com/xrq730/
 4  *
 5  */
 6 public class ConsistentHashingWithoutVirtualNode
 7 {
 8     /**
 9      * 待添加入Hash环的服务器列表
10      */
11     private static String[] servers = {"192.168.0.0:111", "192.168.0.1:111", "192.168.0.2:111",
12             "192.168.0.3:111", "192.168.0.4:111"};
13     
14     /**
15      * key表示服务器的hash值,value表示服务器的名称
16      */
17     private static SortedMap<Integer, String> sortedMap = 
18             new TreeMap<Integer, String>();
19     
20     /**
21      * 程序初始化,将所有的服务器放入sortedMap中
22      */
23     static
24     {
25         for (int i = 0; i < servers.length; i++)
26         {
27             int hash = getHash(servers[i]);
28             System.out.println("[" + servers[i] + "]加入集合中, 其Hash值为" + hash);
29             sortedMap.put(hash, servers[i]);
30         }
31         System.out.println();
32     }
33     
34     /**
35      * 使用FNV1_32_HASH算法计算服务器的Hash值,这里不使用重写hashCode的方法,最终效果没区别 
36      */
37     private static int getHash(String str)
38     {
39         final int p = 16777619;
40         int hash = (int)2166136261L;
41         for (int i = 0; i < str.length(); i++)
42             hash = (hash ^ str.charAt(i)) * p;
43         hash += hash << 13;
44         hash ^= hash >> 7;
45         hash += hash << 3;
46         hash ^= hash >> 17;
47         hash += hash << 5;
48         
49         // 如果算出来的值为负数则取其绝对值
50         if (hash < 0)
51             hash = Math.abs(hash);
52         return hash;
53     }
54     
55     /**
56      * 得到应当路由到的结点
57      */
58     private static String getServer(String node)
59     {
60         // 得到带路由的结点的Hash值
61         int hash = getHash(node);
62         // 得到大于该Hash值的所有Map
63         SortedMap<Integer, String> subMap = 
64                 sortedMap.tailMap(hash);
65         // 第一个Key就是顺时针过去离node最近的那个结点
66         Integer i = subMap.firstKey();
67         // 返回对应的服务器名称
68         return subMap.get(i);
69     }
70     
71     public static void main(String[] args)
72     {
73         String[] nodes = {"127.0.0.1:1111", "221.226.0.1:2222", "10.211.0.1:3333"};
74         for (int i = 0; i < nodes.length; i++)
75             System.out.println("[" + nodes[i] + "]的hash值为" + 
76                     getHash(nodes[i]) + ", 被路由到结点[" + getServer(nodes[i]) + "]");
77     }
78 }

可以运行一下看一下结果:

[192.168.0.0:111]加入集合中, 其Hash值为575774686
[192.168.0.1:111]加入集合中, 其Hash值为8518713
[192.168.0.2:111]加入集合中, 其Hash值为1361847097
[192.168.0.3:111]加入集合中, 其Hash值为1171828661
[192.168.0.4:111]加入集合中, 其Hash值为1764547046

[127.0.0.1:1111]的hash值为380278925, 被路由到结点[192.168.0.0:111]
[221.226.0.1:2222]的hash值为1493545632, 被路由到结点[192.168.0.4:111]
[10.211.0.1:3333]的hash值为1393836017, 被路由到结点[192.168.0.4:111]

看到经过FNV1_32_HASH算法重新计算过后的Hash值,就比原来String的hashCode()方法好多了。从运行结果来看,也没有问题,三个点路由到的都是顺时针离他们Hash值最近的那台服务器上。

使用虚拟节点来改善一致性Hash算法

上面的一致性Hash算法实现,可以在很大程度上解决很多分布式环境下不好的路由算法导致系统伸缩性差的问题,但是会带来另外一个问题:负载不均。

比如说有Hash环上有A、B、C三个服务器节点,分别有100个请求会被路由到相应服务器上。现在在A与B之间增加了一个节点D,这导致了原来会路由到B上的部分节点被路由到了D上,这样A、C上被路由到的请求明显多于B、D上的,原来三个服务器节点上均衡的负载被打破了。某种程度上来说,这失去了负载均衡的意义,因为负载均衡的目的本身就是为了使得目标服务器均分所有的请求

解决这个问题的办法是引入虚拟节点,其工作原理是:将一个物理节点拆分为多个虚拟节点,并且同一个物理节点的虚拟节点尽量均匀分布在Hash环上。采取这样的方式,就可以有效地解决增加或减少节点时候的负载不均衡的问题。

至于一个物理节点应该拆分为多少虚拟节点,下面可以先看一张图:

横轴表示需要为每台福利服务器扩展的虚拟节点倍数,纵轴表示的是实际物理服务器数。可以看出,物理服务器很少,需要更大的虚拟节点;反之物理服务器比较多,虚拟节点就可以少一些。比如有10台物理服务器,那么差不多需要为每台服务器增加100~200个虚拟节点才可以达到真正的负载均衡。

一致性Hash算法实现版本2:带虚拟节点

在理解了使用虚拟节点来改善一致性Hash算法的理论基础之后,就可以尝试开发代码了。编程方面需要考虑的问题是:

1、一个真实结点如何对应成为多个虚拟节点?

2、虚拟节点找到后如何还原为真实结点?

这两个问题其实有很多解决办法,我这里使用了一种简单的办法,给每个真实结点后面根据虚拟节点加上后缀再取Hash值,比如”192.168.0.0:111″就把它变成”192.168.0.0:111&&VN0″到”192.168.0.0:111&&VN4″,VN就是Virtual Node的缩写,还原的时候只需要从头截取字符串到”&&”的位置就可以了。

下面来看一下带虚拟节点的一致性Hash算法的Java代码实现:

 1 /**
 2  * 带虚拟节点的一致性Hash算法
 3  * @author 五月的仓颉 http://www.cnblogs.com/xrq730/
 4  */
 5 public class ConsistentHashingWithVirtualNode
 6 {
 7     /**
 8      * 待添加入Hash环的服务器列表
 9      */
10     private static String[] servers = {"192.168.0.0:111", "192.168.0.1:111", "192.168.0.2:111",
11             "192.168.0.3:111", "192.168.0.4:111"};
12     
13     /**
14      * 真实结点列表,考虑到服务器上线、下线的场景,即添加、删除的场景会比较频繁,这里使用LinkedList会更好
15      */
16     private static List<String> realNodes = new LinkedList<String>();
17     
18     /**
19      * 虚拟节点,key表示虚拟节点的hash值,value表示虚拟节点的名称
20      */
21     private static SortedMap<Integer, String> virtualNodes = 
22             new TreeMap<Integer, String>();
23     
24     /**
25      * 虚拟节点的数目,这里写死,为了演示需要,一个真实结点对应5个虚拟节点
26      */
27     private static final int VIRTUAL_NODES = 5;
28     
29     static
30     {
31         // 先把原始的服务器添加到真实结点列表中
32         for (int i = 0; i < servers.length; i++)
33             realNodes.add(servers[i]);
34         
35         // 再添加虚拟节点,遍历LinkedList使用foreach循环效率会比较高
36         for (String str : realNodes)
37         {
38             for (int i = 0; i < VIRTUAL_NODES; i++)
39             {
40                 String virtualNodeName = str + "&&VN" + String.valueOf(i);
41                 int hash = getHash(virtualNodeName);
42                 System.out.println("虚拟节点[" + virtualNodeName + "]被添加, hash值为" + hash);
43                 virtualNodes.put(hash, virtualNodeName);
44             }
45         }
46         System.out.println();
47     }
48     
49     /**
50      * 使用FNV1_32_HASH算法计算服务器的Hash值,这里不使用重写hashCode的方法,最终效果没区别 
51      */
52     private static int getHash(String str)
53     {
54         final int p = 16777619;
55         int hash = (int)2166136261L;
56         for (int i = 0; i < str.length(); i++)
57             hash = (hash ^ str.charAt(i)) * p;
58         hash += hash << 13;
59         hash ^= hash >> 7;
60         hash += hash << 3;
61         hash ^= hash >> 17;
62         hash += hash << 5;
63         
64         // 如果算出来的值为负数则取其绝对值
65         if (hash < 0)
66             hash = Math.abs(hash);
67         return hash;
68     }
69     
70     /**
71      * 得到应当路由到的结点
72      */
73     private static String getServer(String node)
74     {
75         // 得到带路由的结点的Hash值
76         int hash = getHash(node);
77         // 得到大于该Hash值的所有Map
78         SortedMap<Integer, String> subMap = 
79                 virtualNodes.tailMap(hash);
80         // 第一个Key就是顺时针过去离node最近的那个结点
81         Integer i = subMap.firstKey();
82         // 返回对应的虚拟节点名称,这里字符串稍微截取一下
83         String virtualNode = subMap.get(i);
84         return virtualNode.substring(0, virtualNode.indexOf("&&"));
85     }
86     
87     public static void main(String[] args)
88     {
89         String[] nodes = {"127.0.0.1:1111", "221.226.0.1:2222", "10.211.0.1:3333"};
90         for (int i = 0; i < nodes.length; i++)
91             System.out.println("[" + nodes[i] + "]的hash值为" + 
92                     getHash(nodes[i]) + ", 被路由到结点[" + getServer(nodes[i]) + "]");
93     }
94 }

关注一下运行结果:

虚拟节点[192.168.0.0:111&&VN0]被添加, hash值为1686427075
虚拟节点[192.168.0.0:111&&VN1]被添加, hash值为354859081
虚拟节点[192.168.0.0:111&&VN2]被添加, hash值为1306497370
虚拟节点[192.168.0.0:111&&VN3]被添加, hash值为817889914
虚拟节点[192.168.0.0:111&&VN4]被添加, hash值为396663629
虚拟节点[192.168.0.1:111&&VN0]被添加, hash值为1032739288
虚拟节点[192.168.0.1:111&&VN1]被添加, hash值为707592309
虚拟节点[192.168.0.1:111&&VN2]被添加, hash值为302114528
虚拟节点[192.168.0.1:111&&VN3]被添加, hash值为36526861
虚拟节点[192.168.0.1:111&&VN4]被添加, hash值为848442551
虚拟节点[192.168.0.2:111&&VN0]被添加, hash值为1452694222
虚拟节点[192.168.0.2:111&&VN1]被添加, hash值为2023612840
虚拟节点[192.168.0.2:111&&VN2]被添加, hash值为697907480
虚拟节点[192.168.0.2:111&&VN3]被添加, hash值为790847074
虚拟节点[192.168.0.2:111&&VN4]被添加, hash值为2010506136
虚拟节点[192.168.0.3:111&&VN0]被添加, hash值为891084251
虚拟节点[192.168.0.3:111&&VN1]被添加, hash值为1725031739
虚拟节点[192.168.0.3:111&&VN2]被添加, hash值为1127720370
虚拟节点[192.168.0.3:111&&VN3]被添加, hash值为676720500
虚拟节点[192.168.0.3:111&&VN4]被添加, hash值为2050578780
虚拟节点[192.168.0.4:111&&VN0]被添加, hash值为586921010
虚拟节点[192.168.0.4:111&&VN1]被添加, hash值为184078390
虚拟节点[192.168.0.4:111&&VN2]被添加, hash值为1331645117
虚拟节点[192.168.0.4:111&&VN3]被添加, hash值为918790803
虚拟节点[192.168.0.4:111&&VN4]被添加, hash值为1232193678

[127.0.0.1:1111]的hash值为380278925, 被路由到结点[192.168.0.0:111]
[221.226.0.1:2222]的hash值为1493545632, 被路由到结点[192.168.0.0:111]
[10.211.0.1:3333]的hash值为1393836017, 被路由到结点[192.168.0.2:111]

从代码运行结果看,每个点路由到的服务器都是Hash值顺时针离它最近的那个服务器节点,没有任何问题。

通过采取虚拟节点的方法,一个真实结点不再固定在Hash换上的某个点,而是大量地分布在整个Hash环上,这样即使上线、下线服务器,也不会造成整体的负载不均衡。

后记

在写本文的时候,很多知识我也是边写边学,难免有很多写得不好、理解得不透彻的地方,而且代码整体也比较糙,未有考虑到可能的各种情况。抛砖引玉,一方面,写得不对的地方,还望网友朋友们指正;另一方面,后续我也将通过自己的工作、学习不断完善上面的代码。

from:http://www.cnblogs.com/xrq730/p/5186728.html

系统负载能力浅析

一. 衡量指标

用什么来衡量一个系统的负载能力呢?有一个概念叫做每秒请求数(Requests per second),指的是每秒能够成功处理请求的数目。比如说,你可以配置tomcat服务器的maxConnection为无限大,但是受限于服务器系统或者硬件限制,很多请求是不会在一定的时间内得到响应的,这并不作为一个成功的请求,其中成功得到响应的请求数即为每秒请求数,反应出系统的负载能力。

通常的,对于一个系统,增加并发用户数量时每秒请求数量也会增加。然而,我们最终会达到这样一个点,此时并发用户数量开始“压倒”服务器。如果继续增加并发用户数量,每秒请求数量开始下降,而反应时间则会增加。这个并发用户数量开始“压倒”服务器的临界点非常重要,此时的并发用户数量可以认为是当前系统的最大负载能力。

二. 相关因素

一般的,和系统并发访问量相关的几个因素如下:

  • 带宽
  • 硬件配置
  • 系统配置
  • 应用服务器配置
  • 程序逻辑
  • 系统架构

其中,带宽和硬件配置是决定系统负载能力的决定性因素。这些只能依靠扩展和升级提高。我们需要重点关注的是在一定带宽和硬件配置的基础上,怎么使系统的负载能力达到最大。

2.1 带宽

毋庸置疑,带宽是决定系统负载能力的一个至关重要的因素,就好比水管一样,细的水管同一时间通过的水量自然就少(这个比喻解释带宽可能不是特别合适)。一个系统的带宽首先就决定了这个系统的负载能力,其单位为Mbps,表示数据的发送速度。

2.2 硬件配置

系统部署所在的服务器的硬件决定了一个系统的最大负载能力,也是上限。一般说来,以下几个配置起着关键作用:

  • cpu频率/核数:cpu频率关系着cpu的运算速度,核数则影响线程调度、资源分配的效率。
  • 内存大小以及速度:内存越大,那么可以在内存中运行的数据也就越大,速度自然而然就快;内存的速度从原来的几百hz到现在几千hz,决定了数据读取存储的速度。
  • 硬盘速度:传统的硬盘是使用磁头进行寻址的,io速度比较慢,使用了SSD的硬盘,其寻址速度大大较快。

很多系统的架构设计、系统优化,最终都会加上这么一句:使用ssd存储解决了这些问题。

可见,硬件配置是决定一个系统的负载能力的最关键因素。

2.3 系统配置

一般来说,目前后端系统都是部署在Linux主机上的。所以抛开win系列不谈,对于Linux系统来说一般有以下配置关系着系统的负载能力。

  • 文件描述符数限制:Linux中所有东西都是文件,一个socket就对应着一个文件描述符,因此系统配置的最大打开文件数以及单个进程能够打开的最大文件数就决定了socket的数目上限。
  • 进程/线程数限制: 对于apache使用的prefork等多进程模式,其负载能力由进程数目所限制。对tomcat多线程模式则由线程数所限制。
  • tcp内核参数:网络应用的底层自然离不开tcp/ip,Linux内核有一些与此相关的配置也决定了系统的负载能力。

2.3.1 文件描述符数限制

  • 系统最大打开文件描述符数:/proc/sys/fs/file-max中保存了这个数目,修改此值
    1
    2
    3
    4
    临时性:
     echo 1000000 > /proc/sys/fs/file-max
    永久性:
    在/etc/sysctl.conf中设置 fs.file-max = 1000000
  • 进程最大打开文件描述符数:这个是配单个进程能够打开的最大文件数目。可以通过ulimit -n查看/修改。如果想要永久修改,则需要修改/etc/security/limits.conf中的nofile。

通过读取/proc/sys/fs/file-nr可以看到当前使用的文件描述符总数。另外,对于文件描述符的配置,需要注意以下几点:

  • 所有进程打开的文件描述符数不能超过/proc/sys/fs/file-max
  • 单个进程打开的文件描述符数不能超过user limit中nofile的soft limit
  • nofile的soft limit不能超过其hard limit
  • nofile的hard limit不能超过/proc/sys/fs/nr_open

2.3.2 进程/线程数限制

  • 进程数限制:ulimit -u可以查看/修改单个用户能够打开的最大进程数。/etc/security/limits.conf中的noproc则是系统的最大进程数。
  • 线程数限制
    • 可以通过/proc/sys/kernel/threads-max查看系统总共可以打开的最大线程数。
    • 单个进程的最大线程数和PTHREAD_THREADS_MAX有关,此限制可以在/usr/include/bits/local_lim.h中查看,但是如果想要修改的话,需要重新编译。
    • 这里需要提到一点的是,Linux内核2.4的线程实现方式为linux threads,是轻量级进程,都会首先创建一个管理线程,线程数目的大小是受PTHREAD_THREADS_MAX影响的。但Linux2.6内核的线程实现方式为NPTL,是一个改进的LWP实现,最大一个区别就是,线程公用进程的pid(tgid),线程数目大小只受制于资源。
    • 线程数的大小还受线程栈大小的制约:使用ulimit -s可以查看/修改线程栈的大小,即每开启一个新的线程需要分配给此线程的一部分内存。减小此值可以增加可以打开的线程数目。

2.3.3 tcp内核参数

在一台服务器CPU和内存资源额定有限的情况下,最大的压榨服务器的性能,是最终的目的。在节省成本的情况下,可以考虑修改Linux的内核TCP/IP参数,来最大的压榨服务器的性能。如果通过修改内核参数也无法解决的负载问题,也只能考虑升级服务器了,这是硬件所限,没有办法的事。

1
netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'

使用上面的命令,可以得到当前系统的各个状态的网络连接的数目。如下:

1
2
3
4
5
6
7
LAST_ACK 13
SYN_RECV 468
ESTABLISHED 90
FIN_WAIT1 259
FIN_WAIT2 40
CLOSING 34
TIME_WAIT 28322

这里,TIME_WAIT的连接数是需要注意的一点。此值过高会占用大量连接,影响系统的负载能力。需要调整参数,以尽快的释放time_wait连接。

一般tcp相关的内核参数在/etc/sysctl.conf文件中。为了能够尽快释放time_wait状态的连接,可以做以下配置:

  • net.ipv4.tcp_syncookies = 1 //表示开启SYN Cookies。当出现SYN等待队列溢出时,启用cookies来处理,可防范少量SYN攻击,默认为0,表示关闭;
  • net.ipv4.tcp_tw_reuse = 1 //表示开启重用。允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭;
  • net.ipv4.tcp_tw_recycle = 1 //表示开启TCP连接中TIME-WAIT sockets的快速回收,默认为0,表示关闭;
  • net.ipv4.tcp_fin_timeout = 30 //修改系統默认的 TIMEOUT 时间。

这里需要注意的一点就是当打开了tcp_tw_recycle,就会检查时间戳,移动环境下的发来的包的时间戳有些时候是乱跳的,会把带了“倒退”的时间戳的包当作是“recycle的tw连接的重传数据,不是新的请求”,于是丢掉不回包,造成大量丢包。另外,当前面有LVS,并且采用的是NAT机制时,开启tcp_tw_recycle会造成一些异常,可见:http://www.pagefault.info/?p=416。如果这种情况下仍然需要开启此选项,那么可以考虑设置net.ipv4.tcp_timestamps=0,忽略掉报文的时间戳即可。

此外,还可以通过优化tcp/ip的可使用端口的范围,进一步提升负载能力。,如下:

  • net.ipv4.tcp_keepalive_time = 1200 //表示当keepalive起用的时候,TCP发送keepalive消息的频度。缺省是2小时,改为20分钟。
  • net.ipv4.ip_local_port_range = 10000 65000 //表示用于向外连接的端口范围。缺省情况下很小:32768到61000,改为10000到65000。(注意:这里不要将最低值设的太低,否则可能会占用掉正常的端口!)
  • net.ipv4.tcp_max_syn_backlog = 8192 //表示SYN队列的长度,默认为1024,加大队列长度为8192,可以容纳更多等待连接的网络连接数。
  • net.ipv4.tcp_max_tw_buckets = 5000 //表示系统同时保持TIME_WAIT的最大数量,如果超过这个数字,TIME_WAIT将立刻被清除并打印警告信息。默认为180000,改为5000。对于Apache、Nginx等服务器,上几行的参数可以很好地减少TIME_WAIT套接字数量,但是对于Squid,效果却不大。此项参数可以控制TIME_WAIT的最大数量,避免Squid服务器被大量的TIME_WAIT拖死。

2.4 应用服务器配置

说到应用服务器配置,这里需要提到应用服务器的几种工作模式,也叫并发策略。

  • multi process:多进程方式,一个进程处理一个请求。
  • prefork:类似于多进程的方式,但是会预先fork出一些进程供后续使用,是一种进程池的理念。
  • worker:一个线程对应一个请求,相比多进程的方式,消耗资源变少,但同时一个线程的崩溃会引起整个进程的崩溃,稳定性不如多进程。
  • master/worker:采用的是非阻塞IO的方式,只有两种进程:worker和master,master负责worker进程的创建、管理等,worker进程采用基于事件驱动的多路复用IO处理请求。mater进程只需要一个,woker进程根据cpu核数设置数目。

前三者是传统应用服务器apache和tomcat采用的方式,最后一种是nginx采用的方式。当然这里需要注意的是应用服务器和nginx这种做反向代理服务器(暂且忽略nginx+cgi做应用服务器的功能)的区别。应用服务器是需要处理应用逻辑的,有时候是耗cup资源的;而反向代理主要用作IO,是IO密集型的应用。使用事件驱动的这种网络模型,比较适合IO密集型应用,而并不适合CPU密集型应用。对于后者,多进程/线程则是一个更好地选择。

当然,由于nginx采用的基于事件驱动的多路IO复用的模型,其作为反向代理服务器时,可支持的并发是非常大的。淘宝tengine团队曾有一个测试结果是“24G内存机器上,处理并发请求可达200万”。

2.4.1 nginx/tengine

ngixn是目前使用最广泛的反向代理软件,而tengine是阿里开源的一个加强版nginx,其基本实现了nginx收费版本的一些功能,如:主动健康检查、session sticky等。对于nginx的配置,需要注意的有这么几点:

  • worker数目要和cpu(核)的数目相适应
  • keepalive timout要设置适当
  • worker_rlimit_nofile最大文件描述符要增大
  • upstream可以使用http 1.1的keepalive

典型配置可见:https://github.com/superhj1987/awesome-config/blob/master/nginx/nginx.conf

2.4.2 tomcat

tomcat的关键配置总体上有两大块:jvm参数配置和connector参数配置。

  • jvm参数配置:
    • 堆的最小值:Xms
    • 堆的最大值:Xmx
    • 新生代大小: Xmn
    • 永久代大小: XX:PermSize:
    • 永久代最大大小: XX:MaxPermSize:
    • 栈大小:-Xss或-XX:ThreadStackSize

    这里对于栈大小有一点需要注意的是:在Linux x64上ThreadStackSize的默认值就是1024KB,给Java线程创建栈会用这个参数指定的大小。如果把-Xss或者-XX:ThreadStackSize设为0,就是使用“系统默认值”。而在Linux x64上HotSpot VM给Java栈定义的“系统默认”大小也是1MB。所以普通Java线程的默认栈大小怎样都是1MB。这里有一个需要注意的地方就是java的栈大小和之前提到过的操作系统的操作系统栈大小(ulimit -s):这个配置只影响进程的初始线程;后续用pthread_create创建的线程都可以指定栈大小。HotSpot VM为了能精确控制Java线程的栈大小,特意不使用进程的初始线程(primordial thread)作为Java线程。

    其他还要根据业务场景,选择使用那种垃圾回收器,回收的策略。另外,当需要保留GC信息时,也需要做一些设置。

    典型配置可见:https://github.com/superhj1987/awesome-config/blob/master/tomcat/java_opts.conf

  • connector参数配置
    • protocol: 有三个选项:bio;nio;apr。建议使用apr选项,性能为最高。
    • connectionTimeout:连接的超时时间
    • maxThreads:最大线程数,此值限制了bio的最大连接数
    • minSpareThreads: 最大空闲线程数
    • acceptCount:可以接受的最大请求数目(未能得到处理的请求排队)
    • maxConnection: 使用nio或者apr时,最大连接数受此值影响。

    典型配置可见:https://github.com/superhj1987/awesome-config/blob/master/tomcat/connector.conf

    一般的当一个进程有500个线程在跑的话,那性能已经是很低很低了。Tomcat默认配置的最大请求数是150。当某个应用拥有250个以上并发的时候,应考虑应用服务器的集群。

    另外,并非是无限调大maxTreads和maxConnection就能无限调高并发能力的。线程越多,那么cpu花费在线程调度上的时间越多,同时,内存消耗也就越大,那么就极大影响处理用户的请求。受限于硬件资源,并发值是需要设置合适的值的。

对于tomcat这里有一个争论就是:使用大内存tomcat好还是多个小的tomcat集群好?(针对64位服务器以及tomcat来说)

其实,这个要根据业务场景区别对待的。通常,大内存tomcat有以下问题:

  • 一旦发生full gc,那么会非常耗时
  • 一旦gc,dump出的堆快照太大,无法分析

因此,如果可以保证一定程度上程序的对象大部分都是朝生夕死的,老年代不会发生gc,那么使用大内存tomcat也是可以的。但是在伸缩性和高可用却比不上使用小内存(相对来说)tomcat集群。

使用小内存tomcat集群则有以下优势:

  • 可以根据系统的负载调整tc的数量,以达到资源的最大利用率,
  • 可以防止单点故障。

2.4.3 数据库

mysql

mysql是目前最常用的关系型数据库,支持复杂的查询。但是其负载能力一般,很多时候一个系统的瓶颈就发生在mysql这一点,当然有时候也和sql语句的效率有关。比如,牵扯到联表的查询一般说来效率是不会太高的。

影响数据库性能的因素一般有以下几点:

  • 硬件配置:这个无需多说
  • 数据库设置:max_connection的一些配置会影响数据库的连接数
  • 数据表的设计:使用冗余字段避免联表查询;使用索引提高查询效率
  • 查询语句是否合理:这个牵扯到的是个人的编码素质。比如,查询符合某个条件的记录,我见过有人把记录全部查出来,再去逐条对比
  • 引擎的选择:myisam和innodb两者的适用场景不同,不存在绝对的优劣

抛开以上因素,当数据量单表突破千万甚至百万时(和具体的数据有关),需要对mysql数据库进行优化,一种常见的方案就是分表:

  • 垂直分表:在列维度的拆分
  • 水平分表:行维度的拆分

此外,对于数据库,可以使用读写分离的方式提高性能,尤其是对那种读频率远大于写频率的业务场景。这里一般采用master/slave的方式实现读写分离,前面用程序控制或者加一个proxy层。可以选择使用MySQL Proxy,编写lua脚本来实现基于proxy的mysql读写分离;也可以通过程序来控制,根据不同的sql语句选择相应的数据库来操作,这个也是笔者公司目前在用的方案。由于此方案和业务强绑定,是很难有一个通用的方案的,其中比较成熟的是阿里的TDDL,但是由于未全部开源且对其他组件有依赖性,不推荐使用。

现在很多大的公司对这些分表、主从分离、分布式都基于mysql做了自己的二次开发,形成了自己公司的一套分布式数据库系统。比如阿里的Cobar、网易的DDB、360的Atlas等。当然,很多大公司也研发了自己的mysql分支,比较出名的就是姜承尧带领研发的InNoSQL

redis

当然,对于系统中并发很高并且访问很频繁的数据,关系型数据库还是不能妥妥应对。这时候就需要缓存数据库出马以隔离对mysql的访问,防止mysql崩溃。

其中,redis是目前用的比较多的缓存数据库(当然,也有直接把redis当做数据库使用的)。redis是单线程基于内存的数据库,读写性能远远超过mysql。一般情况下,对redis做读写分离主从同步就可以应对大部分场景的应用。但是这样的方案缺少ha,尤其对于分布式应用,是不可接受的。目前,redis集群的实现方案有以下几个:

  • redis cluster:这是一种去中心化的方案,是redis的官方实现。是一种非常“重”的方案,已经不是Redis单实例的“简单、可依赖”了。目前应用案例还很少,貌似国内的芒果台用了,结局不知道如何。
  • twemproxy:这是twitter开源的redis和memcached的proxy方案。比较成熟,目前的应用案例比较多,但也有一些缺陷,尤其在运维方面。比如无法平滑的扩容/缩容,运维不友好等。
  • codis: 这个是豌豆荚开源的redis proxy方案,能够兼容twemproxy,并且对其做了很多改进。由豌豆荚于2014年11月开源,基于Go和C开发。现已广泛用于豌豆荚的各种Redis业务场景。现在比Twemproxy快近100%。目前据我所知除了豌豆荚之外,hulu也在使用这套方案。当然,其升级项目reborndb号称比codis还要厉害。

2.5 系统架构

影响性能的系统架构一般会有这几方面:

  • 负载均衡
  • 同步 or 异步
  • 28原则

2.5.1 负载均衡

负载均衡在服务端领域中是一个很关键的技术。可以分为以下两种:

  • 硬件负载均衡
  • 软件负载均衡

其中,硬件负载均衡的性能无疑是最优的,其中以F5为代表。但是,与高性能并存的是其成本的昂贵。所以对于很多初创公司来说,一般是选用软件负载均衡的方案。

软件负载均衡中又可以分为四层负载均衡和七层负载均衡。 上文在应用服务器配置部分讲了nginx的反向代理功能即七层的一种成熟解决方案,主要针对的是七层http协议(虽然最新的发布版本已经支持四层负载均衡)。对于四层负载均衡,目前应用最广泛的是lvs。其是阿里的章文嵩博士带领的团队所研发的一款linux下的负载均衡软件,本质上是基于iptables实现的。分为三种工作模式:

  • NAT: 修改数据包destination ip,in和out都要经过lvs。
  • DR:修改数据包mac地址,lvs和realserver需要在一个vlan。
  • IP TUUNEL:修改数据包destination ip和源ip,realserver需要支持ip tunnel协议。lvs和realserver不需要在一个vlan。

三种模式各有优缺点,目前还有阿里开源的一个FULL NAT是在NAT原来的DNAT上加入了SNAT的功能。

此外,haproxy也是一款常用的负载均衡软件。但限于对此使用较少,在此不做讲述。

2.5.2 同步 or 异步

对于一个系统,很多业务需要面对使用同步机制或者是异步机制的选择。比如,对于一篇帖子,一个用户对其分享后,需要记录用户的分享记录。如果你使用同步模式(分享的同时记录此行为),那么响应速度肯定会受到影响。而如果你考虑到分享过后,用户并不会立刻去查看自己的分享记录,牺牲这一点时效性,可以先完成分享的动作,然后异步记录此行为,会提高分享请求的响应速度(当然,这里可能会有事务准确性的问题)。有时候在某些业务逻辑上,在充分理解用户诉求的基础上,是可以牺牲某些特性来满足用户需求的。

这里值得一提的是,很多时候对于一个业务流程,是可以拆开划分为几个步骤的,然后有些步骤完全可以异步并发执行,能够极大提高处理速度。

2.5.3 28原则

对于一个系统,20%的功能会带来80%的流量。这就是28原则的意思,当然也是我自己的一种表述。因此在设计系统的时候,对于80%的功能,其面对的请求压力是很小的,是没有必要进行过度设计的。但是对于另外20%的功能则是需要设计再设计、reivew再review,能够做负载均衡就做负载均衡,能够缓存就缓存,能够做分布式就分布式,能够把流程拆开异步化就异步化。

当然,这个原则适用于生活中很多事物。

三. 一般架构

一般的Java后端系统应用架构如下图所示:LVS+Nginx+Tomcat+MySql/DDB+Redis/Codis

web-arch

其中,虚线部分是数据库层,采用的是主从模式。也可以使用redis cluster(codis等)以及mysql cluster(Cobar等)来替换。

from:http://www.rowkey.me/blog/2015/09/09/load-analysis/

Java中的纤程库 – Quasar

最近遇到的一个问题大概是微服务架构中经常会遇到的一个问题:

服务 A 是我们开发的系统,它的业务需要调用 BCD 等多个服务,这些服务是通过http的访问提供的。 问题是 BCD 这些服务都是第三方提供的,不能保证它们的响应时间,快的话十几毫秒,慢的话甚至1秒多,所以这些服务的Latency比较长。幸运地是这些服务都是集群部署的,容错率和并发支持都比较高,所以不担心它们的并发性能,唯一不爽的就是就是它们的Latency太高了。

简化的微服务架构简化的微服务架构

系统A会从Client接收Request, 每个Request的处理都需要多次调用B、C、D的服务,所以完成一个Request可能需要1到2秒的时间。为了让A能更好地支持并发数,系统中使用线程池处理这些Request。当然这是一个非常简化的模型,实际的业务处理比较复杂。

可以预见,因为系统B、C、D的延迟,导致整个业务处理都很慢,即使使用线程池,但是每个线程还是会阻塞在B、C、D的调用上,导致I/O阻塞了这些线程, CPU利用率相对来说不是那么高。

当然在测试的时候使用的是B、C、D的模拟器,没有预想到它们的响应是那么慢,因此测试数据的结果还不错,吞吐率还可以,但是在实际环境中问题就暴露出来了。

概述

最开始线程池设置的是200,然后用HttpUrlConnection作为http client发送请求到B、C、D。当然HttpUrlConnection也有一些坑,比如Persistent ConnectionsCaveats of HttpURLConnection,跳出坑后性能依然不行。

通过测试,如果B、C、D等服务延迟接近0毫秒,则HttpUrlConnection的吞吐率(线程池的大小为200)能到40000 requests/秒,但是随着第三方服务的响应时间变慢,它的吞吐率急剧下降,B、C、D的服务的延迟为100毫秒的时候,则HttpUrlConnection的吞吐率降到1800 requests/秒,而B、C、D的服务的延迟为100毫秒的时候HttpUrlConnection的吞吐率降到550 requests/秒。

增加http.maxConnections系统属性并不能显著增加吞吐率。

如果增加调用HttpUrlConnection的线程池的大小,比如增加到2000,性能会好一些,但是B、C、D的服务的延迟为500毫秒的时候,吞吐率为3800 requests/秒,延迟为1秒的时候,吞吐率为1900 requests/秒。

虽然线程池的增大能带来性能的提升,但是线程池也不能无限制的增大,因为每个线程都会占用一定的资源,而且随着线程的增多,线程之间的切换也更加的频繁,对CPU等资源也是一种浪费。

切换成netty(channel pool),与B、C、D通讯的性能还不错, latency为500ms的时候吞吐率能达到10000 requests/秒,通讯不成问题,问题是需要将业务代码改成异步的方式,异步地接收到这些response后在一个线程池中处理这些消息。

下面列出了一些常用的http client:

  • JDK’s URLConnection uses traditional thread-blocking I/O.
  • Apache HTTP Client uses traditional thread-blocking I/O with thread-pools.
  • Apache Async HTTP Client uses NIO.
  • Jersey is a ReST client/server framework; the client API can use several HTTP client backends including URLConnection and Apache HTTP Client.
  • OkHttp uses traditional thread-blocking I/O with thread-pools.
  • Retrofit turns your HTTP API into a Java interface and can use several HTTP client backends including Apache HTTP Client.
  • Grizzly is network framework with low-level HTTP support; it was using NIO but it switched to AIO .
  • Netty is a network framework with HTTP support (low-level), multi-transport, includes NIO and native (the latter uses epoll on Linux).
  • Jetty Async HTTP Client uses NIO.
  • Async HTTP Client wraps either Netty, Grizzly or JDK’s HTTP support.
  • clj-http wraps the Apache HTTP Client.
  • http-kit is an async subset of clj-http implemented partially in Java directly on top of NIO.
  • http async client wraps the Async HTTP Client for Java.

这个列表摘自 High-Concurrency HTTP Clients on the JVM,不止于此,这篇文章重点介绍基于java纤程库quasar的实现的http client库,并比较了性能。我们待会再说。

回到我前面所说的系统,如何能更好的提供性能?有一种方案是借助其它语言的优势,比如Go,让Go来代理完成和B、C、D的请求,系统A通过一个TCP连接与Go程序交流。第三方服务B、C、D的Response结果可以异步地返回给系统A。

Go的优势在于可以实现request-per-goroutine,整个系统中可以有成千上万个goroutine。 goroutine是轻量级的,而且在I/O阻塞的时候可以不占用线程,这让Go可以轻松地处理上万个链接,即使I/O阻塞也没问题。Go和Java之间的通讯协议可以通过Protobuffer来实现,而且它们之间只保留一个TCP连接即可。

当然这种架构的修改带来系统稳定性的降低,服务A和服务B、C、D之间的通讯增加了复杂性。同时,因为是异步方式,服务A的业务也要实现异步方式,否则200个线程依然等待Response的话,还是一个阻塞的架构。

通过测试,这种架构可以带来稳定的吞吐率。 不管服务B、C、D的延迟有多久,A的吞吐率能维持15000 requests/秒。当然Go到B、C、D的并发连接数也有限制,我把最大值调高到20000。

这种曲折的方案的最大的两个弊病就是架构的复杂性以及对原有系统需要进行大的重构。 高复杂性带来的是系统的稳定性的降低,包括部署、维护、网络状况、系统资源等。同时系统要改成异步模型,因为系统业务线程发送Request后不能等待Go返回Response,它需要从Client接收更多的Request,而收到Response之后它才继续执行剩下的业务,只有这样才不会阻塞,进而提到系统的吞吐率。

将系统A改成异步,然后使用HttpUrlConnection线程池行不行?
HttpUrlConnection线程池还是导致和B、C、D通讯的吞吐率下降,但是Go这种方案和B、C、D通讯的吞吐率可以维持一个较高的水平。

考虑到Go的优势,那么能不能在Java中使用类似Go的这种goroutine模型呢?那就是本文要介绍的Java纤程库: [Quasar](http://docs.paralleluniverse.co/quasar/)。

实际测试结果表明Go和Netty都是两种比较好的解决方案,而且Netty的性能惊人的好,不好的地方正如前面所讲,我们需要将代码改成异步的处理。线程池中的业务单元用Netty发送完Request之后,不要等待Response, Response的处理交给另外的线程来处理,同时注意不要在Netty的Handler里面处理业务逻辑。要解决的问题就变成如何更高效的处理Response了,而不是第三方系统阻塞的问题。

quasar初步

以下介绍Java的另一个解决方案,也就是Java中的coroutine库,因为最近刚刚看这个库,感觉挺不错的,而且用它替换Thread改动较少。

Java官方并没有纤程库。但是伟大的社区提供了一个优秀的库,它就是Quasar。

创始人是Ron Pressler和Dafna Pressler,由Y Combinator孵化。

Quasar is a library that provides high-performance lightweight threads, Go-like channels, Erlang-like actors, and other asynchronous programming tools for Java and Kotlin.

Quasar提供了高性能轻量级的线程,提供了类似Go的channel,Erlang风格的actor,以及其它的异步编程的工具,可以用在Java和Kotlin编程语言中。Scala目前的支持还不完善,我想如果这个公司能快速的发展壮大,或者被一些大公司收购的话,对Scala的支持才能提上日程。

你需要把下面的包加入到你的依赖中:

  • Core (必须) co.paralleluniverse:quasar-core:0.7.5[:jdk8] (对于 JDK 8,需要增加jdk8 classifier)
  • Actor co.paralleluniverse:quasar-actors:0.7.5
  • Clustering co.paralleluniverse:quasar-galaxy:0.7.5
  • Reactive Stream co.paralleluniverse:quasar-reactive-streams:0.7.5
  • Kotlin co.paralleluniverse:quasar-kotlin:0.7.5

Quasar fiber依赖java instrumentation修改你的代码,可以在运行时通过java Agent实现,也可以在编译时使用ant task实现。

通过java agent很简单,在程序启动的时候将下面的指令加入到命令行:

1
-javaagent:path-to-quasar-jar.jar

对于maven来说,你可以使用插件maven-dependency-plugin,它会为你的每个依赖设置一个属性,以便在其它地方引用,我们主要想使用 ${co.paralleluniverse:quasar-core:jar}:

1
2
3
4
5
6
7
8
9
10
11
12
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.5.1</version>
<executions>
<execution>
<id>getClasspathFilenames</id>
<goals>
<goal>properties</goal>
</goals>
</execution>
</executions>
</plugin>

然后你可以配置exec-maven-plugin或者maven-surefire-plugin加上agent参数,在执行maven任务的时候久可以使用Quasar了。

官方提供了一个Quasar Maven archetype,你可以通过下面的命令生成一个quasar应用原型:

1
2
3
4
5
6
7
8
git clone https://github.com/puniverse/quasar-mvn-archetype
cd quasar-mvn-archetype
mvn install
cd ..
mvn archetype:generate -DarchetypeGroupId=co.paralleluniverse -DarchetypeArtifactId=quasar-mvn-archetype -DarchetypeVersion=0.7.4 -DgroupId=testgrp -DartifactId=testprj
cd testprj
mvn test
mvn clean compile dependency:properties exec:exec

如果你使用gradle,可以看一下gradle项目模版:Quasar Gradle template project

最容易使用Quasar的方案就是使用Java Agent,它可以在运行时instrument程序。如果你想编译的时候就使用AOT instrumentation(Ahead-of-Time),可以使用Ant任务co.paralleluniverse.fibers.instrument.InstrumentationTask,它包含在quasar-core.jar中。

Quasar最主要的贡献就是提供了轻量级线程的实现,叫做fiber(纤程)。Fiber的功能和使用类似Thread, API接口也类似,所以使用起来没有违和感,但是它们不是被操作系统管理的,它们是由一个或者多个ForkJoinPool调度。一个idle fiber只占用400K内存,切换的时候占用更少的CPU,你的应用中可以有上百万的fiber,显然Thread做不到这一点。这一点和Go的goroutine类似。

Fiber并不意味着它可以在所有的场景中都可以替换Thread。当fiber的代码经常会被等待其它fiber阻塞的时候,就应该使用fiber。
对于那些需要CPU长时间计算的代码,很少遇到阻塞的时候,就应该首选thread

以上两条是选择fiber还是thread的判断条件,主要还是看任务是I/O blocking相关还是CPU相关。幸运地是,fiber API使用和thread使用类似,所以代码略微修改久就可以兼容。

Fiber特别适合替换哪些异步回调的代码。使用FiberAsync异步回调很简单,而且性能很好,扩展性也更高。

类似Thread, fiber也是用Fiber类表示:

1
2
3
4
5
6
new Fiber<V>() {
@Override
protected V run() throws SuspendExecution, InterruptedException {
// your code
}
}.start();

与Thread类似,但也有些不同。Fiber可以有一个返回值,类型为泛型V,也可以为空Void。run也可以抛出异常InterruptedException

你可以传递SuspendableRunnableSuspendableCallable 给Fiber的构造函数:

1
2
3
4
5
new Fiber<Void>(new SuspendableRunnable() {
public void run() throws SuspendExecution, InterruptedException {
// your code
}
}).start();

甚至你可以调用Fiber的join方法等待它完成,调用get方法得到它的结果。

Fiber继承Strand类。Strand类代表一个Fiber或者Thread,提供了一些底层的方法。

逃逸的Fiber(Runaway Fiber)是指那些陷入循环而没有block、或者block fiber本身运行的线程的Fiber。偶尔有逃逸的fiber没有问题,但是太频繁会导致性能的下降,因为需要调度器的线程可能都忙于逃逸fiber了。Quasar会监控这些逃逸fiber,你可以通过JMX监控。如果你不想监控,可以设置系统属性co.paralleluniverse.fibers.detectRunawayFibersfalse

fiber中的ThreadLocal是fiber local的。InheritableThreadLocal继承父fiber的值。

Fiber、SuspendableRunnable 、SuspendableCallable 的run方法会抛出SuspendExecution异常。但这并不是真正意义的异常,而是fiber内部工作的机制,通过这个异常暂停因block而需要暂停的fiber。

任何在Fiber中运行的方法,需要声明这个异常(或者标记@Suspendable),都被称为suspendable method。

反射调用通常都被认为是suspendable, Java8 lambda 也被认为是suspendable。不应该将类构造函数或类初始化器标记为suspendable。

synchronized语句块或者方法会阻塞操作系统线程,所以它们不应该标记为suspendable。Blocking线程调用默认也不被quasar允许。但是这两种情况都可以被quasar处理,你需要在Quasar javaagent中分别加上mb参数,或者ant任务中加上allowMonitorsallowBlocking属性。

quasar原理

Quasar最初fork自Continuations Library

如果你了解其它语言的coroutine, 比如Lua,你久比较容易理解quasar的fiber了。 Fiber实质上是 continuation, continuation可以捕获一个计算的状态,可以暂停当前的计算,等隔一段时间可以继续执行。Quasar通过instrument修改suspendable方法。Quasar的调度器使用ForkJoinPool调度这些fiber。

Fiber调度器FiberScheduler是一个高效的、work-stealing、多线程的调度器。

默认的调度器是FiberForkJoinScheduler,但是你可以使用自己的线程池去调度,请参考FiberExecutorScheduler

当一个类被加载时,Quasar的instrumentation模块 (使用 Java agent时) 搜索suspendable 方法。每一个suspendable 方法 f通过下面的方式 instrument:
它搜索对其它suspendable方法的调用。对suspendable方法g的调用,一些代码会在这个调用g的前后被插入,它们会保存和恢复fiber栈本地变量的状态,记录这个暂停点。在这个“suspendable function chain”的最后,我们会发现对Fiber.park的调用。park暂停这个fiber,扔出 SuspendExecution异常。

g block的时候,SuspendExecution异常会被Fiber捕获。 当Fiber被唤醒(使用unpark), 方法f会被调用, 执行记录显示它被block在g的调用上,所以程序会立即跳到f调用g的那一行,然后调用它。最终我们会到达暂停点,然后继续执行。当g返回时, f中插入的代码会恢复f的本地变量。

过程听起来很复杂,但是它只会带来3% ~ 5%的性能的损失。

下面看一个简单的例子, 方法m2声明抛出SuspendExecution异常,方法m1调用m2和m3,所以也声明抛出这个异常,最后这个异常会被Fiber所捕获:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Helloworld {
static void m1() throws SuspendExecution, InterruptedException {
String m = “m1”;
System.out.println(“m1 begin”);
m = m2();
m = m3();
System.out.println(“m1 end”);
System.out.println(m);
}
static String m2() throws SuspendExecution, InterruptedException {
return “m2”;
}
static String m3() throws SuspendExecution, InterruptedException {
return “m3”;
}
static public void main(String[] args) throws ExecutionException, InterruptedException {
new Fiber<Void>(“Caller”, new SuspendableRunnable() {
@Override
public void run() throws SuspendExecution, InterruptedException {
m1();
}
}).start();
}
}

反编译这段代码 (一般的反编译软件如jd-gui不能把这段代码反编译java文件,Procyon虽然能反编译,但是感觉反编译有错。所以我们还是看字节码吧):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
@Instrumented(suspendableCallSites={16, 17}, methodStart=13, methodEnd=21, methodOptimized=false)
static void m1()
throws SuspendExecution, InterruptedException
{
// Byte code:
// 0: aconst_null
// 1: astore_3
// 2: invokestatic 88 co/paralleluniverse/fibers/Stack:getStack ()Lco/paralleluniverse/fibers/Stack;
// 5: dup
// 6: astore_1
// 7: ifnull +42 -> 49
// 10: aload_1
// 11: iconst_1
// 12: istore_2
// 13: invokevirtual 92 co/paralleluniverse/fibers/Stack:nextMethodEntry ()I
// 16: tableswitch default:+24->40, 1:+64->80, 2:+95->111
// 40: aload_1
// 41: invokevirtual 96 co/paralleluniverse/fibers/Stack:isFirstInStackOrPushed ()Z
// 44: ifne +5 -> 49
// 47: aconst_null
// 48: astore_1
// 49: iconst_0
// 50: istore_2
// 51: ldc 2
// 53: astore_0
// 54: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 57: ldc 4
// 59: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 62: aload_1
// 63: ifnull +26 -> 89
// 66: aload_1
// 67: iconst_1
// 68: iconst_1
// 69: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 72: aload_0
// 73: aload_1
// 74: iconst_0
// 75: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 78: iconst_0
// 79: istore_2
// 80: aload_1
// 81: iconst_0
// 82: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 85: checkcast 110 java/lang/String
// 88: astore_0
// 89: invokestatic 6 com/colobu/fiber/Helloworld:m2 ()Ljava/lang/String;
// 92: astore_0
// 93: aload_1
// 94: ifnull +26 -> 120
// 97: aload_1
// 98: iconst_2
// 99: iconst_1
// 100: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 103: aload_0
// 104: aload_1
// 105: iconst_0
// 106: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 109: iconst_0
// 110: istore_2
// 111: aload_1
// 112: iconst_0
// 113: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 116: checkcast 110 java/lang/String
// 119: astore_0
// 120: invokestatic 7 com/colobu/fiber/Helloworld:m3 ()Ljava/lang/String;
// 123: astore_0
// 124: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 127: ldc 8
// 129: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 132: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 135: aload_0
// 136: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 139: aload_1
// 140: ifnull +7 -> 147
// 143: aload_1
// 144: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 147: return
// 148: aload_1
// 149: ifnull +7 -> 156
// 152: aload_1
// 153: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 156: athrow
// Line number table:
// Java source line #13 -> byte code offset #51
// Java source line #15 -> byte code offset #54
// Java source line #16 -> byte code offset #62
// Java source line #17 -> byte code offset #93
// Java source line #18 -> byte code offset #124
// Java source line #19 -> byte code offset #132
// Java source line #21 -> byte code offset #139
// Local variable table:
// start length slot name signature
// 53 83 0 m String
// 6 147 1 localStack co.paralleluniverse.fibers.Stack
// 12 99 2 i int
// 1 1 3 localObject Object
// 156 1 4 localSuspendExecution SuspendExecution
// Exception table:
// from to target type
// 49 148 148 finally
// 49 148 156 co/paralleluniverse/fibers/SuspendExecution
// 49 148 156 co/paralleluniverse/fibers/RuntimeSuspendExecution
}

这段反编译的代码显示了方法m被instrument后的样子,虽然我们不能很清楚的看到代码执行的样子,但是也可以大概地看到它实际在方法的最开始加入了此方法的栈信息的检查(#0 ~ #49,如果是第一次运行这个方法,则直接运行,
然后在一些暂停点上加上一些栈压入的处理,并且可以在下次执行的时候直接跳到上次的暂停点上。

官方的工程师关于Quasar的instrument操作如下:

  • Fully analyze the bytecode to find all the calls into suspendable methods. A method that (potentially) calls into other suspendable methods is itself considered suspendable, transitively.
  • Inject minimal bytecode in suspendable methods (and only them) that will manage an user-mode stack, in the following places:
    • At the beginning we’ll check if we’re resuming the fiber and only in this case we’ll jump into the relevant bytecode index.
    • Before a call into another suspendable method we’ll push a snapshot of the current activation frame, including the resume bytecode index; we can do it because we know the structure statically from the analysis phase.
    • After a call into another suspendable method we’ll pop the top activation frame and, if resumed, we’ll restore it in the current fiber.

我并没有更深入的去了解Quasar的实现细节以及调度算法,有兴趣的读者可以翻翻它的代码。如果你有更深入的剖析,请留下相关的地址,以便我加到参考文档中。

曾经, 陆陆续续也有一些Java coroutine的实现(coroutine-libraries), 但是目前来说最好的应该还是Quasar。

Oracle会实现一个官方的纤程库吗?目前来说没有看到这方面的计划,而且从Java的开发进度上来看,这个特性可能是遥遥无期的,所以目前还只能借助社区的力量,从第三方库如Quasar中寻找解决方案。

更多的Quasar知识,比如Channel、Actor、Reactive Stream 的使用可以参考官方的文档,官方也提供了多个例子

Comsat介绍

Comsat又是什么?

Comsat还是Parallel Universe提供的集成Quasar的一套开源库,可以提供web或者企业级的技术,如HTTP服务和数据库访问。

Comsat并不是一套web框架。它并不提供新的API,只是为现有的技术如Servlet、JAX-RS、JDBC等提供Quasar fiber的集成。

它包含非常多的库,比如Spring、ApacheHttpClient、OkHttp、Undertow、Netty、Kafka等。

性能对比

刘小溪在CSDN上写了一篇关于Quasar的文章:次时代Java编程(一):Java里的协程,写的挺好,建议读者读一读。

它参考Skynet的测试写了代码进行对比,这个测试是并发执行整数的累加:
测试结果是Golang花了261毫秒,Quasar花了612毫秒。其实结果还不错,但是文中指出这个测试没有发挥Quasar的性能。因为quasar的性能主要在于阻塞代码的调度上。
虽然文中加入了排序的功能,显示Java要比Golang要好,但是我觉得这又陷入了另外一种错误的比较, Java的排序算法使用TimSort,排序效果相当好,Go的排序效果显然比不上Java的实现,所以最后的测试主要测试排序算法上。 真正要体现Quasar的性能还是测试在有阻塞的情况下fiber的调度性能。

HttpClient

话题扯的越来越远了,拉回来。我最初的目的是要解决的是在第三方服务响应慢的情况下提高系统 A 的吞吐率。最初A是使用200个线程处理业务逻辑,调用第三方服务。因为线程总是被第三方服务阻塞,所以系统A的吞吐率总是很低。

虽然使用Go可以解决这个问题,但是对于系统A的改造比较大,还增加了系统的复杂性。Netty性能好,改动量还可以接受,但是不妨看一下这个场景,系统的问题是由http阻塞引起。

这正是Quasar fiber适合的场景,如果一个Fiber被阻塞,它可以暂时放弃线程,以便线程可以用来执行其它的Fiber。虽然整个集成系统的吞吐率依然很低,这是无法避免的,但是系统的吞吐率确很高。

Comsat提供了Apache Http Client的实现: FiberHttpClientBuilder

1
2
3
4
final CloseableHttpClient client = FiberHttpClientBuilder.
create(2). // use 2 io threads
setMaxConnPerRoute(concurrencyLevel).
setMaxConnTotal(concurrencyLevel).build();

然后在Fiber中久可以调用:

1
String response = client.execute(new HttpGet(“http://localhost:8080”), BASIC_RESPONSE_HANDLER);

你也可以使用异步的HttpClient:

1
2
3
4
5
6
final CloseableHttpAsyncClient client = FiberCloseableHttpAsyncClient.wrap(HttpAsyncClients.
custom().
setMaxConnPerRoute(concurrencyLevel).
setMaxConnTotal(concurrencyLevel).
build());
client.start();

Comsat还提供了Jersey Http Client: AsyncClientBuilder.newClient()

甚至提供了RetrofitOkHttp的实现。

经过测试,虽然随着系统B、C、D的响应时间的拉长,吞吐率有所降低,但是在latency为100毫秒的时候吞吐率依然能达到9900 requests/秒,可以满足我们的需求,而我们的代码改动的比较小。

综上所述,如果想彻底改造系统A,则可以使用Go库重写,或者使用Netty + Rx的方式去处理,都能达到比较好的效果。如果想改动比较小,可以考虑使用quasar替换线程对代码进行维护。

我希望本文不要给读者造成误解,以为Java NIO/Selector这种方式不能解决本文的问题,也就是第三方阻塞的问题。 事实上Java NIO也正是适合解决这样的问题, 比如Netty性能就不错,但是你需要小心的是, 不要让你的这个client对外又变成阻塞的方式,而是程序应该异步的去发送request和处理response。当然本文重点不是介绍这种实现,而是介绍Java的线程库,它可以改造传统的代码,即使有阻塞,也只是阻塞Fiber,而不是阻塞线程,这是另一个解决问题的思路。

另一篇关于Quasar的文档: 继续了解Java的纤程库 – Quasar

参考文档

from:http://colobu.com/2016/07/14/Java-Fiber-Quasar/