Analyzing the influxdb-relay Performance Bottleneck
Background
To make InfluxDB highly available, I considered putting a layer in front of it, such as nginx. Then I found that the InfluxDB team had already built something called influxdb-relay, so I decided to just use that. First, here is a diagram from the official README.md.
```
         ┌─────────────────┐
         │writes & queries │
         └─────────────────┘
                  │
                  ▼
          ┌───────────────┐
          │               │
 ┌────────│ Load Balancer │─────────┐
 │        │               │         │
 │        └──────┬─┬──────┘         │
 │               │ │                │
 │               │ │                │
 │        ┌──────┘ └────────┐       │
 │        │ ┌─────────────┐ │       │┌──────┐
 │        │ │/write or UDP│ │       ││/query│
 │        ▼ └─────────────┘ ▼       │└──────┘
 │  ┌──────────┐      ┌──────────┐  │
 │  │ InfluxDB │      │ InfluxDB │  │
 │  │ Relay    │      │ Relay    │  │
 │  └──┬────┬──┘      └────┬──┬──┘  │
 │     │    |              |  │     │
 │     |  ┌─┼──────────────┘  |     │
 │     │  │ └──────────────┐  │     │
 │     ▼  ▼                ▼  ▼     │
 │  ┌──────────┐      ┌──────────┐  │
 │  │          │      │          │  │
 └─▶│ InfluxDB │      │ InfluxDB │◀─┘
    │          │      │          │
    └──────────┘      └──────────┘
```
influxdb-relay only handles redundant writes: when a backend InfluxDB goes down, it keeps the incoming data in memory, and once that InfluxDB recovers, it replays the data accumulated during the outage.
When clients access InfluxDB, they actually talk to a load balancer (nginx, for example), which routes each request by path to either InfluxDB or influxdb-relay.
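The path-based routing can be sketched with a minimal nginx config. Everything here (ports, upstream addresses, the listen port) is an assumption for illustration, not from the official docs:

```nginx
# Hypothetical topology: two relays take writes, two InfluxDB nodes take queries.
upstream relays {
    server 127.0.0.1:9096;
    server 127.0.0.1:9097;
}

upstream influxdbs {
    server 127.0.0.1:8086;
    server 127.0.0.1:8087;
}

server {
    listen 8080;

    # /write goes through the relay so it is duplicated to every backend
    location /write {
        proxy_pass http://relays;
    }

    # /query (and everything else) can hit InfluxDB directly
    location / {
        proxy_pass http://influxdbs;
    }
}
```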
However, influxdb-relay's performance turned out to be rather strange.
```
POST /write
Requests      [total, rate]            180000, 3000.02
Duration      [total, attack, wait]    1m20.11586256s, 59.999606572s, 20.116255988s
Latencies     [mean, 50, 95, 99, max]  1.404856127s, 1.214236ms, 5.609988425s, 43.430687195s, 50.347228848s
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            3958680, 21.99
Success       [ratio]                  54.98%
Status Codes  [code:count]             204:98967  0:81033

GET /ping
Requests      [total, rate]            180000, 3000.02
Duration      [total, attack, wait]    59.999785977s, 59.999606599s, 179.378µs
Latencies     [mean, 50, 95, 99, max]  150.317µs, 145.68µs, 166.688µs, 194.65µs, 2.344456ms
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            0, 0.00
Success       [ratio]                  100.00%
Status Codes  [code:count]             204:180000
```
Investigation
As mentioned in an earlier post, the latest InfluxDB can sustain a success ratio above 99% even under a vegeta attack at 5000 req/s, yet influxdb-relay already buckles at 3000 req/s. As a front end to InfluxDB, it ought to at least match InfluxDB's throughput to make sense. So I ran a comparison experiment: I added `/ping` handling to `http.go`, where the relay processes HTTP requests. The test shows that Go's HTTP library is certainly not slow, so the problem must lie in the relay's handling of writes.
The official code has no `/ping` URL; this is something I added in `relay/http.go`, as follows:
```go
func (h *HTTP) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	start := time.Now()

	if r.URL.Path == "/ping" && (r.Method == "GET" || r.Method == "HEAD") {
		w.Header().Add("X-InfluxDB-Version", "relay")
		w.WriteHeader(http.StatusNoContent)
		return
	}
```
Strictly speaking, given influxdb-relay's design, this code isn't needed.
Now look back at the Error Set in vegeta's report:
```
Error Set:
Post http://127.0.0.1:9096/write?db=test: dial tcp 0.0.0.0:0->127.0.0.1:9096: bind: can't assign requested address
Post http://127.0.0.1:9096/write?db=test: dial tcp 0.0.0.0:0->127.0.0.1:9096: socket: too many open files
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:59803->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:59805->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:59806->127.0.0.1:9096: read: connection reset by peer
```
There's `socket: too many open files`, `read: connection reset by peer`, and `bind: can't assign requested address`. Now consider these errors alongside the code.
```go
var responses = make(chan *responseData, len(h.backends))

for _, b := range h.backends {
	b := b
	go func() {
		defer wg.Done()
		resp, err := b.post(outBytes, query, authHeader)
		if err != nil {
			log.Printf("Problem posting to relay %q backend %q: %v", h.Name(), b.name, err)
		} else {
			if resp.StatusCode/100 == 5 {
				log.Printf("5xx response for relay %q backend %q: %v", h.Name(), b.name, resp.StatusCode)
			}
			responses <- resp
		}
	}()
}

go func() {
	wg.Wait()
	close(responses)
	putBuf(outBuf)
}()

var errResponse *responseData

for resp := range responses {
```
First, a channel is created here: `var responses = make(chan *responseData, len(h.backends))`. The `responses` channel is only closed, and the client only gets its result, once every backend has replied. The moment any one backend stalls, the request has to wait for the relay's HTTP client timeout, which defaults to 10s, meaning the client waits at least 10 seconds. In practice it's even worse than that. Look at this part of `retry.go`:
```go
interval := r.initialInterval
for {
	resp, err := r.p.post(buf.Bytes(), batch.query, batch.auth)
	if err == nil && resp.StatusCode/100 != 5 {
		batch.resp = resp
		atomic.StoreInt32(&r.buffering, 0)
		batch.wg.Done()
		break
	}

	if interval != r.maxInterval {
		interval *= r.multiplier
		if interval > r.maxInterval {
			interval = r.maxInterval
		}
	}

	time.Sleep(interval)
}
```
When a timeout or another error with statusCode >= 500 occurs, retry puts the request into a buffer, and the run method later takes batches from that buffer and posts them to the backend InfluxDB. The logic here is: whenever a request fails, sleep for a while, with the sleep interval multiplied each time by a growth factor that defaults to 2. So the client just keeps waiting until it eventually times out, and under vegeta's relentless attack there is no room for that kind of waiting. I therefore changed the logic in http.go to return 204 immediately after the client's request, so the client no longer waits:
```go
(&responseData{
	StatusCode: 204,
}).Write(w)
```
and deleted the `responses` channel together with the code that used it.
That seemed to bring some improvement.
```
Requests      [total, rate]            180000, 3000.02
Duration      [total, attack, wait]    1m17.299212505s, 59.999606586s, 17.299605919s
Latencies     [mean, 50, 95, 99, max]  672.645729ms, 185.598µs, 345.300005ms, 30.003589182s, 36.777965011s
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            6231240, 34.62
Success       [ratio]                  86.55%
Status Codes  [code:count]             204:155781  0:24219
Error Set:
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57421->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57406->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57407->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57404->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57399->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57413->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57418->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57416->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57398->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57396->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57402->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57415->127.0.0.1:9096: read: connection reset by peer
```
But it's still bad; there is quite a gap compared to the earlier bare-InfluxDB numbers. So I turned my attention to `retry.go`:
```go
func (r *retryBuffer) post(buf []byte, query string, auth string) (*responseData, error) {
	if atomic.LoadInt32(&r.buffering) == 0 {
		resp, err := r.p.post(buf, query, auth)
		// TODO A 5xx caused by the point data could cause the relay to buffer forever
		if err == nil && resp.StatusCode/100 != 5 {
			return resp, err
		}
		atomic.StoreInt32(&r.buffering, 1)
	}

	// already buffering or failed request
	batch, err := r.list.add(buf, query, auth)
	if err != nil {
		return nil, err
	}

	batch.wg.Wait()
	return batch.resp, nil
}
```
If the relay is not currently buffering, the request goes straight to InfluxDB; otherwise it is put into the buffer, and if the buffer is full an error is returned. Since the client side now gets an immediate 204 anyway, there is no longer any reason to handle the unbuffered, raw request separately, so I just push everything into the buffer. The buffer has a nice property: it can merge several requests into a single request to the backend InfluxDB, reducing the number of requests. The code becomes:
```go
func (r *retryBuffer) post(buf []byte, query string, auth string) (*responseData, error) {
	batch, err := r.list.add(buf, query, auth)
	if err != nil {
		return nil, err
	}

	batch.wg.Wait()
	return batch.resp, nil
}
```
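To see why this merging helps: InfluxDB's line protocol accepts many points in a single `/write` body, one point per line, so buffered request bodies can simply be concatenated and flushed in one POST. A rough sketch of the idea (not the relay's actual batching code):

```go
package main

import (
	"bytes"
	"fmt"
)

// mergeBodies concatenates several line-protocol write bodies into one,
// separated by newlines, the way a buffered batch can be flushed in a
// single POST to InfluxDB.
func mergeBodies(bufs [][]byte) []byte {
	var out bytes.Buffer
	for i, b := range bufs {
		if i > 0 {
			out.WriteByte('\n')
		}
		out.Write(bytes.TrimRight(b, "\n"))
	}
	return out.Bytes()
}

func main() {
	merged := mergeBodies([][]byte{
		[]byte("cpu value=0.1\n"),
		[]byte("cpu value=0.2\n"),
		[]byte("cpu value=0.3\n"),
	})
	fmt.Printf("%q\n", merged)
	// prints: "cpu value=0.1\ncpu value=0.2\ncpu value=0.3"
}
```

Three client requests become one backend request, which is exactly where the reduction in request count comes from.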
Testing at 2000 req/s gives:
```
Requests      [total, rate]            120000, 2000.02
Duration      [total, attack, wait]    1m0.000271382s, 59.999499926s, 771.456µs
Latencies     [mean, 50, 95, 99, max]  304.395µs, 259.447µs, 460.682µs, 1.044402ms, 42.391318ms
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            4800000, 40.00
Success       [ratio]                  100.00%
Status Codes  [code:count]             204:120000
Error Set:
```
I actually couldn't test any faster: at 3000 req/s, this happens.
```
2016/08/13 17:52:22 starting relays...
2016/08/13 17:52:22 Starting HTTP relay "example-http" on 127.0.0.1:9096
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x837d8]

goroutine 38179 [running]:
panic(0x370fc0, 0xc820014200)
	/Users/shane/.gvm/gos/go1.6.2/src/runtime/panic.go:481 +0x3e6
github.com/influxdata/influxdb-relay/relay.(*retryBuffer).post(0xc820010b90, 0xc8202de254, 0x3c, 0x40, 0xc820393700, 0x7, 0x0, 0x0, 0xc82002d500, 0x0, ...)
	/Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/retry.go:56 +0x118
github.com/influxdata/influxdb-relay/relay.(*HTTP).ServeHTTP.func1(0xc820393710, 0xc8200c9ce0, 0xc8202de254, 0x3c, 0x40, 0xc820393700, 0x7, 0x0, 0x0, 0xc820022280)
	/Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/http.go:210 +0xe8
created by github.com/influxdata/influxdb-relay/relay.(*HTTP).ServeHTTP
	/Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/http.go:218 +0xce6
```
That is exactly the code I modified, and the failing line is:
```go
batch.wg.Wait()
```
`invalid memory address or nil` — so I added a few lines in front of it:
```go
if batch == nil {
	log.Print("batch is nil")
}
batch.wg.Wait()
```
Sure enough, the log fired:
```
2016/08/13 18:06:28 batch is nil
```
This error is interesting: `batch` comes from `bufferList`'s `add` method, and at the end of that method there is a nil check.
```go
if *cur == nil {
	// new tail element
	*cur = newBatch(buf, query, auth)
} else {
	// append to current batch
	b := *cur
	b.size += len(buf)
	b.bufs = append(b.bufs, buf)
}

l.cond.L.Unlock()
return *cur, nil
```
First I had to rule out my own changes, so I reverted the code and tested at 2000 req/s. Unfortunately, at that rate influxdb-relay simply hangs, so instead I rewrote the code that posts requests to InfluxDB:
```go
func (b *simplePoster) post(buf []byte, query string, auth string) (*responseData, error) {
	time.Sleep(time.Microsecond * time.Duration(rand.Intn(400)))
	if auth == "hello" {
		return &responseData{
			StatusCode: 204,
		}, nil
	} else {
		return &responseData{
			StatusCode: 502,
		}, nil
	}
}
```
This simulates a scenario where every first-attempt request fails while every request made from the run method succeeds; the `time.Sleep` simulates request latency. To tell the two callers apart, I abused the `auth` parameter: the call in `retry.go`'s `run` method is changed to pass `"hello"` to `simplePoster.post`:
```go
for {
	resp, err := r.p.post(buf.Bytes(), batch.query, "hello")
	if err == nil && resp.StatusCode/100 != 5 {
```
Then, testing at 2000 req/s, the problem duly appeared:
```
2016/08/14 09:11:40 starting relays...
2016/08/14 09:11:40 Starting HTTP relay "example-http" on 127.0.0.1:9096
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x83463]

goroutine 77131 [running]:
panic(0x370cc0, 0xc820014200)
	/Users/shane/.gvm/gos/go1.6.2/src/runtime/panic.go:481 +0x3e6
github.com/influxdata/influxdb-relay/relay.(*retryBuffer).post(0xc820010b90, 0xc820164000, 0x3c, 0x200, 0xc8205efbb0, 0x7, 0x0, 0x0, 0xc82002c000, 0x0, ...)
	/Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/retry.go:66 +0x273
github.com/influxdata/influxdb-relay/relay.(*HTTP).ServeHTTP.func1(0xc8205efbc0, 0xc8200d5d00, 0xc820164000, 0x3c, 0x200, 0xc8205efbb0, 0x7, 0x0, 0x0, 0xc820022280)
	/Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/http.go:211 +0xe8
created by github.com/influxdata/influxdb-relay/relay.(*HTTP).ServeHTTP
	/Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/http.go:219 +0xce6
```
Removing the `time.Sleep` that simulated request latency made the panic disappear again. With my shaky Go skills I couldn't immediately see the cause, but intuition counts for something. What happens if I add a `time.Sleep` in `bufferList.add`, right after `l.cond.L.Unlock`?
```go
func (l *bufferList) add(buf []byte, query string, auth string) (*batch, error) {

	// ...

	l.cond.L.Unlock()
	time.Sleep(time.Microsecond * time.Duration(rand.Intn(100)))
	return *cur, nil
}
```
After starting it, the very first request triggered the panic.
After some careful thought, I reached a conclusion. `bufferList.add` returns a pointer to the current `batch`, but once `Unlock` runs, the `bufferList.pop` method is free to mutate the list's data; by the time `post` reads it, the `batch` at that address may already have been changed by `pop` — quite possibly to nil — hence the panic. Once the cause is known, the fix is easy: defer the `Unlock` so that it runs after the `return`, i.e. after `post` has already obtained the value.
```go
func (l *bufferList) add(buf []byte, query string, auth string) (*batch, error) {

	// ...

	defer l.cond.L.Unlock()
	return *cur, nil
}
```
After this change, the earlier error no longer appeared in testing.
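The interleaving behind the bug can be replayed deterministically in a single-goroutine sketch. The types here are illustrative stand-ins, not the relay's real ones: `add` ensures the tail slot is non-nil under the lock, unlocks, and only then re-reads the slot — exactly the window in which `pop` can nil it out.

```go
package main

import "fmt"

type batch struct{ bufs [][]byte }

type bufferList struct{ head *batch }

// addThenPop replays the buggy interleaving in one goroutine.
func addThenPop() *batch {
	l := &bufferList{}

	// Inside add, under the lock: cur points INTO the list's storage.
	cur := &l.head
	if *cur == nil {
		*cur = &batch{}
	}
	// l.cond.L.Unlock() happens here in the buggy version.

	// In the unlock window, a concurrent pop mutates the list:
	l.head = nil

	// The buggy `return *cur` re-reads the slot only now, yielding nil,
	// so the caller's batch.wg.Wait() dereferences a nil pointer.
	return *cur
}

func main() {
	fmt.Println(addThenPop() == nil) // prints: true
}
```

With `defer l.cond.L.Unlock()`, the return value `*cur` is evaluated before the deferred unlock runs, so the caller always receives the non-nil batch that `add` saw under the lock.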
Back to the main story. I now throw every request into the `bufferList`; since requests arrive relatively fast, merging is bound to occur, which reduces the request count and improves InfluxDB's stability.
But the buffer can fill up, which happens whenever requests arrive faster than they are consumed (for instance, when InfluxDB is down). Under the earlier logic, the client would never learn that its request failed because the `bufferList` was full. To fix this, I brought back the response-handling code in `http.go` and changed the `post` method in `retry.go`:
```go
func (r *retryBuffer) post(buf []byte, query string, auth string) (*responseData, error) {
	pb := getBuf()
	pb.Write(buf)
	batch, err := r.list.add(pb.Bytes(), query, auth)
	if err != nil {
		putBuf(pb)
		return nil, err
	}

	go func() {
		batch.wg.Wait()
		putBuf(pb)
	}()

	return &responseData{
		StatusCode: 204,
	}, nil
}
```
Below are the vegeta results at 3000/s, 5000/s, and 10000/s:
```
Requests      [total, rate]            180000, 3000.02
Duration      [total, attack, wait]    59.999890163s, 59.999606586s, 283.577µs
Latencies     [mean, 50, 95, 99, max]  290.602µs, 232.224µs, 402.502µs, 1.371521ms, 16.056569ms
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            7200000, 40.00
Success       [ratio]                  100.00%
Status Codes  [code:count]             204:180000
Error Set:

> select count(value) from cpu
name: cpu
---------
time    count
0       180000

Requests      [total, rate]            300000, 5000.02
Duration      [total, attack, wait]    1m0.000013963s, 59.999799896s, 214.067µs
Latencies     [mean, 50, 95, 99, max]  258.591µs, 191.622µs, 350.592µs, 1.479882ms, 14.940625ms
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            12000000, 40.00
Success       [ratio]                  100.00%
Status Codes  [code:count]             204:300000
Error Set:

> select count(value) from cpu
name: cpu
---------
time    count
0       299997

Requests      [total, rate]            600000, 10000.02
Duration      [total, attack, wait]    1m0.000158017s, 59.999899912s, 258.105µs
Latencies     [mean, 50, 95, 99, max]  329.228µs, 185.111µs, 745.028µs, 4.522189ms, 18.195853ms
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            24000000, 40.00
Success       [ratio]                  100.00%
Status Codes  [code:count]             204:600000
Error Set:

> select count(value) from cpu
name: cpu
---------
time    count
0       599989
```
Bare InfluxDB can't withstand a vegeta attack above 6000/s, yet with influxdb-relay in front it now handles 10000/s and more. Real-world workloads are of course more complex, especially when reads and writes are mixed, but the experiments above show that this modified influxdb-relay basically meets my needs.