Category Archives: JAVA

为 Java 程序员准备的 Go 入门 PPT

这是 Google 的 Go 团队技术主管经理 Sameer Ajmani 分享的 PPT,为 Java 程序员快速入门 Go 而准备的。

视频

这个 PPT 是 2015年4月23日在 NYJavaSIG 中使用的。

前往 YouTube 观看视频

主要内容

1. Go 是什么,谁在使用 Go?
2. 比较 Go 和 Java
3. 代码示例
4. 并发
5. 工具

Go 是什么?

“Go 是开源的编程语言,可以很简单的构建简单,可靠和高效的软件。”

golang.org

Go 的历史

从 2007 后半年开始设计

  • Robert Griesemer, Rob Pike 和 Ken Thompson.
  • Ian Lance Taylor 和 Russ Cox

从 2009 年开始开源,有一个非常活跃的社区。

Go 语言稳定版本 Go 1 是在 2012 年早期发布的。

为什么有 Go?

Go 是解决 Google 规模的一个解决方案。

系统规模

  • 规划的规模为 10⁶⁺ 台机器
  • 每天在几千台机器上作业
  • 在系统中与其他作业进行协作,交互
  • 同一时间进行大量工作

解决方案:对并发的支持非常强大

第二个问题:工程规模

在 2011 年

  • 跨 40+ 办公室的 5000+ 名开发者
  • 每分钟有 20+ 修改
  • 每个月修改 50% 的代码基础库
  • 每天执行 5千万的测试用例
  • 单个代码树

解决方案:为大型代码基础库而设计的语言

谁在 Google 使用 Go?

大量的项目,几千位 Go 程序员,百万行的 Go 代码。

公开的例子:

  • 移动设备的 Chrome SPDY 代理
  • Chrome, ChromeOS, Android SDK, Earth 等等的下载服务器
  • YouTube Vitess MySQL 均衡器

主要任务是网络服务器,但是这是通用的语言。

除了 Google 还有谁在使用 Go?

golang.org/wiki/GoUsers

Apcera, Bitbucket, bitly, Canonical, CloudFlare, Core OS, Digital
Ocean, Docker, Dropbox, Facebook, Getty Images, GitHub, Heroku, Iron.io,
Kubernetes, Medium, MongoDB services, Mozilla services, New York Times,
pool.ntp.org, Secret, SmugMug, SoundCloud, Stripe, Square, Thomson
Reuters, Tumblr, …

比较 Go 和 Java

Go 和 Java 有很多共同之处

  • C 系列 (强类型,括号)
  • 静态类型
  • 垃圾收集
  • 内存安全 (nil 引用,运行时边界检查)
  • 变量总是初始化 (zero/nil/false)
  • 方法
  • 接口
  • 类型断言 (实例)
  • 反射

Go 与 Java 的不同之处

  • 代码程序直接编译成机器码,没有 VM
  • 静态链接二进制
  • 内存布局控制
  • 函数值和词法闭包
  • 内置字符串 (UTF-8)
  • 内置泛型映射和数组/片段
  • 内置并发

Go 特意去掉了大量的特性

  • 没有类
  • 没有构造器
  • 没有继承
  • 没有 final
  • 没有异常
  • 没有注解
  • 没有自定义泛型

为什么 Go 要省去那些特性?

代码清晰明了是首要的

当查看代码时,可以很清晰的知道程序将会做什么

当编写代码的时候,也可以很清晰的让程序做你想做的

有时候这意味着编写出一个循环而不是调用一个模糊的函数。

(不要变的太枯燥)

详细的设计背景请看:

示例

Java程序猿对Go应该很眼熟

Main.java

public class Main {
    public static void main(String[] args) {
        System.out.println("Hello, world!");
    }
}

hello.go

package main
import "fmt"
func main() {
    fmt.Println("Hello, 世界!")
}

Hello, web server(你好,web服务)

package main

import (
    "fmt"
    "log"
    "net/http"
)
func main() {
    http.HandleFunc("/hello", handleHello)
    fmt.Println("serving on http://localhost:7777/hello")
    log.Fatal(http.ListenAndServe("localhost:7777", nil))
}
func handleHello(w http.ResponseWriter, req *http.Request) {
    log.Println("serving", req.URL)
    fmt.Fprintln(w, "Hello, 世界!")
}

(访问权限)类型根据变量名来声明。
公共变量名首字大写,私有变量首字母小写。

示例:Google搜索前端

