Shane Xu's Home

Life is too short for so much sorrow.

influxdb-relay性能瓶颈分析

起因

为了让influxdb能够达到高可用,我便考虑在influxdb外面套一层,比如nginx。然而发现官方已经开发了一个influxdb-relay的东西,于是决定索性使用这个东西。 首先看一张官方README.md中的图。

        ┌─────────────────┐                 
        │writes & queries │                 
        └─────────────────┘                 
                 │                          
                 ▼                          
         ┌───────────────┐                  
         │               │                  
┌────────│ Load Balancer │─────────┐        
│        │               │         │        
│        └──────┬─┬──────┘         │        
│               │ │                │        
│               │ │                │        
│        ┌──────┘ └────────┐       │        
│        │ ┌─────────────┐ │       │┌──────┐
│        │ │/write or UDP│ │       ││/query│
│        ▼ └─────────────┘ ▼       │└──────┘
│  ┌──────────┐      ┌──────────┐  │        
│  │ InfluxDB │      │ InfluxDB │  │        
│  │ Relay    │      │ Relay    │  │        
│  └──┬────┬──┘      └────┬──┬──┘  │        
│     │    |              |  │     │        
│     |  ┌─┼──────────────┘  |     │        
│     │  │ └──────────────┐  │     │        
│     ▼  ▼                ▼  ▼     │        
│  ┌──────────┐      ┌──────────┐  │        
│  │          │      │          │  │        
└─▶│ InfluxDB │      │ InfluxDB │◀─┘        
   │          │      │          │           
   └──────────┘      └──────────┘           

influxdb-relay只做数据的冗余写入,并在后端的influxdb宕机时,将数据存储在内存,当influxdb恢复时,将宕机期间的数据重新写回influxdb。 客户端在访问influxdb的时候,实际访问的是一个 Load Balancer 比如 Nginx , 然后 Load Balancer 根据不同的path,选择influxdb或者influxdb-relay。

然而,这influxdb-relay的性能却很奇怪。

POST /write
Requests      [total, rate]            180000, 3000.02
Duration      [total, attack, wait]    1m20.11586256s, 59.999606572s, 20.116255988s
Latencies     [mean, 50, 95, 99, max]  1.404856127s, 1.214236ms, 5.609988425s, 43.430687195s, 50.347228848s
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            3958680, 21.99
Success       [ratio]                  54.98%
Status Codes  [code:count]             204:98967  0:81033

GET /ping
Requests      [total, rate]            180000, 3000.02
Duration      [total, attack, wait]    59.999785977s, 59.999606599s, 179.378µs
Latencies     [mean, 50, 95, 99, max]  150.317µs, 145.68µs, 166.688µs, 194.65µs, 2.344456ms
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            0, 0.00
Success       [ratio]                  100.00%
Status Codes  [code:count]             204:180000

过程

在之前的文章里面提过,最新版influxdb,即便是在 vegeta 5000/s 的攻击下,还是能保证99%以上的成功率, 然而 influxdb-relay 却在 3000/s 的速度的时候就已经撑不住了。作为influxdb的前置程序,至少要达到或者 超过influxdb的处理性能,才算合理吧。所以我做了个对比试验,在处理Http请求的 http.go 里加入了 ping 处理逻辑。经过测试,可以看出,go的http库,应该不慢,所以还是在relay处理write的逻辑上有问题。

/ping 这个url官方的代码里面,没有加入,这是我在 relay/http.go 加的,代码如下:

