
Improving influxdb v0.10.2 Performance

While benchmarking influxdb earlier, I had concluded that both v0.10.2 and v0.13 consume a large amount of memory and CPU under heavy write load, and that the reason our production influxdb had suddenly stopped responding for several seconds at a time was that it had simply crashed. Since v1.0 had not been released yet, I started wondering whether the changes v1.0 makes to fix this performance problem could be applied directly to v0.10.2. With that idea in mind, I ran a binary search over the commits between the v0.13 release and the latest commit on master, and eventually found the key commit.

commit c2370b437b9840363ed3d12638fe0ca0ea5ed296
Author: Jason Wilder <mail@jasonwilder.com>
Date:   Fri Jul 15 23:26:25 2016 -0600

    Limit in-flight wal writes/encodings
    
    A slower disk can can cause excessive allocations to occur when
    writing to the WAL because the slower encoding and compression occurs
    before taking the write lock.  The encoding/compression grabs a large
    byte slice from a pool and ultimately waits until it can acquire the
    write lock.
    
    This adds a throttle to limit how many inflight WAL writes can be queued
    up to prevent OOMing the processess with slower disks and heavy writes.

In other words, the change limits in-flight WAL writes/encodings. Before a writer acquires the write lock, the WAL entry has already been encoded and compressed, and that step first allocates a fairly large buffer; with a slow disk, many writers end up holding such buffers while they wait for the lock, which is very likely what causes the OOM. A quick sketch of that pattern follows, and after it the diff with the actual change.
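To make the failure mode concrete, here is a minimal, self-contained sketch of the pre-fix pattern. Everything in it is a hypothetical stand-in (walSketch, the 1 MiB buffers, the sleep), not influxdb's actual WAL code; it only illustrates how buffers pile up when many writers queue behind a slow lock holder.

package main

import (
    "fmt"
    "sync"
    "time"
)

// walSketch mimics the pre-fix write path described in the commit message:
// every writer grabs a large buffer from a pool *before* it waits for the
// write lock, so a slow disk lets many buffers stay pinned at once.
type walSketch struct {
    mu      sync.Mutex
    bufPool sync.Pool
}

func (w *walSketch) write(entry []byte) {
    // The large encode/compress buffer is taken while we are not locked...
    buf := w.bufPool.Get().([]byte)
    defer w.bufPool.Put(buf)
    _ = append(buf[:0], entry...) // stands in for encoding + snappy compression

    // ...and only then does the caller queue up for the lock. Every goroutine
    // parked here keeps its buffer alive, which is where the memory goes.
    w.mu.Lock()
    time.Sleep(5 * time.Millisecond) // stands in for a slow disk write
    w.mu.Unlock()
}

func main() {
    w := &walSketch{bufPool: sync.Pool{
        New: func() interface{} { return make([]byte, 1<<20) }, // 1 MiB per writer
    }}

    var wg sync.WaitGroup
    for i := 0; i < 100; i++ { // 100 concurrent writers can pin ~100 MiB at once
        wg.Add(1)
        go func() {
            defer wg.Done()
            w.write([]byte("cpu,host=a value=1"))
        }()
    }
    wg.Wait()
    fmt.Println("done")
}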

diff --git a/pkg/throttle/throttle.go b/pkg/throttle/throttle.go
new file mode 100644
index 0000000..f29c4a2
--- /dev/null
+++ b/pkg/throttle/throttle.go
@@ -0,0 +1,18 @@
+package throttle
+
+// Fixed is a simple channel based concurrency limiter.  It uses a fixed
+// size channel to limit callers from proceeding until there is a value avalable
+// in the channel.  If all are in-use, the caller blocks until one is freed.
+type Fixed chan struct{}
+
+func New(limit int) Fixed {
+   return make(Fixed, limit)
+}
+
+func (t Fixed) Take() {
+   t <- struct{}{}
+}
+
+func (t Fixed) Release() {
+   <-t
+}
diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go
index 3926f93..d7880cc 100644
--- a/tsdb/engine/tsm1/wal.go
+++ b/tsdb/engine/tsm1/wal.go
@@ -18,6 +18,7 @@ import (
 
    "github.com/golang/snappy"
    "github.com/influxdata/influxdb/models"
+   "github.com/influxdata/influxdb/pkg/throttle"
 )
 
 const (
@@ -90,7 +91,8 @@ type WAL struct {
    LoggingEnabled bool
 
    // statistics for the WAL
-   stats *WALStatistics
+   stats    *WALStatistics
+   throttle throttle.Fixed
 }
 
 func NewWAL(path string) *WAL {
@@ -103,6 +105,7 @@ func NewWAL(path string) *WAL {
        logger:      log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
        closing:     make(chan struct{}),
        stats:       &WALStatistics{},
+       throttle:    throttle.New(10),
    }
 }
 
@@ -277,6 +280,12 @@ func (l *WAL) LastWriteTime() time.Time {
 }
 
 func (l *WAL) writeToLog(entry WALEntry) (int, error) {
+   // limit how many concurrent encodings can be in flight.  Since we can only
+   // write one at a time to disk, a slow disk can cause the allocations below
+   // to increase quickly.  If we're backed up, wait until others have completed.
+   l.throttle.Take()
+   defer l.throttle.Release()
+
    // encode and compress the entry while we're not locked
    bytes := getBuf(walEncodeBufSize)
    defer putBuf(bytes)
diff --git a/tsdb/store.go b/tsdb/store.go
index 5889577..3076d01 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -16,6 +16,7 @@ import (
 
    "github.com/influxdata/influxdb/influxql"
    "github.com/influxdata/influxdb/models"
+   "github.com/influxdata/influxdb/pkg/throttle"
 )
 
 var (
@@ -145,7 +146,7 @@ func (s *Store) loadShards() error {
        err error
    }
 
-   throttle := newthrottle(runtime.GOMAXPROCS(0))
+   t := throttle.New(runtime.GOMAXPROCS(0))
 
    resC := make(chan *res)
    var n int
@@ -171,8 +172,8 @@ func (s *Store) loadShards() error {
            for _, sh := range shards {
                n++
                go func(index *DatabaseIndex, db, rp, sh string) {
-                   throttle.take()
-                   defer throttle.release()
+                   t.Take()
+                   defer t.Release()
 
                    start := time.Now()
                    path := filepath.Join(s.path, db, rp, sh)
@@ -514,7 +515,7 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
        err error
    }
 
-   throttle := newthrottle(runtime.GOMAXPROCS(0))
+   t := throttle.New(runtime.GOMAXPROCS(0))
 
    resC := make(chan res)
    var n int
@@ -523,8 +524,8 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
        n++
 
        go func(sh *Shard) {
-           throttle.take()
-           defer throttle.release()
+           t.Take()
+           defer t.Release()
 
            if err := fn(sh); err != nil {
                resC <- res{err: fmt.Errorf("shard %d: %s", sh.id, err)}
@@ -914,20 +915,3 @@ func measurementsFromSourcesOrDB(db *DatabaseIndex, sources ...influxql.Source)
 
    return measurements, nil
 }
-
-// throttle is a simple channel based concurrency limiter.  It uses a fixed
-// size channel to limit callers from proceeding until there is a value avalable
-// in the channel.  If all are in-use, the caller blocks until one is freed.
-type throttle chan struct{}
-
-func newthrottle(limit int) throttle {
-   return make(throttle, limit)
-}
-
-func (t throttle) take() {
-   t <- struct{}{}
-}
-
-func (t throttle) release() {
-   <-t
-}

Take the WAL encoding path as the example: a throttle.Take() call is added before the work, and throttle.Release() is deferred so it runs once the write completes. The throttle is a channel with a fixed capacity; Take() sends a value into it and therefore blocks when the channel is already full, and the blocked caller can only proceed after some other caller finishes its own work and calls Release(), which frees a slot.
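To see the limiter on its own, here is a small, self-contained sketch. The Fixed type is the one added under pkg/throttle in the diff above; the main() driver, the 50 goroutines and the sleeps are made up for illustration. With the same limit as throttle.New(10) in NewWAL, the peak number of in-flight operations never exceeds 10.

package main

import (
    "fmt"
    "sync"
    "time"
)

// Fixed is the channel-based limiter from pkg/throttle: a buffered channel
// whose capacity is the concurrency limit.
type Fixed chan struct{}

func New(limit int) Fixed { return make(Fixed, limit) }

// Take blocks once `limit` callers are already in flight.
func (t Fixed) Take() { t <- struct{}{} }

// Release frees a slot so a blocked caller can proceed.
func (t Fixed) Release() { <-t }

func main() {
    limiter := New(10) // mirrors throttle.New(10) in NewWAL

    var (
        mu             sync.Mutex
        inFlight, peak int
        wg             sync.WaitGroup
    )

    for i := 0; i < 50; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            limiter.Take()
            defer limiter.Release()

            // Track how many goroutines hold a slot at the same time.
            mu.Lock()
            inFlight++
            if inFlight > peak {
                peak = inFlight
            }
            mu.Unlock()

            time.Sleep(10 * time.Millisecond) // stands in for encode + compress + write

            mu.Lock()
            inFlight--
            mu.Unlock()
        }()
    }
    wg.Wait()
    fmt.Println("peak in-flight:", peak) // never exceeds 10
}

A buffered channel used this way is the idiomatic Go counting semaphore: the buffer size is the limit, sending acquires a slot (and blocks when the channel is full), and receiving releases it, so back-pressure comes for free without any extra locking.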

I then merged this change into the v0.10.2 code base and ran the benchmark again. The results are as follows.

Requests      [total, rate]            1500000, 5000.00
Duration      [total, attack, wait]    5m0.000129097s, 4m59.999799858s, 329.239µs
Latencies     [mean, 50, 95, 99, max]  1.227473ms, 230.474µs, 1.237975ms, 33.100274ms, 251.524774ms
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            59994600, 40.00
Success       [ratio]                  99.99%
Status Codes  [code:count]             204:1499865  0:135

Requests      [total, rate]            1500000, 5000.00
Duration      [total, attack, wait]    5m0.000380718s, 4m59.999799905s, 580.813µs
Latencies     [mean, 50, 95, 99, max]  1.477462ms, 239.262µs, 2.878066ms, 36.939105ms, 328.367048ms
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            60000000, 40.00
Success       [ratio]                  100.00%
Status Codes  [code:count]             204:1500000

Requests      [total, rate]            1500000, 5000.00
Duration      [total, attack, wait]    5m0.000769812s, 4m59.999799915s, 969.897µs
Latencies     [mean, 50, 95, 99, max]  1.957877ms, 242.364µs, 3.483565ms, 41.994757ms, 638.448146ms
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            59999880, 40.00
Success       [ratio]                  100.00%
Status Codes  [code:count]             204:1499997  0:3

These numbers are already roughly on par with the v1.0 results.