func main() {
    http.HandleFunc("/search", handleSearch)
    fmt.Println("serving on http://localhost:8080/search")
    log.Fatal(http.ListenAndServe("localhost:8080", nil))
}
// handleSearch handles URLs like "/search?q=golang" by running a
// Google search for "golang" and writing the results as HTML to w.
func handleSearch(w http.ResponseWriter, req *http.Request) {

请求验证

func handleSearch(w http.ResponseWriter, req *http.Request) {
    log.Println("serving", req.URL)
    // Check the search query.
    query := req.FormValue("q")
    if query == "" {
        http.Error(w, `missing "q" URL parameter`, http.StatusBadRequest)
        return
    }

FormValueis 是 *http.Request 的一个方法:

package http
type Request struct {...}
func (r *Request) FormValue(key string) string {...}

query := req.FormValue(“q”)初始化变量query,其变量类型是右边表达式的结果,这里是string类型.

取搜索结果

 // Run the Google search.
    start := time.Now()
    results, err := Search(query)
    elapsed := time.Since(start)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

Search方法有两个返回值,分别为结果results和错误error.

func Search(query string) ([]Result, error) {...}

当error的值为nil时,results有效。

type error interface {
    Error() string // a useful human-readable error message
}

Error类型可能包含额外的信息,可通过断言访问。

渲染搜索结果

// Render the results.
    type templateData struct {
        Results []Result
        Elapsed time.Duration
    }
    if err := resultsTemplate.Execute(w, templateData{
        Results: results,
        Elapsed: elapsed,
    }); err != nil {
        log.Print(err)
        return
    }

结果results使用Template.Execute生成HTML,并存入一个io.Writer:

type Writer interface {
        Write(p []byte) (n int, err error)
}

http.ResponseWriter实现了io.Writer接口。

Go变量操作HTML模板

// A Result contains the title and URL of a search result.
type Result struct {
    Title, URL string
}
var resultsTemplate = template.Must(template.New("results").Parse(`
<html>
<head/>
<body>
  <ol>
  {{range .Results}}
    <li>{{.Title}} - <a href="{{.URL}}">{{.URL}}</a></li>
  {{end}}
  </ol>
  <p>{{len .Results}} results in {{.Elapsed}}</p>
</body>
</html>
`))

请求Google搜索API

func Search(query string) ([]Result, error) {
    // Prepare the Google Search API request.
    u, err := url.Parse("https://ajax.googleapis.com/ajax/services/search/web?v=1.0")
    if err != nil {
        return nil, err
    }
    q := u.Query()
    q.Set("q", query)
    u.RawQuery = q.Encode()
    // Issue the HTTP request and handle the response.
    resp, err := http.Get(u.String())
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

defer声明使resp.Body.Close运行在Search方法返回时。

解析返回的JSON数据到Go struct类型

developers.google.com/web-search/docs/#fonje

  var jsonResponse struct {
        ResponseData struct {
            Results []struct {
                TitleNoFormatting, URL string
            }
        }
    }
    if err := json.NewDecoder(resp.Body).Decode(&jsonResponse); err != nil {
        return nil, err
    }
    // Extract the Results from jsonResponse and return them.
    var results []Result
    for _, r := range jsonResponse.ResponseData.Results {
        results = append(results, Result{Title: r.TitleNoFormatting, URL: r.URL})
    }
    return results, nil
}

这就是它的前端

所有引用的包都来自标准库:

import (
    "encoding/json"
    "fmt"
    "html/template"
    "log"
    "net/http"
    "net/url"
    "time"
)

Go服务器规模:每一个请求都运行在自己的goroutine里。

让我们谈谈并发。

通信顺序进程(Hoare,1978)

并发程序作为独立进程,通过信息交流的顺序执行。

顺序执行很容易理解,异步则不是。

“不要为共亨内存通信,为通信共享内存。”

Go原理: goroutines, channels, 和 select声明.

Goroutines

Goroutines 就像轻量级线程。

它们通过小栈(tiny stacks)和按需调整运行。

Go 程序可以拥有成千上万个(goroutines)实例

使用go声明启动一个goroutines:

go f(args)

Go运行时把goroutines放进OS线程里。

不要使用线程堵塞goroutines。

Channels

Channels被定义是为了与goroutines之间通信。

c := make(chan string)

// goroutine 1
c <- "hello!"

// goroutine 2
s := <-c
fmt.Println(s) // "hello!"

Select

select声明一个语句块来判断执行。

select {
case n := <-in:
  fmt.Println("received", n)
case out <- v:
  fmt.Println("sent", v)
}

只有条件成立的case块会运行。

示例:Google搜索(后端)

问: Google搜索能做些什么?

答: 提出一个问题,它可以返回一个搜索结果的页面(和一些广告)。

问: 我们怎么得到这些搜索结果?

答: 发送一个问题到网页搜索、图片搜索、YouTube(视频)、地图、新闻,稍等然后检索出结果。

我们该怎么实现它?

Google搜索 : 一个假的框架

We can simulate a Search function with a random timeout up to 100ms.

我们要模拟一个搜索函数,让它随机超时0到100毫秒。

var (
    Web   = fakeSearch("web")
    Image = fakeSearch("image")
    Video = fakeSearch("video")
)
type Search func(query string) Result
func fakeSearch(kind string) Search {
    return func(query string) Result {
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
        return Result(fmt.Sprintf("%s result for %q\n", kind, query))
    }
}

Google搜索: 测试框架

func main() {
    start := time.Now()
    results := Google("golang")
    elapsed := time.Since(start)
    fmt.Println(results)
    fmt.Println(elapsed)
}

Google搜索 (串行)

Google函数获取一个查询,然后返回一个的结果集 (不一定是字符串).

Google按顺序调用Web(网页)、Image(图片)、Video(视频)并将返回加入到结果集中。

func Google(query string) (results []Result) {
    results = append(results, Web(query))
    results = append(results, Image(query))
    results = append(results, Video(query))
    return
}

Google搜索(并行)

同时执行 Web,、Image、 和Video搜索,并等待所有结果。

func方法是在query和c的地方关闭的。

func Google(query string) (results []Result) {
    c := make(chan Result)
    go func() { c <- Web(query) }()
    go func() { c <- Image(query) }()
    go func() { c <- Video(query) }()
    for i := 0; i < 3; i++ {
        result := <-c
        results = append(results, result)
    }
    return
}

Google搜索 (超时)

等待慢的服务器。

没有锁,没有条件变量,没有返回值。

    c := make(chan Result, 3)
    go func() { c <- Web(query) }()
    go func() { c <- Image(query) }()
    go func() { c <- Video(query) }()
    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return

防止超时

问: 如何防止丢掉慢的服务的结果?

答: 复制这个服务,然后发送请求到多个复制的服务,并使用第一个响应的结果。

func First(query string, replicas ...Search) Result {
    c := make(chan Result, len(replicas))
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}

使用First函数

func main() {
    start := time.Now()
    result := First("golang",
        fakeSearch("replica 1"),
        fakeSearch("replica 2"))
    elapsed := time.Since(start)
    fmt.Println(result)
    fmt.Println(elapsed)
}

Google搜索 (复制)

使用复制的服务以减少多余延迟。

    c := make(chan Result, 3)
    go func() { c <- First(query, Web1, Web2) }()
    go func() { c <- First(query, Image1, Image2) }()
    go func() { c <- First(query, Video1, Video2) }()
    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return

其他

没有锁,没有条件变量,没有调用。

总结

经过一些简单转换,我们使用 Go 的并发原语来转换一个

  • 顺序性的
  • 故障敏感的

程序为一个

  • 并发
  • 可复用的
  • 健壮的

工具

Go 有很多强大的工具

  • gofmt 和 goimports
  • The go tool
  • godoc
  • IDE 和编辑器支持

这语言就是为工具链设计的。

gofmt 和 goimports

Gofmt 可以自动格式化代码,没有选项。

Goimports 基于你的工作空间更新导入声明

大部分人可以安全的使用这些工具。

play.golang.org/p/GPqra77cBK

The go tool

The go tool 可以在一个传统目录布局中用源代码构建 Go 程序。不需要 Makefiles 或者其他配置。

匹配这些工具及其依赖,然后进行构建,安装:

% go get golang.org/x/tools/cmd/present

运行:

% present

godoc

为世界上所有的开源 Go 代码生成文档:

godoc.org

IDE 和编辑器支持

Eclipse, IntelliJ, emacs, vim 等等:

  • gofmt
  • goimports
  • godoclookups
  • code completion
  • code navigation

但是没有 “Go IDE”.

Go 工具无处不在。

Go 的下一步计划

Go 路线在线查看

tour.golang.org

大量的学习资料

golang.org/wiki/Learn

完美的社区

golang.org/project

Thank you

Sameer Ajmani

Tech Lead Manager, Go team

Google

@Sajma

sameer@golang.org

from:http://www.oschina.net/translate/go-for-java-programmers-ppt

一致性Hash算法(Java实现)

一致性Hash算法

关于一致性Hash算法,在我之前的博文中已经有多次提到了,MemCache超详细解读一文中”一致性Hash算法”部分,对于为什么要使用一致性Hash算法、一致性Hash算法的算法原理做了详细的解读。

算法的具体原理这里再次贴上:

先构造一个长度为232的整数环(这个环被称为一致性Hash环),根据节点名称的Hash值(其分布为[0, 232-1])将服务器节点放置在这个Hash环上,然后根据数据的Key值计算得到其Hash值(其分布也为[0, 232-1]),接着在Hash环上顺时针查找距离这个Key值的Hash值最近的服务器节点,完成Key到服务器的映射查找。

这种算法解决了普通余数Hash算法伸缩性差的问题,可以保证在上线、下线服务器的情况下尽量有多的请求命中原来路由到的服务器。

当然,万事不可能十全十美,一致性Hash算法比普通的余数Hash算法更具有伸缩性,但是同时其算法实现也更为复杂,本文就来研究一下,如何利用Java代码实现一致性Hash算法。在开始之前,先对一致性Hash算法中的几个核心问题进行一些探究。

 

数据结构的选取

一致性Hash算法最先要考虑的一个问题是:构造出一个长度为232的整数环,根据节点名称的Hash值将服务器节点放置在这个Hash环上。

那么,整数环应该使用何种数据结构,才能使得运行时的时间复杂度最低?首先说明一点,关于时间复杂度,常见的时间复杂度与时间效率的关系有如下的经验规则:

O(1) < O(log2N) < O(n) < O(N * log2N) < O(N2) < O(N3) < 2N < 3N < N!

一般来说,前四个效率比较高,中间两个差强人意,后三个比较差(只要N比较大,这个算法就动不了了)。OK,继续前面的话题,应该如何选取数据结构,我认为有以下几种可行的解决方案。

1、解决方案一:排序+List

我想到的第一种思路是:算出所有待加入数据结构的节点名称的Hash值放入一个数组中,然后使用某种排序算法将其从小到大进行排序,最后将排序后的数据放入List中,采用List而不是数组是为了结点的扩展考虑。

之后,待路由的结点,只需要在List中找到第一个Hash值比它大的服务器节点就可以了,比如服务器节点的Hash值是[0,2,4,6,8,10],带路由的结点是7,只需要找到第一个比7大的整数,也就是8,就是我们最终需要路由过去的服务器节点。

如果暂时不考虑前面的排序,那么这种解决方案的时间复杂度:

(1)最好的情况是第一次就找到,时间复杂度为O(1)

(2)最坏的情况是最后一次才找到,时间复杂度为O(N)

平均下来时间复杂度为O(0.5N+0.5),忽略首项系数和常数,时间复杂度为O(N)。

但是如果考虑到之前的排序,我在网上找了张图,提供了各种排序算法的时间复杂度:

看得出来,排序算法要么稳定但是时间复杂度高、要么时间复杂度低但不稳定,看起来最好的归并排序法的时间复杂度仍然有O(N * logN),稍微耗费性能了一些。

2、解决方案二:遍历+List

既然排序操作比较耗性能,那么能不能不排序?可以的,所以进一步的,有了第二种解决方案。

解决方案使用List不变,不过可以采用遍历的方式:

(1)服务器节点不排序,其Hash值全部直接放入一个List中

(2)带路由的节点,算出其Hash值,由于指明了”顺时针”,因此遍历List,比待路由的节点Hash值大的算出差值并记录,比待路由节点Hash值小的忽略

(3)算出所有的差值之后,最小的那个,就是最终需要路由过去的节点

在这个算法中,看一下时间复杂度:

1、最好情况是只有一个服务器节点的Hash值大于带路由结点的Hash值,其时间复杂度是O(N)+O(1)=O(N+1),忽略常数项,即O(N)

2、最坏情况是所有服务器节点的Hash值都大于带路由结点的Hash值,其时间复杂度是O(N)+O(N)=O(2N),忽略首项系数,即O(N)

所以,总的时间复杂度就是O(N)。其实算法还能更改进一些:给一个位置变量X,如果新的差值比原差值小,X替换为新的位置,否则X不变。这样遍历就减少了一轮,不过经过改进后的算法时间复杂度仍为O(N)。

总而言之,这个解决方案和解决方案一相比,总体来看,似乎更好了一些。

3、解决方案三:二叉查找树

抛开List这种数据结构,另一种数据结构则是使用二叉查找树。对于树不是很清楚的朋友可以简单看一下这篇文章树形结构

当然我们不能简单地使用二叉查找树,因为可能出现不平衡的情况。平衡二叉查找树有AVL树、红黑树等,这里使用红黑树,选用红黑树的原因有两点:

1、红黑树主要的作用是用于存储有序的数据,这其实和第一种解决方案的思路又不谋而合了,但是它的效率非常高

2、JDK里面提供了红黑树的代码实现TreeMap和TreeSet

另外,以TreeMap为例,TreeMap本身提供了一个tailMap(K fromKey)方法,支持从红黑树中查找比fromKey大的值的集合,但并不需要遍历整个数据结构。

使用红黑树,可以使得查找的时间复杂度降低为O(logN),比上面两种解决方案,效率大大提升。

为了验证这个说法,我做了一次测试,从大量数据中查找第一个大于其中间值的那个数据,比如10000数据就找第一个大于5000的数据(模拟平均的情况)。看一下O(N)时间复杂度和O(logN)时间复杂度运行效率的对比:

50000 100000 500000 1000000 4000000
ArrayList 1ms 1ms 4ms 4ms 5ms
LinkedList 4ms 7ms 11ms 13ms 17ms
TreeMap 0ms 0ms 0ms 0ms 0ms

因为再大就内存溢出了,所以只测试到4000000数据。可以看到,数据查找的效率,TreeMap是完胜的,其实再增大数据测试也是一样的,红黑树的数据结构决定了任何一个大于N的最小数据,它都只需要几次至几十次查找就可以查到。

当然,明确一点,有利必有弊,根据我另外一次测试得到的结论是,为了维护红黑树,数据插入效率TreeMap在三种数据结构里面是最差的,且插入要慢上5~10倍

 

Hash值重新计算

服务器节点我们肯定用字符串来表示,比如”192.168.1.1″、”192.168.1.2″,根据字符串得到其Hash值,那么另外一个重要的问题就是Hash值要重新计算,这个问题是我在测试String的hashCode()方法的时候发现的,不妨来看一下为什么要重新计算Hash值:

/**
 * String的hashCode()方法运算结果查看
 * @author 五月的仓颉 http://www.cnblogs.com/xrq730/
 *
 */
public class StringHashCodeTest
{
    public static void main(String[] args)
    {
        System.out.println("192.168.0.0:111的哈希值:" + "192.168.0.0:1111".hashCode());
        System.out.println("192.168.0.1:111的哈希值:" + "192.168.0.1:1111".hashCode());
        System.out.println("192.168.0.2:111的哈希值:" + "192.168.0.2:1111".hashCode());
        System.out.println("192.168.0.3:111的哈希值:" + "192.168.0.3:1111".hashCode());
        System.out.println("192.168.0.4:111的哈希值:" + "192.168.0.4:1111".hashCode());
    }
}

我们在做集群的时候,集群点的IP以这种连续的形式存在是很正常的。看一下运行结果为:

192.168.0.0:111的哈希值:1845870087
192.168.0.1:111的哈希值:1874499238
192.168.0.2:111的哈希值:1903128389
192.168.0.3:111的哈希值:1931757540
192.168.0.4:111的哈希值:1960386691

这个就问题大了,[0,232-1]的区间之中,5个HashCode值却只分布在这么小小的一个区间,什么概念?[0,232-1]中有4294967296个数字,而我们的区间只有114516604,从概率学上讲这将导致97%待路由的服务器都被路由到”192.168.0.0″这个集群点上,简直是糟糕透了!

另外还有一个不好的地方:规定的区间是非负数,String的hashCode()方法却会产生负数(不信用”192.168.1.0:1111″试试看就知道了)。不过这个问题好解决,取绝对值就是一种解决的办法。

综上,String重写的hashCode()方法在一致性Hash算法中没有任何实用价值,得找个算法重新计算HashCode。这种重新计算Hash值的算法有很多,比如CRC32_HASH、FNV1_32_HASH、KETAMA_HASH等,其中KETAMA_HASH是默认的MemCache推荐的一致性Hash算法,用别的Hash算法也可以,比如FNV1_32_HASH算法的计算效率就会高一些。

一致性Hash算法实现版本1:不带虚拟节点

使用一致性Hash算法,尽管增强了系统的伸缩性,但是也有可能导致负载分布不均匀,解决办法就是使用虚拟节点代替真实节点,第一个代码版本,先来个简单的,不带虚拟节点。

下面来看一下不带虚拟节点的一致性Hash算法的Java代码实现:

 1 /**
 2  * 不带虚拟节点的一致性Hash算法
 3  * @author 五月的仓颉http://www.cnblogs.com/xrq730/
 4  *
 5  */
 6 public class ConsistentHashingWithoutVirtualNode
 7 {
 8     /**
 9      * 待添加入Hash环的服务器列表
10      */
11     private static String[] servers = {"192.168.0.0:111", "192.168.0.1:111", "192.168.0.2:111",
12             "192.168.0.3:111", "192.168.0.4:111"};
13     
14     /**
15      * key表示服务器的hash值,value表示服务器的名称
16      */
17     private static SortedMap<Integer, String> sortedMap = 
18             new TreeMap<Integer, String>();
19     
20     /**
21      * 程序初始化,将所有的服务器放入sortedMap中
22      */
23     static
24     {
25         for (int i = 0; i < servers.length; i++)
26         {
27             int hash = getHash(servers[i]);
28             System.out.println("[" + servers[i] + "]加入集合中, 其Hash值为" + hash);
29             sortedMap.put(hash, servers[i]);
30         }
31         System.out.println();
32     }
33     
34     /**
35      * 使用FNV1_32_HASH算法计算服务器的Hash值,这里不使用重写hashCode的方法,最终效果没区别 
36      */
37     private static int getHash(String str)
38     {
39         final int p = 16777619;
40         int hash = (int)2166136261L;
41         for (int i = 0; i < str.length(); i++)
42             hash = (hash ^ str.charAt(i)) * p;
43         hash += hash << 13;
44         hash ^= hash >> 7;
45         hash += hash << 3;
46         hash ^= hash >> 17;
47         hash += hash << 5;
48         
49         // 如果算出来的值为负数则取其绝对值
50         if (hash < 0)
51             hash = Math.abs(hash);
52         return hash;
53     }
54     
55     /**
56      * 得到应当路由到的结点
57      */
58     private static String getServer(String node)
59     {
60         // 得到带路由的结点的Hash值
61         int hash = getHash(node);
62         // 得到大于该Hash值的所有Map
63         SortedMap<Integer, String> subMap = 
64                 sortedMap.tailMap(hash);
65         // 第一个Key就是顺时针过去离node最近的那个结点
66         Integer i = subMap.firstKey();
67         // 返回对应的服务器名称
68         return subMap.get(i);
69     }
70     
71     public static void main(String[] args)
72     {
73         String[] nodes = {"127.0.0.1:1111", "221.226.0.1:2222", "10.211.0.1:3333"};
74         for (int i = 0; i < nodes.length; i++)
75             System.out.println("[" + nodes[i] + "]的hash值为" + 
76                     getHash(nodes[i]) + ", 被路由到结点[" + getServer(nodes[i]) + "]");
77     }
78 }

可以运行一下看一下结果:

[192.168.0.0:111]加入集合中, 其Hash值为575774686
[192.168.0.1:111]加入集合中, 其Hash值为8518713
[192.168.0.2:111]加入集合中, 其Hash值为1361847097
[192.168.0.3:111]加入集合中, 其Hash值为1171828661
[192.168.0.4:111]加入集合中, 其Hash值为1764547046

[127.0.0.1:1111]的hash值为380278925, 被路由到结点[192.168.0.0:111]
[221.226.0.1:2222]的hash值为1493545632, 被路由到结点[192.168.0.4:111]
[10.211.0.1:3333]的hash值为1393836017, 被路由到结点[192.168.0.4:111]

看到经过FNV1_32_HASH算法重新计算过后的Hash值,就比原来String的hashCode()方法好多了。从运行结果来看,也没有问题,三个点路由到的都是顺时针离他们Hash值最近的那台服务器上。

使用虚拟节点来改善一致性Hash算法

上面的一致性Hash算法实现,可以在很大程度上解决很多分布式环境下不好的路由算法导致系统伸缩性差的问题,但是会带来另外一个问题:负载不均。

比如说有Hash环上有A、B、C三个服务器节点,分别有100个请求会被路由到相应服务器上。现在在A与B之间增加了一个节点D,这导致了原来会路由到B上的部分节点被路由到了D上,这样A、C上被路由到的请求明显多于B、D上的,原来三个服务器节点上均衡的负载被打破了。某种程度上来说,这失去了负载均衡的意义,因为负载均衡的目的本身就是为了使得目标服务器均分所有的请求

解决这个问题的办法是引入虚拟节点,其工作原理是:将一个物理节点拆分为多个虚拟节点,并且同一个物理节点的虚拟节点尽量均匀分布在Hash环上。采取这样的方式,就可以有效地解决增加或减少节点时候的负载不均衡的问题。

至于一个物理节点应该拆分为多少虚拟节点,下面可以先看一张图:

横轴表示需要为每台福利服务器扩展的虚拟节点倍数,纵轴表示的是实际物理服务器数。可以看出,物理服务器很少,需要更大的虚拟节点;反之物理服务器比较多,虚拟节点就可以少一些。比如有10台物理服务器,那么差不多需要为每台服务器增加100~200个虚拟节点才可以达到真正的负载均衡。

一致性Hash算法实现版本2:带虚拟节点

在理解了使用虚拟节点来改善一致性Hash算法的理论基础之后,就可以尝试开发代码了。编程方面需要考虑的问题是:

1、一个真实结点如何对应成为多个虚拟节点?

2、虚拟节点找到后如何还原为真实结点?

这两个问题其实有很多解决办法,我这里使用了一种简单的办法,给每个真实结点后面根据虚拟节点加上后缀再取Hash值,比如”192.168.0.0:111″就把它变成”192.168.0.0:111&&VN0″到”192.168.0.0:111&&VN4″,VN就是Virtual Node的缩写,还原的时候只需要从头截取字符串到”&&”的位置就可以了。

下面来看一下带虚拟节点的一致性Hash算法的Java代码实现:

 1 /**
 2  * 带虚拟节点的一致性Hash算法
 3  * @author 五月的仓颉 http://www.cnblogs.com/xrq730/
 4  */
 5 public class ConsistentHashingWithVirtualNode
 6 {
 7     /**
 8      * 待添加入Hash环的服务器列表
 9      */
10     private static String[] servers = {"192.168.0.0:111", "192.168.0.1:111", "192.168.0.2:111",
11             "192.168.0.3:111", "192.168.0.4:111"};
12     
13     /**
14      * 真实结点列表,考虑到服务器上线、下线的场景,即添加、删除的场景会比较频繁,这里使用LinkedList会更好
15      */
16     private static List<String> realNodes = new LinkedList<String>();
17     
18     /**
19      * 虚拟节点,key表示虚拟节点的hash值,value表示虚拟节点的名称
20      */
21     private static SortedMap<Integer, String> virtualNodes = 
22             new TreeMap<Integer, String>();
23     
24     /**
25      * 虚拟节点的数目,这里写死,为了演示需要,一个真实结点对应5个虚拟节点
26      */
27     private static final int VIRTUAL_NODES = 5;
28     
29     static
30     {
31         // 先把原始的服务器添加到真实结点列表中
32         for (int i = 0; i < servers.length; i++)
33             realNodes.add(servers[i]);
34         
35         // 再添加虚拟节点,遍历LinkedList使用foreach循环效率会比较高
36         for (String str : realNodes)
37         {
38             for (int i = 0; i < VIRTUAL_NODES; i++)
39             {
40                 String virtualNodeName = str + "&&VN" + String.valueOf(i);
41                 int hash = getHash(virtualNodeName);
42                 System.out.println("虚拟节点[" + virtualNodeName + "]被添加, hash值为" + hash);
43                 virtualNodes.put(hash, virtualNodeName);
44             }
45         }
46         System.out.println();
47     }
48     
49     /**
50      * 使用FNV1_32_HASH算法计算服务器的Hash值,这里不使用重写hashCode的方法,最终效果没区别 
51      */
52     private static int getHash(String str)
53     {
54         final int p = 16777619;
55         int hash = (int)2166136261L;
56         for (int i = 0; i < str.length(); i++)
57             hash = (hash ^ str.charAt(i)) * p;
58         hash += hash << 13;
59         hash ^= hash >> 7;
60         hash += hash << 3;
61         hash ^= hash >> 17;
62         hash += hash << 5;
63         
64         // 如果算出来的值为负数则取其绝对值
65         if (hash < 0)
66             hash = Math.abs(hash);
67         return hash;
68     }
69     
70     /**
71      * 得到应当路由到的结点
72      */
73     private static String getServer(String node)
74     {
75         // 得到带路由的结点的Hash值
76         int hash = getHash(node);
77         // 得到大于该Hash值的所有Map
78         SortedMap<Integer, String> subMap = 
79                 virtualNodes.tailMap(hash);
80         // 第一个Key就是顺时针过去离node最近的那个结点
81         Integer i = subMap.firstKey();
82         // 返回对应的虚拟节点名称,这里字符串稍微截取一下
83         String virtualNode = subMap.get(i);
84         return virtualNode.substring(0, virtualNode.indexOf("&&"));
85     }
86     
87     public static void main(String[] args)
88     {
89         String[] nodes = {"127.0.0.1:1111", "221.226.0.1:2222", "10.211.0.1:3333"};
90         for (int i = 0; i < nodes.length; i++)
91             System.out.println("[" + nodes[i] + "]的hash值为" + 
92                     getHash(nodes[i]) + ", 被路由到结点[" + getServer(nodes[i]) + "]");
93     }
94 }

关注一下运行结果:

虚拟节点[192.168.0.0:111&&VN0]被添加, hash值为1686427075
虚拟节点[192.168.0.0:111&&VN1]被添加, hash值为354859081
虚拟节点[192.168.0.0:111&&VN2]被添加, hash值为1306497370
虚拟节点[192.168.0.0:111&&VN3]被添加, hash值为817889914
虚拟节点[192.168.0.0:111&&VN4]被添加, hash值为396663629
虚拟节点[192.168.0.1:111&&VN0]被添加, hash值为1032739288
虚拟节点[192.168.0.1:111&&VN1]被添加, hash值为707592309
虚拟节点[192.168.0.1:111&&VN2]被添加, hash值为302114528
虚拟节点[192.168.0.1:111&&VN3]被添加, hash值为36526861
虚拟节点[192.168.0.1:111&&VN4]被添加, hash值为848442551
虚拟节点[192.168.0.2:111&&VN0]被添加, hash值为1452694222
虚拟节点[192.168.0.2:111&&VN1]被添加, hash值为2023612840
虚拟节点[192.168.0.2:111&&VN2]被添加, hash值为697907480
虚拟节点[192.168.0.2:111&&VN3]被添加, hash值为790847074
虚拟节点[192.168.0.2:111&&VN4]被添加, hash值为2010506136
虚拟节点[192.168.0.3:111&&VN0]被添加, hash值为891084251
虚拟节点[192.168.0.3:111&&VN1]被添加, hash值为1725031739
虚拟节点[192.168.0.3:111&&VN2]被添加, hash值为1127720370
虚拟节点[192.168.0.3:111&&VN3]被添加, hash值为676720500
虚拟节点[192.168.0.3:111&&VN4]被添加, hash值为2050578780
虚拟节点[192.168.0.4:111&&VN0]被添加, hash值为586921010
虚拟节点[192.168.0.4:111&&VN1]被添加, hash值为184078390
虚拟节点[192.168.0.4:111&&VN2]被添加, hash值为1331645117
虚拟节点[192.168.0.4:111&&VN3]被添加, hash值为918790803
虚拟节点[192.168.0.4:111&&VN4]被添加, hash值为1232193678

[127.0.0.1:1111]的hash值为380278925, 被路由到结点[192.168.0.0:111]
[221.226.0.1:2222]的hash值为1493545632, 被路由到结点[192.168.0.0:111]
[10.211.0.1:3333]的hash值为1393836017, 被路由到结点[192.168.0.2:111]

从代码运行结果看,每个点路由到的服务器都是Hash值顺时针离它最近的那个服务器节点,没有任何问题。

通过采取虚拟节点的方法,一个真实结点不再固定在Hash换上的某个点,而是大量地分布在整个Hash环上,这样即使上线、下线服务器,也不会造成整体的负载不均衡。

后记

在写本文的时候,很多知识我也是边写边学,难免有很多写得不好、理解得不透彻的地方,而且代码整体也比较糙,未有考虑到可能的各种情况。抛砖引玉,一方面,写得不对的地方,还望网友朋友们指正;另一方面,后续我也将通过自己的工作、学习不断完善上面的代码。

from:http://www.cnblogs.com/xrq730/p/5186728.html

系统负载能力浅析

一. 衡量指标

用什么来衡量一个系统的负载能力呢?有一个概念叫做每秒请求数(Requests per second),指的是每秒能够成功处理请求的数目。比如说,你可以配置tomcat服务器的maxConnection为无限大,但是受限于服务器系统或者硬件限制,很多请求是不会在一定的时间内得到响应的,这并不作为一个成功的请求,其中成功得到响应的请求数即为每秒请求数,反应出系统的负载能力。

通常的,对于一个系统,增加并发用户数量时每秒请求数量也会增加。然而,我们最终会达到这样一个点,此时并发用户数量开始“压倒”服务器。如果继续增加并发用户数量,每秒请求数量开始下降,而反应时间则会增加。这个并发用户数量开始“压倒”服务器的临界点非常重要,此时的并发用户数量可以认为是当前系统的最大负载能力。

二. 相关因素

一般的,和系统并发访问量相关的几个因素如下:

  • 带宽
  • 硬件配置
  • 系统配置
  • 应用服务器配置
  • 程序逻辑
  • 系统架构

其中,带宽和硬件配置是决定系统负载能力的决定性因素。这些只能依靠扩展和升级提高。我们需要重点关注的是在一定带宽和硬件配置的基础上,怎么使系统的负载能力达到最大。

2.1 带宽

毋庸置疑,带宽是决定系统负载能力的一个至关重要的因素,就好比水管一样,细的水管同一时间通过的水量自然就少(这个比喻解释带宽可能不是特别合适)。一个系统的带宽首先就决定了这个系统的负载能力,其单位为Mbps,表示数据的发送速度。

2.2 硬件配置

系统部署所在的服务器的硬件决定了一个系统的最大负载能力,也是上限。一般说来,以下几个配置起着关键作用:

  • cpu频率/核数:cpu频率关系着cpu的运算速度,核数则影响线程调度、资源分配的效率。
  • 内存大小以及速度:内存越大,那么可以在内存中运行的数据也就越大,速度自然而然就快;内存的速度从原来的几百hz到现在几千hz,决定了数据读取存储的速度。
  • 硬盘速度:传统的硬盘是使用磁头进行寻址的,io速度比较慢,使用了SSD的硬盘,其寻址速度大大较快。

很多系统的架构设计、系统优化,最终都会加上这么一句:使用ssd存储解决了这些问题。

可见,硬件配置是决定一个系统的负载能力的最关键因素。

2.3 系统配置

一般来说,目前后端系统都是部署在Linux主机上的。所以抛开win系列不谈,对于Linux系统来说一般有以下配置关系着系统的负载能力。

  • 文件描述符数限制:Linux中所有东西都是文件,一个socket就对应着一个文件描述符,因此系统配置的最大打开文件数以及单个进程能够打开的最大文件数就决定了socket的数目上限。
  • 进程/线程数限制: 对于apache使用的prefork等多进程模式,其负载能力由进程数目所限制。对tomcat多线程模式则由线程数所限制。
  • tcp内核参数:网络应用的底层自然离不开tcp/ip,Linux内核有一些与此相关的配置也决定了系统的负载能力。

2.3.1 文件描述符数限制

  • 系统最大打开文件描述符数:/proc/sys/fs/file-max中保存了这个数目,修改此值
    1
    2
    3
    4
    临时性:
     echo 1000000 > /proc/sys/fs/file-max
    永久性:
    在/etc/sysctl.conf中设置 fs.file-max = 1000000
  • 进程最大打开文件描述符数:这个是配单个进程能够打开的最大文件数目。可以通过ulimit -n查看/修改。如果想要永久修改,则需要修改/etc/security/limits.conf中的nofile。

通过读取/proc/sys/fs/file-nr可以看到当前使用的文件描述符总数。另外,对于文件描述符的配置,需要注意以下几点:

  • 所有进程打开的文件描述符数不能超过/proc/sys/fs/file-max
  • 单个进程打开的文件描述符数不能超过user limit中nofile的soft limit
  • nofile的soft limit不能超过其hard limit
  • nofile的hard limit不能超过/proc/sys/fs/nr_open

2.3.2 进程/线程数限制

  • 进程数限制:ulimit -u可以查看/修改单个用户能够打开的最大进程数。/etc/security/limits.conf中的noproc则是系统的最大进程数。
  • 线程数限制
    • 可以通过/proc/sys/kernel/threads-max查看系统总共可以打开的最大线程数。
    • 单个进程的最大线程数和PTHREAD_THREADS_MAX有关,此限制可以在/usr/include/bits/local_lim.h中查看,但是如果想要修改的话,需要重新编译。
    • 这里需要提到一点的是,Linux内核2.4的线程实现方式为linux threads,是轻量级进程,都会首先创建一个管理线程,线程数目的大小是受PTHREAD_THREADS_MAX影响的。但Linux2.6内核的线程实现方式为NPTL,是一个改进的LWP实现,最大一个区别就是,线程公用进程的pid(tgid),线程数目大小只受制于资源。
    • 线程数的大小还受线程栈大小的制约:使用ulimit -s可以查看/修改线程栈的大小,即每开启一个新的线程需要分配给此线程的一部分内存。减小此值可以增加可以打开的线程数目。

2.3.3 tcp内核参数

在一台服务器CPU和内存资源额定有限的情况下,最大的压榨服务器的性能,是最终的目的。在节省成本的情况下,可以考虑修改Linux的内核TCP/IP参数,来最大的压榨服务器的性能。如果通过修改内核参数也无法解决的负载问题,也只能考虑升级服务器了,这是硬件所限,没有办法的事。

1
netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'

使用上面的命令,可以得到当前系统的各个状态的网络连接的数目。如下:

1
2
3
4
5
6
7
LAST_ACK 13
SYN_RECV 468
ESTABLISHED 90
FIN_WAIT1 259
FIN_WAIT2 40
CLOSING 34
TIME_WAIT 28322

这里,TIME_WAIT的连接数是需要注意的一点。此值过高会占用大量连接,影响系统的负载能力。需要调整参数,以尽快的释放time_wait连接。

一般tcp相关的内核参数在/etc/sysctl.conf文件中。为了能够尽快释放time_wait状态的连接,可以做以下配置:

  • net.ipv4.tcp_syncookies = 1 //表示开启SYN Cookies。当出现SYN等待队列溢出时,启用cookies来处理,可防范少量SYN攻击,默认为0,表示关闭;
  • net.ipv4.tcp_tw_reuse = 1 //表示开启重用。允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭;
  • net.ipv4.tcp_tw_recycle = 1 //表示开启TCP连接中TIME-WAIT sockets的快速回收,默认为0,表示关闭;
  • net.ipv4.tcp_fin_timeout = 30 //修改系統默认的 TIMEOUT 时间。

这里需要注意的一点就是当打开了tcp_tw_recycle,就会检查时间戳,移动环境下的发来的包的时间戳有些时候是乱跳的,会把带了“倒退”的时间戳的包当作是“recycle的tw连接的重传数据,不是新的请求”,于是丢掉不回包,造成大量丢包。另外,当前面有LVS,并且采用的是NAT机制时,开启tcp_tw_recycle会造成一些异常,可见:http://www.pagefault.info/?p=416。如果这种情况下仍然需要开启此选项,那么可以考虑设置net.ipv4.tcp_timestamps=0,忽略掉报文的时间戳即可。

此外,还可以通过优化tcp/ip的可使用端口的范围,进一步提升负载能力。,如下:

  • net.ipv4.tcp_keepalive_time = 1200 //表示当keepalive起用的时候,TCP发送keepalive消息的频度。缺省是2小时,改为20分钟。
  • net.ipv4.ip_local_port_range = 10000 65000 //表示用于向外连接的端口范围。缺省情况下很小:32768到61000,改为10000到65000。(注意:这里不要将最低值设的太低,否则可能会占用掉正常的端口!)
  • net.ipv4.tcp_max_syn_backlog = 8192 //表示SYN队列的长度,默认为1024,加大队列长度为8192,可以容纳更多等待连接的网络连接数。
  • net.ipv4.tcp_max_tw_buckets = 5000 //表示系统同时保持TIME_WAIT的最大数量,如果超过这个数字,TIME_WAIT将立刻被清除并打印警告信息。默认为180000,改为5000。对于Apache、Nginx等服务器,上几行的参数可以很好地减少TIME_WAIT套接字数量,但是对于Squid,效果却不大。此项参数可以控制TIME_WAIT的最大数量,避免Squid服务器被大量的TIME_WAIT拖死。

2.4 应用服务器配置

说到应用服务器配置,这里需要提到应用服务器的几种工作模式,也叫并发策略。

  • multi process:多进程方式,一个进程处理一个请求。
  • prefork:类似于多进程的方式,但是会预先fork出一些进程供后续使用,是一种进程池的理念。
  • worker:一个线程对应一个请求,相比多进程的方式,消耗资源变少,但同时一个线程的崩溃会引起整个进程的崩溃,稳定性不如多进程。
  • master/worker:采用的是非阻塞IO的方式,只有两种进程:worker和master,master负责worker进程的创建、管理等,worker进程采用基于事件驱动的多路复用IO处理请求。mater进程只需要一个,woker进程根据cpu核数设置数目。

前三者是传统应用服务器apache和tomcat采用的方式,最后一种是nginx采用的方式。当然这里需要注意的是应用服务器和nginx这种做反向代理服务器(暂且忽略nginx+cgi做应用服务器的功能)的区别。应用服务器是需要处理应用逻辑的,有时候是耗cup资源的;而反向代理主要用作IO,是IO密集型的应用。使用事件驱动的这种网络模型,比较适合IO密集型应用,而并不适合CPU密集型应用。对于后者,多进程/线程则是一个更好地选择。

当然,由于nginx采用的基于事件驱动的多路IO复用的模型,其作为反向代理服务器时,可支持的并发是非常大的。淘宝tengine团队曾有一个测试结果是“24G内存机器上,处理并发请求可达200万”。

2.4.1 nginx/tengine

ngixn是目前使用最广泛的反向代理软件,而tengine是阿里开源的一个加强版nginx,其基本实现了nginx收费版本的一些功能,如:主动健康检查、session sticky等。对于nginx的配置,需要注意的有这么几点:

  • worker数目要和cpu(核)的数目相适应
  • keepalive timout要设置适当
  • worker_rlimit_nofile最大文件描述符要增大
  • upstream可以使用http 1.1的keepalive

典型配置可见:https://github.com/superhj1987/awesome-config/blob/master/nginx/nginx.conf

2.4.2 tomcat

tomcat的关键配置总体上有两大块:jvm参数配置和connector参数配置。

  • jvm参数配置:
    • 堆的最小值:Xms
    • 堆的最大值:Xmx
    • 新生代大小: Xmn
    • 永久代大小: XX:PermSize:
    • 永久代最大大小: XX:MaxPermSize:
    • 栈大小:-Xss或-XX:ThreadStackSize

    这里对于栈大小有一点需要注意的是:在Linux x64上ThreadStackSize的默认值就是1024KB,给Java线程创建栈会用这个参数指定的大小。如果把-Xss或者-XX:ThreadStackSize设为0,就是使用“系统默认值”。而在Linux x64上HotSpot VM给Java栈定义的“系统默认”大小也是1MB。所以普通Java线程的默认栈大小怎样都是1MB。这里有一个需要注意的地方就是java的栈大小和之前提到过的操作系统的操作系统栈大小(ulimit -s):这个配置只影响进程的初始线程;后续用pthread_create创建的线程都可以指定栈大小。HotSpot VM为了能精确控制Java线程的栈大小,特意不使用进程的初始线程(primordial thread)作为Java线程。

    其他还要根据业务场景,选择使用那种垃圾回收器,回收的策略。另外,当需要保留GC信息时,也需要做一些设置。

    典型配置可见:https://github.com/superhj1987/awesome-config/blob/master/tomcat/java_opts.conf

  • connector参数配置
    • protocol: 有三个选项:bio;nio;apr。建议使用apr选项,性能为最高。
    • connectionTimeout:连接的超时时间
    • maxThreads:最大线程数,此值限制了bio的最大连接数
    • minSpareThreads: 最大空闲线程数
    • acceptCount:可以接受的最大请求数目(未能得到处理的请求排队)
    • maxConnection: 使用nio或者apr时,最大连接数受此值影响。

    典型配置可见:https://github.com/superhj1987/awesome-config/blob/master/tomcat/connector.conf

    一般的当一个进程有500个线程在跑的话,那性能已经是很低很低了。Tomcat默认配置的最大请求数是150。当某个应用拥有250个以上并发的时候,应考虑应用服务器的集群。

    另外,并非是无限调大maxTreads和maxConnection就能无限调高并发能力的。线程越多,那么cpu花费在线程调度上的时间越多,同时,内存消耗也就越大,那么就极大影响处理用户的请求。受限于硬件资源,并发值是需要设置合适的值的。

对于tomcat这里有一个争论就是:使用大内存tomcat好还是多个小的tomcat集群好?(针对64位服务器以及tomcat来说)

其实,这个要根据业务场景区别对待的。通常,大内存tomcat有以下问题:

  • 一旦发生full gc,那么会非常耗时
  • 一旦gc,dump出的堆快照太大,无法分析

因此,如果可以保证一定程度上程序的对象大部分都是朝生夕死的,老年代不会发生gc,那么使用大内存tomcat也是可以的。但是在伸缩性和高可用却比不上使用小内存(相对来说)tomcat集群。

使用小内存tomcat集群则有以下优势:

  • 可以根据系统的负载调整tc的数量,以达到资源的最大利用率,
  • 可以防止单点故障。

2.4.3 数据库

mysql

mysql是目前最常用的关系型数据库,支持复杂的查询。但是其负载能力一般,很多时候一个系统的瓶颈就发生在mysql这一点,当然有时候也和sql语句的效率有关。比如,牵扯到联表的查询一般说来效率是不会太高的。

影响数据库性能的因素一般有以下几点:

  • 硬件配置:这个无需多说
  • 数据库设置:max_connection的一些配置会影响数据库的连接数
  • 数据表的设计:使用冗余字段避免联表查询;使用索引提高查询效率
  • 查询语句是否合理:这个牵扯到的是个人的编码素质。比如,查询符合某个条件的记录,我见过有人把记录全部查出来,再去逐条对比
  • 引擎的选择:myisam和innodb两者的适用场景不同,不存在绝对的优劣

抛开以上因素,当数据量单表突破千万甚至百万时(和具体的数据有关),需要对mysql数据库进行优化,一种常见的方案就是分表:

  • 垂直分表:在列维度的拆分
  • 水平分表:行维度的拆分

此外,对于数据库,可以使用读写分离的方式提高性能,尤其是对那种读频率远大于写频率的业务场景。这里一般采用master/slave的方式实现读写分离,前面用程序控制或者加一个proxy层。可以选择使用MySQL Proxy,编写lua脚本来实现基于proxy的mysql读写分离;也可以通过程序来控制,根据不同的sql语句选择相应的数据库来操作,这个也是笔者公司目前在用的方案。由于此方案和业务强绑定,是很难有一个通用的方案的,其中比较成熟的是阿里的TDDL,但是由于未全部开源且对其他组件有依赖性,不推荐使用。

现在很多大的公司对这些分表、主从分离、分布式都基于mysql做了自己的二次开发,形成了自己公司的一套分布式数据库系统。比如阿里的Cobar、网易的DDB、360的Atlas等。当然,很多大公司也研发了自己的mysql分支,比较出名的就是姜承尧带领研发的InNoSQL

redis

当然,对于系统中并发很高并且访问很频繁的数据,关系型数据库还是不能妥妥应对。这时候就需要缓存数据库出马以隔离对mysql的访问,防止mysql崩溃。

其中,redis是目前用的比较多的缓存数据库(当然,也有直接把redis当做数据库使用的)。redis是单线程基于内存的数据库,读写性能远远超过mysql。一般情况下,对redis做读写分离主从同步就可以应对大部分场景的应用。但是这样的方案缺少ha,尤其对于分布式应用,是不可接受的。目前,redis集群的实现方案有以下几个:

  • redis cluster:这是一种去中心化的方案,是redis的官方实现。是一种非常“重”的方案,已经不是Redis单实例的“简单、可依赖”了。目前应用案例还很少,貌似国内的芒果台用了,结局不知道如何。
  • twemproxy:这是twitter开源的redis和memcached的proxy方案。比较成熟,目前的应用案例比较多,但也有一些缺陷,尤其在运维方面。比如无法平滑的扩容/缩容,运维不友好等。
  • codis: 这个是豌豆荚开源的redis proxy方案,能够兼容twemproxy,并且对其做了很多改进。由豌豆荚于2014年11月开源,基于Go和C开发。现已广泛用于豌豆荚的各种Redis业务场景。现在比Twemproxy快近100%。目前据我所知除了豌豆荚之外,hulu也在使用这套方案。当然,其升级项目reborndb号称比codis还要厉害。

2.5 系统架构

影响性能的系统架构一般会有这几方面:

  • 负载均衡
  • 同步 or 异步
  • 28原则

2.5.1 负载均衡

负载均衡在服务端领域中是一个很关键的技术。可以分为以下两种:

  • 硬件负载均衡
  • 软件负载均衡

其中,硬件负载均衡的性能无疑是最优的,其中以F5为代表。但是,与高性能并存的是其成本的昂贵。所以对于很多初创公司来说,一般是选用软件负载均衡的方案。

软件负载均衡中又可以分为四层负载均衡和七层负载均衡。 上文在应用服务器配置部分讲了nginx的反向代理功能即七层的一种成熟解决方案,主要针对的是七层http协议(虽然最新的发布版本已经支持四层负载均衡)。对于四层负载均衡,目前应用最广泛的是lvs。其是阿里的章文嵩博士带领的团队所研发的一款linux下的负载均衡软件,本质上是基于iptables实现的。分为三种工作模式:

  • NAT: 修改数据包destination ip,in和out都要经过lvs。
  • DR:修改数据包mac地址,lvs和realserver需要在一个vlan。
  • IP TUUNEL:修改数据包destination ip和源ip,realserver需要支持ip tunnel协议。lvs和realserver不需要在一个vlan。

三种模式各有优缺点,目前还有阿里开源的一个FULL NAT是在NAT原来的DNAT上加入了SNAT的功能。

此外,haproxy也是一款常用的负载均衡软件。但限于对此使用较少,在此不做讲述。

2.5.2 同步 or 异步

对于一个系统,很多业务需要面对使用同步机制或者是异步机制的选择。比如,对于一篇帖子,一个用户对其分享后,需要记录用户的分享记录。如果你使用同步模式(分享的同时记录此行为),那么响应速度肯定会受到影响。而如果你考虑到分享过后,用户并不会立刻去查看自己的分享记录,牺牲这一点时效性,可以先完成分享的动作,然后异步记录此行为,会提高分享请求的响应速度(当然,这里可能会有事务准确性的问题)。有时候在某些业务逻辑上,在充分理解用户诉求的基础上,是可以牺牲某些特性来满足用户需求的。

这里值得一提的是,很多时候对于一个业务流程,是可以拆开划分为几个步骤的,然后有些步骤完全可以异步并发执行,能够极大提高处理速度。

2.5.3 28原则

对于一个系统,20%的功能会带来80%的流量。这就是28原则的意思,当然也是我自己的一种表述。因此在设计系统的时候,对于80%的功能,其面对的请求压力是很小的,是没有必要进行过度设计的。但是对于另外20%的功能则是需要设计再设计、reivew再review,能够做负载均衡就做负载均衡,能够缓存就缓存,能够做分布式就分布式,能够把流程拆开异步化就异步化。

当然,这个原则适用于生活中很多事物。

三. 一般架构

一般的Java后端系统应用架构如下图所示:LVS+Nginx+Tomcat+MySql/DDB+Redis/Codis

web-arch

其中,虚线部分是数据库层,采用的是主从模式。也可以使用redis cluster(codis等)以及mysql cluster(Cobar等)来替换。

from:http://www.rowkey.me/blog/2015/09/09/load-analysis/

Java中的纤程库 – Quasar

最近遇到的一个问题大概是微服务架构中经常会遇到的一个问题:

服务 A 是我们开发的系统,它的业务需要调用 BCD 等多个服务,这些服务是通过http的访问提供的。 问题是 BCD 这些服务都是第三方提供的,不能保证它们的响应时间,快的话十几毫秒,慢的话甚至1秒多,所以这些服务的Latency比较长。幸运地是这些服务都是集群部署的,容错率和并发支持都比较高,所以不担心它们的并发性能,唯一不爽的就是就是它们的Latency太高了。

简化的微服务架构简化的微服务架构

系统A会从Client接收Request, 每个Request的处理都需要多次调用B、C、D的服务,所以完成一个Request可能需要1到2秒的时间。为了让A能更好地支持并发数,系统中使用线程池处理这些Request。当然这是一个非常简化的模型,实际的业务处理比较复杂。

可以预见,因为系统B、C、D的延迟,导致整个业务处理都很慢,即使使用线程池,但是每个线程还是会阻塞在B、C、D的调用上,导致I/O阻塞了这些线程, CPU利用率相对来说不是那么高。

当然在测试的时候使用的是B、C、D的模拟器,没有预想到它们的响应是那么慢,因此测试数据的结果还不错,吞吐率还可以,但是在实际环境中问题就暴露出来了。

概述

最开始线程池设置的是200,然后用HttpUrlConnection作为http client发送请求到B、C、D。当然HttpUrlConnection也有一些坑,比如Persistent ConnectionsCaveats of HttpURLConnection,跳出坑后性能依然不行。

通过测试,如果B、C、D等服务延迟接近0毫秒,则HttpUrlConnection的吞吐率(线程池的大小为200)能到40000 requests/秒,但是随着第三方服务的响应时间变慢,它的吞吐率急剧下降,B、C、D的服务的延迟为100毫秒的时候,则HttpUrlConnection的吞吐率降到1800 requests/秒,而B、C、D的服务的延迟为100毫秒的时候HttpUrlConnection的吞吐率降到550 requests/秒。

增加http.maxConnections系统属性并不能显著增加吞吐率。

如果增加调用HttpUrlConnection的线程池的大小,比如增加到2000,性能会好一些,但是B、C、D的服务的延迟为500毫秒的时候,吞吐率为3800 requests/秒,延迟为1秒的时候,吞吐率为1900 requests/秒。

虽然线程池的增大能带来性能的提升,但是线程池也不能无限制的增大,因为每个线程都会占用一定的资源,而且随着线程的增多,线程之间的切换也更加的频繁,对CPU等资源也是一种浪费。

切换成netty(channel pool),与B、C、D通讯的性能还不错, latency为500ms的时候吞吐率能达到10000 requests/秒,通讯不成问题,问题是需要将业务代码改成异步的方式,异步地接收到这些response后在一个线程池中处理这些消息。

下面列出了一些常用的http client:

  • JDK’s URLConnection uses traditional thread-blocking I/O.
  • Apache HTTP Client uses traditional thread-blocking I/O with thread-pools.
  • Apache Async HTTP Client uses NIO.
  • Jersey is a ReST client/server framework; the client API can use several HTTP client backends including URLConnection and Apache HTTP Client.
  • OkHttp uses traditional thread-blocking I/O with thread-pools.
  • Retrofit turns your HTTP API into a Java interface and can use several HTTP client backends including Apache HTTP Client.
  • Grizzly is network framework with low-level HTTP support; it was using NIO but it switched to AIO .
  • Netty is a network framework with HTTP support (low-level), multi-transport, includes NIO and native (the latter uses epoll on Linux).
  • Jetty Async HTTP Client uses NIO.
  • Async HTTP Client wraps either Netty, Grizzly or JDK’s HTTP support.
  • clj-http wraps the Apache HTTP Client.
  • http-kit is an async subset of clj-http implemented partially in Java directly on top of NIO.
  • http async client wraps the Async HTTP Client for Java.

这个列表摘自 High-Concurrency HTTP Clients on the JVM,不止于此,这篇文章重点介绍基于java纤程库quasar的实现的http client库,并比较了性能。我们待会再说。

回到我前面所说的系统,如何能更好的提供性能?有一种方案是借助其它语言的优势,比如Go,让Go来代理完成和B、C、D的请求,系统A通过一个TCP连接与Go程序交流。第三方服务B、C、D的Response结果可以异步地返回给系统A。

Go的优势在于可以实现request-per-goroutine,整个系统中可以有成千上万个goroutine。 goroutine是轻量级的,而且在I/O阻塞的时候可以不占用线程,这让Go可以轻松地处理上万个链接,即使I/O阻塞也没问题。Go和Java之间的通讯协议可以通过Protobuffer来实现,而且它们之间只保留一个TCP连接即可。

当然这种架构的修改带来系统稳定性的降低,服务A和服务B、C、D之间的通讯增加了复杂性。同时,因为是异步方式,服务A的业务也要实现异步方式,否则200个线程依然等待Response的话,还是一个阻塞的架构。

通过测试,这种架构可以带来稳定的吞吐率。 不管服务B、C、D的延迟有多久,A的吞吐率能维持15000 requests/秒。当然Go到B、C、D的并发连接数也有限制,我把最大值调高到20000。

这种曲折的方案的最大的两个弊病就是架构的复杂性以及对原有系统需要进行大的重构。 高复杂性带来的是系统的稳定性的降低,包括部署、维护、网络状况、系统资源等。同时系统要改成异步模型,因为系统业务线程发送Request后不能等待Go返回Response,它需要从Client接收更多的Request,而收到Response之后它才继续执行剩下的业务,只有这样才不会阻塞,进而提到系统的吞吐率。

将系统A改成异步,然后使用HttpUrlConnection线程池行不行?
HttpUrlConnection线程池还是导致和B、C、D通讯的吞吐率下降,但是Go这种方案和B、C、D通讯的吞吐率可以维持一个较高的水平。

考虑到Go的优势,那么能不能在Java中使用类似Go的这种goroutine模型呢?那就是本文要介绍的Java纤程库: [Quasar](http://docs.paralleluniverse.co/quasar/)。

实际测试结果表明Go和Netty都是两种比较好的解决方案,而且Netty的性能惊人的好,不好的地方正如前面所讲,我们需要将代码改成异步的处理。线程池中的业务单元用Netty发送完Request之后,不要等待Response, Response的处理交给另外的线程来处理,同时注意不要在Netty的Handler里面处理业务逻辑。要解决的问题就变成如何更高效的处理Response了,而不是第三方系统阻塞的问题。

quasar初步

以下介绍Java的另一个解决方案,也就是Java中的coroutine库,因为最近刚刚看这个库,感觉挺不错的,而且用它替换Thread改动较少。

Java官方并没有纤程库。但是伟大的社区提供了一个优秀的库,它就是Quasar。

创始人是Ron Pressler和Dafna Pressler,由Y Combinator孵化。

Quasar is a library that provides high-performance lightweight threads, Go-like channels, Erlang-like actors, and other asynchronous programming tools for Java and Kotlin.

Quasar提供了高性能轻量级的线程,提供了类似Go的channel,Erlang风格的actor,以及其它的异步编程的工具,可以用在Java和Kotlin编程语言中。Scala目前的支持还不完善,我想如果这个公司能快速的发展壮大,或者被一些大公司收购的话,对Scala的支持才能提上日程。

你需要把下面的包加入到你的依赖中:

  • Core (必须) co.paralleluniverse:quasar-core:0.7.5[:jdk8] (对于 JDK 8,需要增加jdk8 classifier)
  • Actor co.paralleluniverse:quasar-actors:0.7.5
  • Clustering co.paralleluniverse:quasar-galaxy:0.7.5
  • Reactive Stream co.paralleluniverse:quasar-reactive-streams:0.7.5
  • Kotlin co.paralleluniverse:quasar-kotlin:0.7.5

Quasar fiber依赖java instrumentation修改你的代码,可以在运行时通过java Agent实现,也可以在编译时使用ant task实现。

通过java agent很简单,在程序启动的时候将下面的指令加入到命令行:

1
-javaagent:path-to-quasar-jar.jar

对于maven来说,你可以使用插件maven-dependency-plugin,它会为你的每个依赖设置一个属性,以便在其它地方引用,我们主要想使用 ${co.paralleluniverse:quasar-core:jar}:

1
2
3
4
5
6
7
8
9
10
11
12
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.5.1</version>
<executions>
<execution>
<id>getClasspathFilenames</id>
<goals>
<goal>properties</goal>
</goals>
</execution>
</executions>
</plugin>

然后你可以配置exec-maven-plugin或者maven-surefire-plugin加上agent参数,在执行maven任务的时候久可以使用Quasar了。

官方提供了一个Quasar Maven archetype,你可以通过下面的命令生成一个quasar应用原型:

1
2
3
4
5
6
7
8
git clone https://github.com/puniverse/quasar-mvn-archetype
cd quasar-mvn-archetype
mvn install
cd ..
mvn archetype:generate -DarchetypeGroupId=co.paralleluniverse -DarchetypeArtifactId=quasar-mvn-archetype -DarchetypeVersion=0.7.4 -DgroupId=testgrp -DartifactId=testprj
cd testprj
mvn test
mvn clean compile dependency:properties exec:exec

如果你使用gradle,可以看一下gradle项目模版:Quasar Gradle template project

最容易使用Quasar的方案就是使用Java Agent,它可以在运行时instrument程序。如果你想编译的时候就使用AOT instrumentation(Ahead-of-Time),可以使用Ant任务co.paralleluniverse.fibers.instrument.InstrumentationTask,它包含在quasar-core.jar中。

Quasar最主要的贡献就是提供了轻量级线程的实现,叫做fiber(纤程)。Fiber的功能和使用类似Thread, API接口也类似,所以使用起来没有违和感,但是它们不是被操作系统管理的,它们是由一个或者多个ForkJoinPool调度。一个idle fiber只占用400K内存,切换的时候占用更少的CPU,你的应用中可以有上百万的fiber,显然Thread做不到这一点。这一点和Go的goroutine类似。

Fiber并不意味着它可以在所有的场景中都可以替换Thread。当fiber的代码经常会被等待其它fiber阻塞的时候,就应该使用fiber。
对于那些需要CPU长时间计算的代码,很少遇到阻塞的时候,就应该首选thread

以上两条是选择fiber还是thread的判断条件,主要还是看任务是I/O blocking相关还是CPU相关。幸运地是,fiber API使用和thread使用类似,所以代码略微修改久就可以兼容。

Fiber特别适合替换哪些异步回调的代码。使用FiberAsync异步回调很简单,而且性能很好,扩展性也更高。

类似Thread, fiber也是用Fiber类表示:

1
2
3
4
5
6
new Fiber<V>() {
@Override
protected V run() throws SuspendExecution, InterruptedException {
// your code
}
}.start();

与Thread类似,但也有些不同。Fiber可以有一个返回值,类型为泛型V,也可以为空Void。run也可以抛出异常InterruptedException

你可以传递SuspendableRunnableSuspendableCallable 给Fiber的构造函数:

1
2
3
4
5
new Fiber<Void>(new SuspendableRunnable() {
public void run() throws SuspendExecution, InterruptedException {
// your code
}
}).start();

甚至你可以调用Fiber的join方法等待它完成,调用get方法得到它的结果。

Fiber继承Strand类。Strand类代表一个Fiber或者Thread,提供了一些底层的方法。

逃逸的Fiber(Runaway Fiber)是指那些陷入循环而没有block、或者block fiber本身运行的线程的Fiber。偶尔有逃逸的fiber没有问题,但是太频繁会导致性能的下降,因为需要调度器的线程可能都忙于逃逸fiber了。Quasar会监控这些逃逸fiber,你可以通过JMX监控。如果你不想监控,可以设置系统属性co.paralleluniverse.fibers.detectRunawayFibersfalse

fiber中的ThreadLocal是fiber local的。InheritableThreadLocal继承父fiber的值。

Fiber、SuspendableRunnable 、SuspendableCallable 的run方法会抛出SuspendExecution异常。但这并不是真正意义的异常,而是fiber内部工作的机制,通过这个异常暂停因block而需要暂停的fiber。

任何在Fiber中运行的方法,需要声明这个异常(或者标记@Suspendable),都被称为suspendable method。

反射调用通常都被认为是suspendable, Java8 lambda 也被认为是suspendable。不应该将类构造函数或类初始化器标记为suspendable。

synchronized语句块或者方法会阻塞操作系统线程,所以它们不应该标记为suspendable。Blocking线程调用默认也不被quasar允许。但是这两种情况都可以被quasar处理,你需要在Quasar javaagent中分别加上mb参数,或者ant任务中加上allowMonitorsallowBlocking属性。

quasar原理

Quasar最初fork自Continuations Library

如果你了解其它语言的coroutine, 比如Lua,你久比较容易理解quasar的fiber了。 Fiber实质上是 continuation, continuation可以捕获一个计算的状态,可以暂停当前的计算,等隔一段时间可以继续执行。Quasar通过instrument修改suspendable方法。Quasar的调度器使用ForkJoinPool调度这些fiber。

Fiber调度器FiberScheduler是一个高效的、work-stealing、多线程的调度器。

默认的调度器是FiberForkJoinScheduler,但是你可以使用自己的线程池去调度,请参考FiberExecutorScheduler

当一个类被加载时,Quasar的instrumentation模块 (使用 Java agent时) 搜索suspendable 方法。每一个suspendable 方法 f通过下面的方式 instrument:
它搜索对其它suspendable方法的调用。对suspendable方法g的调用,一些代码会在这个调用g的前后被插入,它们会保存和恢复fiber栈本地变量的状态,记录这个暂停点。在这个“suspendable function chain”的最后,我们会发现对Fiber.park的调用。park暂停这个fiber,扔出 SuspendExecution异常。

g block的时候,SuspendExecution异常会被Fiber捕获。 当Fiber被唤醒(使用unpark), 方法f会被调用, 执行记录显示它被block在g的调用上,所以程序会立即跳到f调用g的那一行,然后调用它。最终我们会到达暂停点,然后继续执行。当g返回时, f中插入的代码会恢复f的本地变量。

过程听起来很复杂,但是它只会带来3% ~ 5%的性能的损失。

下面看一个简单的例子, 方法m2声明抛出SuspendExecution异常,方法m1调用m2和m3,所以也声明抛出这个异常,最后这个异常会被Fiber所捕获:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Helloworld {
static void m1() throws SuspendExecution, InterruptedException {
String m = “m1”;
System.out.println(“m1 begin”);
m = m2();
m = m3();
System.out.println(“m1 end”);
System.out.println(m);
}
static String m2() throws SuspendExecution, InterruptedException {
return “m2”;
}
static String m3() throws SuspendExecution, InterruptedException {
return “m3”;
}
static public void main(String[] args) throws ExecutionException, InterruptedException {
new Fiber<Void>(“Caller”, new SuspendableRunnable() {
@Override
public void run() throws SuspendExecution, InterruptedException {
m1();
}
}).start();
}
}

反编译这段代码 (一般的反编译软件如jd-gui不能把这段代码反编译java文件,Procyon虽然能反编译,但是感觉反编译有错。所以我们还是看字节码吧):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
@Instrumented(suspendableCallSites={16, 17}, methodStart=13, methodEnd=21, methodOptimized=false)
static void m1()
throws SuspendExecution, InterruptedException
{
// Byte code:
// 0: aconst_null
// 1: astore_3
// 2: invokestatic 88 co/paralleluniverse/fibers/Stack:getStack ()Lco/paralleluniverse/fibers/Stack;
// 5: dup
// 6: astore_1
// 7: ifnull +42 -> 49
// 10: aload_1
// 11: iconst_1
// 12: istore_2
// 13: invokevirtual 92 co/paralleluniverse/fibers/Stack:nextMethodEntry ()I
// 16: tableswitch default:+24->40, 1:+64->80, 2:+95->111
// 40: aload_1
// 41: invokevirtual 96 co/paralleluniverse/fibers/Stack:isFirstInStackOrPushed ()Z
// 44: ifne +5 -> 49
// 47: aconst_null
// 48: astore_1
// 49: iconst_0
// 50: istore_2
// 51: ldc 2
// 53: astore_0
// 54: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 57: ldc 4
// 59: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 62: aload_1
// 63: ifnull +26 -> 89
// 66: aload_1
// 67: iconst_1
// 68: iconst_1
// 69: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 72: aload_0
// 73: aload_1
// 74: iconst_0
// 75: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 78: iconst_0
// 79: istore_2
// 80: aload_1
// 81: iconst_0
// 82: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 85: checkcast 110 java/lang/String
// 88: astore_0
// 89: invokestatic 6 com/colobu/fiber/Helloworld:m2 ()Ljava/lang/String;
// 92: astore_0
// 93: aload_1
// 94: ifnull +26 -> 120
// 97: aload_1
// 98: iconst_2
// 99: iconst_1
// 100: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 103: aload_0
// 104: aload_1
// 105: iconst_0
// 106: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 109: iconst_0
// 110: istore_2
// 111: aload_1
// 112: iconst_0
// 113: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 116: checkcast 110 java/lang/String
// 119: astore_0
// 120: invokestatic 7 com/colobu/fiber/Helloworld:m3 ()Ljava/lang/String;
// 123: astore_0
// 124: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 127: ldc 8
// 129: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 132: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 135: aload_0
// 136: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 139: aload_1
// 140: ifnull +7 -> 147
// 143: aload_1
// 144: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 147: return
// 148: aload_1
// 149: ifnull +7 -> 156
// 152: aload_1
// 153: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 156: athrow
// Line number table:
// Java source line #13 -> byte code offset #51
// Java source line #15 -> byte code offset #54
// Java source line #16 -> byte code offset #62
// Java source line #17 -> byte code offset #93
// Java source line #18 -> byte code offset #124
// Java source line #19 -> byte code offset #132
// Java source line #21 -> byte code offset #139
// Local variable table:
// start length slot name signature
// 53 83 0 m String
// 6 147 1 localStack co.paralleluniverse.fibers.Stack
// 12 99 2 i int
// 1 1 3 localObject Object
// 156 1 4 localSuspendExecution SuspendExecution
// Exception table:
// from to target type
// 49 148 148 finally
// 49 148 156 co/paralleluniverse/fibers/SuspendExecution
// 49 148 156 co/paralleluniverse/fibers/RuntimeSuspendExecution
}

这段反编译的代码显示了方法m被instrument后的样子,虽然我们不能很清楚的看到代码执行的样子,但是也可以大概地看到它实际在方法的最开始加入了此方法的栈信息的检查(#0 ~ #49,如果是第一次运行这个方法,则直接运行,
然后在一些暂停点上加上一些栈压入的处理,并且可以在下次执行的时候直接跳到上次的暂停点上。

官方的工程师关于Quasar的instrument操作如下:

  • Fully analyze the bytecode to find all the calls into suspendable methods. A method that (potentially) calls into other suspendable methods is itself considered suspendable, transitively.
  • Inject minimal bytecode in suspendable methods (and only them) that will manage an user-mode stack, in the following places:
    • At the beginning we’ll check if we’re resuming the fiber and only in this case we’ll jump into the relevant bytecode index.
    • Before a call into another suspendable method we’ll push a snapshot of the current activation frame, including the resume bytecode index; we can do it because we know the structure statically from the analysis phase.
    • After a call into another suspendable method we’ll pop the top activation frame and, if resumed, we’ll restore it in the current fiber.

我并没有更深入的去了解Quasar的实现细节以及调度算法,有兴趣的读者可以翻翻它的代码。如果你有更深入的剖析,请留下相关的地址,以便我加到参考文档中。

曾经, 陆陆续续也有一些Java coroutine的实现(coroutine-libraries), 但是目前来说最好的应该还是Quasar。

Oracle会实现一个官方的纤程库吗?目前来说没有看到这方面的计划,而且从Java的开发进度上来看,这个特性可能是遥遥无期的,所以目前还只能借助社区的力量,从第三方库如Quasar中寻找解决方案。

更多的Quasar知识,比如Channel、Actor、Reactive Stream 的使用可以参考官方的文档,官方也提供了多个例子

Comsat介绍

Comsat又是什么?

Comsat还是Parallel Universe提供的集成Quasar的一套开源库,可以提供web或者企业级的技术,如HTTP服务和数据库访问。

Comsat并不是一套web框架。它并不提供新的API,只是为现有的技术如Servlet、JAX-RS、JDBC等提供Quasar fiber的集成。

它包含非常多的库,比如Spring、ApacheHttpClient、OkHttp、Undertow、Netty、Kafka等。

性能对比

刘小溪在CSDN上写了一篇关于Quasar的文章:次时代Java编程(一):Java里的协程,写的挺好,建议读者读一读。

它参考Skynet的测试写了代码进行对比,这个测试是并发执行整数的累加:
测试结果是Golang花了261毫秒,Quasar花了612毫秒。其实结果还不错,但是文中指出这个测试没有发挥Quasar的性能。因为quasar的性能主要在于阻塞代码的调度上。
虽然文中加入了排序的功能,显示Java要比Golang要好,但是我觉得这又陷入了另外一种错误的比较, Java的排序算法使用TimSort,排序效果相当好,Go的排序效果显然比不上Java的实现,所以最后的测试主要测试排序算法上。 真正要体现Quasar的性能还是测试在有阻塞的情况下fiber的调度性能。

HttpClient

话题扯的越来越远了,拉回来。我最初的目的是要解决的是在第三方服务响应慢的情况下提高系统 A 的吞吐率。最初A是使用200个线程处理业务逻辑,调用第三方服务。因为线程总是被第三方服务阻塞,所以系统A的吞吐率总是很低。

虽然使用Go可以解决这个问题,但是对于系统A的改造比较大,还增加了系统的复杂性。Netty性能好,改动量还可以接受,但是不妨看一下这个场景,系统的问题是由http阻塞引起。

这正是Quasar fiber适合的场景,如果一个Fiber被阻塞,它可以暂时放弃线程,以便线程可以用来执行其它的Fiber。虽然整个集成系统的吞吐率依然很低,这是无法避免的,但是系统的吞吐率确很高。

Comsat提供了Apache Http Client的实现: FiberHttpClientBuilder

1
2
3
4
final CloseableHttpClient client = FiberHttpClientBuilder.
create(2). // use 2 io threads
setMaxConnPerRoute(concurrencyLevel).
setMaxConnTotal(concurrencyLevel).build();

然后在Fiber中久可以调用:

1
String response = client.execute(new HttpGet(“http://localhost:8080”), BASIC_RESPONSE_HANDLER);

你也可以使用异步的HttpClient:

1
2
3
4
5
6
final CloseableHttpAsyncClient client = FiberCloseableHttpAsyncClient.wrap(HttpAsyncClients.
custom().
setMaxConnPerRoute(concurrencyLevel).
setMaxConnTotal(concurrencyLevel).
build());
client.start();

Comsat还提供了Jersey Http Client: AsyncClientBuilder.newClient()

甚至提供了RetrofitOkHttp的实现。

经过测试,虽然随着系统B、C、D的响应时间的拉长,吞吐率有所降低,但是在latency为100毫秒的时候吞吐率依然能达到9900 requests/秒,可以满足我们的需求,而我们的代码改动的比较小。

综上所述,如果想彻底改造系统A,则可以使用Go库重写,或者使用Netty + Rx的方式去处理,都能达到比较好的效果。如果想改动比较小,可以考虑使用quasar替换线程对代码进行维护。

我希望本文不要给读者造成误解,以为Java NIO/Selector这种方式不能解决本文的问题,也就是第三方阻塞的问题。 事实上Java NIO也正是适合解决这样的问题, 比如Netty性能就不错,但是你需要小心的是, 不要让你的这个client对外又变成阻塞的方式,而是程序应该异步的去发送request和处理response。当然本文重点不是介绍这种实现,而是介绍Java的线程库,它可以改造传统的代码,即使有阻塞,也只是阻塞Fiber,而不是阻塞线程,这是另一个解决问题的思路。

另一篇关于Quasar的文档: 继续了解Java的纤程库 – Quasar

参考文档

from:http://colobu.com/2016/07/14/Java-Fiber-Quasar/