1: func (h *HTTP) ServeHTTP(w http.ResponseWriter, r *http.Request) {
2:     start := time.Now()
3: 
4:     if r.URL.Path == "/ping" && (r.Method == "GET" || r.Method == "HEAD") {
5:         w.Header().Add("X-InfluxDB-Version", "relay")
6:         w.WriteHeader(http.StatusNoContent)
7:         return
8:     }

实际这段代码,根据 influxdb-relay 的设计理念是不需要的。

再回看一下vegeta的report中的Error Set:

Error Set:
Post http://127.0.0.1:9096/write?db=test: dial tcp 0.0.0.0:0->127.0.0.1:9096: bind: can't assign requested address
Post http://127.0.0.1:9096/write?db=test: dial tcp 0.0.0.0:0->127.0.0.1:9096: socket: too many open files
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:59803->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:59805->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:59806->127.0.0.1:9096: read: connection reset by peer

这里既有 socket: too many open files , 又有 read: connection reset by peer , 还有 bind: can't assign requested address , 然后结合代码。

 1: var responses = make(chan *responseData, len(h.backends))
 2: 
 3: for _, b := range h.backends {
 4:     b := b
 5:     go func() {
 6:         defer wg.Done()
 7:         resp, err := b.post(outBytes, query, authHeader)
 8:         if err != nil {
 9:             log.Printf("Problem posting to relay %q backend %q: %v", h.Name(), b.name, err)
10:         } else {
11:             if resp.StatusCode/100 == 5 {
12:                 log.Printf("5xx response for relay %q backend %q: %v", h.Name(), b.name, resp.StatusCode)
13:             }
14:             responses <- resp
15:         }
16:     }()
17: }
18: 
19: go func() {
20:     wg.Wait()
21:     close(responses)
22:     putBuf(outBuf)
23: }()
24: 
25: var errResponse *responseData
26: 
27: for resp := range responses {

首先这里,开了一个 channel , var responses = make(chan *responseData, len(h.backends)) , 只有当 所有的backends都回复了之后,至二个 responses channel 才会关闭,客户端才能拿到结果,然而一旦某一个 backends卡壳了,就要等待go的http client timeout了,这个timeout默认时间是10s, 相当于说客户端至少要等待 10s,然而实际并不止这样。在看看 retry.go 中的部分代码:

 1: interval := r.initialInterval
 2: for {
 3:     resp, err := r.p.post(buf.Bytes(), batch.query, batch.auth)
 4:     if err == nil && resp.StatusCode/100 != 5 {
 5:         batch.resp = resp
 6:         atomic.StoreInt32(&r.buffering, 0)
 7:         batch.wg.Done()
 8:         break
 9:     }
10: 
11:     if interval != r.maxInterval {
12:         interval *= r.multiplier
13:         if interval > r.maxInterval {
14:             interval = r.maxInterval
15:         }
16:     }
17: 
18:     time.Sleep(interval)
19: }

当超时等statusCode >= 500的错误发生时,retry会将这个请求加入bufer中,然后由run方法获取batch并向后端influxdb请求。 这时的逻辑是,一旦请求失败,就sleep一定时间,而这个一定时间就是初始时间乘以一个放大因子,放大因子默认是2,于是客户端 就会在不断等待中,最后超时。而在vegeta疯狂的攻击下,是经不起等待的。所以我改了下http.go中的逻辑,客户端请求后,直接 返回204,让客户端不再等待。

1: (&responseData{
2:     StatusCode: 204,
3: }).Write(w)

删除 responses channel , 以及对应的代码。 貌似有了一定的改善。

Requests      [total, rate]            180000, 3000.02
Duration      [total, attack, wait]    1m17.299212505s, 59.999606586s, 17.299605919s
Latencies     [mean, 50, 95, 99, max]  672.645729ms, 185.598µs, 345.300005ms, 30.003589182s, 36.777965011s
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            6231240, 34.62
Success       [ratio]                  86.55%
Status Codes  [code:count]             204:155781  0:24219  
Error Set:
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57421->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57406->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57407->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57404->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57399->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57413->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57418->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57416->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57398->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57396->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57402->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57415->127.0.0.1:9096: read: connection reset by peer

但是还是很糟糕,毕竟之前influxdb的数据与这个还是有一定差距的。 于是我把目光放到的 retry.go

 1: func (r *retryBuffer) post(buf []byte, query string, auth string) (*responseData, error) {
 2:     if atomic.LoadInt32(&r.buffering) == 0 {
 3:         resp, err := r.p.post(buf, query, auth)
 4:         // TODO A 5xx caused by the point data could cause the relay to buffer forever
 5:         if err == nil && resp.StatusCode/100 != 5 {
 6:             return resp, err
 7:         }
 8:         atomic.StoreInt32(&r.buffering, 1)
 9:     }
10: 
11:     // already buffering or failed request
12:     batch, err := r.list.add(buf, query, auth)
13:     if err != nil {
14:         return nil, err
15:     }
16: 
17:     batch.wg.Wait()
18:     return batch.resp, nil
19: }

如果没有buffering那么,直接发送请求给influxdb,不然就把请求放到buffer中,如果buffer满了,就返回错误。既然已经在客户端那边 直接返回了204那么,这个没有buffer的raw的请求就没有必要再单独处理了,索性一并放到buffer中去,buffer有一个好处,就是能把多个 请求合并成一个请求提交给后端的influxdb,这样就能减少请求次数了。代码改成如下:

1: func (r *retryBuffer) post(buf []byte, query string, auth string) (*responseData, error) {
2:     batch, err := r.list.add(buf, query, auth)
3:     if err != nil {
4:         return nil, err
5:     }
6: 
7:     batch.wg.Wait()
8:     return batch.resp, nil
9: }

用2000/s速度测试,结果如下:

Requests      [total, rate]            120000, 2000.02
Duration      [total, attack, wait]    1m0.000271382s, 59.999499926s, 771.456µs
Latencies     [mean, 50, 95, 99, max]  304.395µs, 259.447µs, 460.682µs, 1.044402ms, 42.391318ms
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            4800000, 40.00
Success       [ratio]                  100.00%
Status Codes  [code:count]             204:120000  
Error Set:

其实我没法用更快的速度测试,如果是3000/s,那么就会出下面的问题。

2016/08/13 17:52:22 starting relays...
2016/08/13 17:52:22 Starting HTTP relay "example-http" on 127.0.0.1:9096
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x837d8]

goroutine 38179 [running]:
panic(0x370fc0, 0xc820014200)
    /Users/shane/.gvm/gos/go1.6.2/src/runtime/panic.go:481 +0x3e6
github.com/influxdata/influxdb-relay/relay.(*retryBuffer).post(0xc820010b90, 0xc8202de254, 0x3c, 0x40, 0xc820393700, 0x7, 0x0, 0x0, 0xc82002d500, 0x0, ...)
    /Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/retry.go:56 +0x118
github.com/influxdata/influxdb-relay/relay.(*HTTP).ServeHTTP.func1(0xc820393710, 0xc8200c9ce0, 0xc8202de254, 0x3c, 0x40, 0xc820393700, 0x7, 0x0, 0x0, 0xc820022280)
    /Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/http.go:210 +0xe8
created by github.com/influxdata/influxdb-relay/relay.(*HTTP).ServeHTTP
    /Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/http.go:218 +0xce6

这块地方正是我修改的代码,而出错的那行是这样的:

1: batch.wg.Wait()

invalid memory address or nil , 我在这行代码前面加几行。

1: if batch == nil {
2:     log.Print("batch is nil")
3: }
4: batch.wg.Wait()

果然打出了日志

2016/08/13 18:06:28 batch is nil

这个错误很有意思了,batch是通过 bufferListadd 方法得到,并且在方法的末尾,有空值检查。

 1: if *cur == nil {
 2:     // new tail element
 3:     *cur = newBatch(buf, query, auth)
 4: } else {
 5:     // append to current batch
 6:     b := *cur
 7:     b.size += len(buf)
 8:     b.bufs = append(b.bufs, buf)
 9: }
10: 
11: l.cond.L.Unlock()
12: return *cur, nil

首先要排除,我的修改有没有问题,把代码回退,用2000/s的速度测试。但是很不幸,这个速度会让influxdb-relay直接挂起,所以索性把 http.go 请求influxdb的代码改了。

 1: func (b *simplePoster) post(buf []byte, query string, auth string) (*responseData, error) {
 2:     time.Sleep(time.Microsecond * time.Duration(rand.Intn(400)))
 3:     if auth == "hello" {
 4:         return &responseData{
 5:             StatusCode: 204,
 6:         }, nil
 7:     } else {
 8:         return &responseData{
 9:             StatusCode: 502,
10:         }, nil
11:     }
12: }

这里要模拟一个场景:第一次请求的时候均失败,在run方法请求的时候均成功,time.Sleep模拟请求耗时。为了甄别请求的调用者,这里在auth这个参数上做了点文章。所以要修改下 retry.go 中的 run 方法的调用,把 "hello" 作为参数传递给 SimplePoster.post 方法。

1: for {
2:     resp, err := r.p.post(buf.Bytes(), batch.query, "hello")
3:     if err == nil && resp.StatusCode/100 != 5 {

然后用2000/s的速度测试,果然出问题了。

2016/08/14 09:11:40 starting relays...
2016/08/14 09:11:40 Starting HTTP relay "example-http" on 127.0.0.1:9096
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x83463]

goroutine 77131 [running]:
panic(0x370cc0, 0xc820014200)
    /Users/shane/.gvm/gos/go1.6.2/src/runtime/panic.go:481 +0x3e6
github.com/influxdata/influxdb-relay/relay.(*retryBuffer).post(0xc820010b90, 0xc820164000, 0x3c, 0x200, 0xc8205efbb0, 0x7, 0x0, 0x0, 0xc82002c000, 0x0, ...)
    /Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/retry.go:66 +0x273
github.com/influxdata/influxdb-relay/relay.(*HTTP).ServeHTTP.func1(0xc8205efbc0, 0xc8200d5d00, 0xc820164000, 0x3c, 0x200, 0xc8205efbb0, 0x7, 0x0, 0x0, 0xc820022280)
    /Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/http.go:211 +0xe8
created by github.com/influxdata/influxdb-relay/relay.(*HTTP).ServeHTTP
    /Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/http.go:219 +0xce6

然后把用来模拟http请求耗时的time.Sleep去掉,异常又不发生了。以我这三脚猫的go语言功底,一时间难以发现错误的原因,但是直觉很重要。我在 BufferList.addl.cond.L.Unlock 后面加了一个 time.Sleep , 情况会怎样呢。

1: func (l *bufferList) add(buf []byte, query string, auth string) (*batch, error) {
2: 
3:   // ...
4: 
5:     l.cond.L.Unlock()
6:     time.Sleep(time.Microsecond * time.Duration(rand.Intn(100)))
7:     return *cur, nil
8: }

启动之后,一请求就把报错。 经过一番仔细思考,我得出一个结论。 BufferList.add 方法返回了执行 Batch 的指针,而 Unlock 之后, BufferList.pop 方法就会改变 BufferList 中数据的,这时候post方法中,获取的地址指向的 Batch 已经被 pop 方法改变,很可能已经是nil,所以就报错了。知道了原因修改起来就相对容易了,把 Unlock 调用置后,在 return 之后,也就是 post 方法中获取到值之后,再 Unlock

1: func (l *bufferList) add(buf []byte, query string, auth string) (*batch, error) {
2: 
3:   // ...
4: 
5:     defer l.cond.L.Unlock()
6:     return *cur, nil
7: }

测试之后果然没有再出现之前的错误了。

回到之前的故事。我把所有的请求都扔到了 BufferList 中,这样由于发送速度相对较快,那么必然出现请求合并的场景,这样减少请求次数,增加influxdb的稳定性。 但是当Buffer满的时候,这种情况在请求速度大于消费速度(比如influxdb宕机)的情况下就会发生。如果按照之前的逻辑,那么客户端是不知道自己的这次请求因为 BufferList 满了,而没有成功。为了解决这个问题,我把 http.go 中用来处理response的代码,加回来,并修改了 retry.go 中的 post 方法。

 1: func (r *retryBuffer) post(buf []byte, query string, auth string) (*responseData, error) {
 2:     pb := getBuf()
 3:     pb.Write(buf)
 4:     batch, err := r.list.add(pb.Bytes(), query, auth)
 5:     if err != nil {
 6:         putBuf(pb)
 7:         return nil, err
 8:     }
 9: 
10:     go func() {
11:         batch.wg.Wait()
12:         putBuf(pb)
13:     }()
14: 
15:     return &responseData{
16:         StatusCode: 204,
17:     }, nil
18: }

下面分别是 vegeta 在3000/s, 5000/s, 10000/s的测试结果

Requests      [total, rate]            180000, 3000.02
Duration      [total, attack, wait]    59.999890163s, 59.999606586s, 283.577µs
Latencies     [mean, 50, 95, 99, max]  290.602µs, 232.224µs, 402.502µs, 1.371521ms, 16.056569ms
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            7200000, 40.00
Success       [ratio]                  100.00%
Status Codes  [code:count]             204:180000  
Error Set:

> select count(value) from cpu
name: cpu
---------
time    count
0       180000


Requests      [total, rate]            300000, 5000.02
Duration      [total, attack, wait]    1m0.000013963s, 59.999799896s, 214.067µs
Latencies     [mean, 50, 95, 99, max]  258.591µs, 191.622µs, 350.592µs, 1.479882ms, 14.940625ms
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            12000000, 40.00
Success       [ratio]                  100.00%
Status Codes  [code:count]             204:300000  
Error Set:

> select count(value) from cpu
name: cpu
---------
time    count
0       299997


Requests      [total, rate]            600000, 10000.02
Duration      [total, attack, wait]    1m0.000158017s, 59.999899912s, 258.105µs
Latencies     [mean, 50, 95, 99, max]  329.228µs, 185.111µs, 745.028µs, 4.522189ms, 18.195853ms
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            24000000, 40.00
Success       [ratio]                  100.00%
Status Codes  [code:count]             204:600000  
Error Set:

> select count(value) from cpu
name: cpu
---------
time    count
0       599989

裸的influxdb承受不了vegeta 6000/s以上的攻击,而现在套了influxdb-relay之后就能承受10000/s+的攻击了,虽然真实场景可能更为复杂, 尤其是读和写都会发生的情况,单从上面的实验可以看出修改版的influxdb-relay已经基本能满足需求了。

Comments

comments powered by Disqus