提升 influxdb v0.10.2 性能
之前在给 influxdb 做性能测试的时候,得出的结论,v0.10.2 和 v0.13 在大量写入的时候,会占用大量的内存和 cpu,而之前线上之所以 influxdb 突然拒绝服务,长达数秒之久,原来是它直接挂了。既然 v1.0 还没有发布,我就思考能否将 v1.0 在解决性能问题上的修改,直接 apply 到 v0.10.2 上呢。有了这样一个思路之后,我就开始在 v0.13 的版本和 master 的最新提交之间,不断进行二分查找。最后终于被我找到了那次关键的提交。
commit c2370b437b9840363ed3d12638fe0ca0ea5ed296 Author: Jason Wilder <mail@jasonwilder.com> Date: Fri Jul 15 23:26:25 2016 -0600 Limit in-flight wal writes/encodings A slower disk can can cause excessive allocations to occur when writing to the WAL because the slower encoding and compression occurs before taking the write lock. The encoding/compression grabs a large byte slice from a pool and ultimately waits until it can acquire the write lock. This adds a throttle to limit how many inflight WAL writes can be queued up to prevent OOMing the processess with slower disks and heavy writes.
意思是说限制 wal writes/encodings,在拿到写锁之前,wal 已经 encoding 或者 compression 了,而前面这个操作需要预先 allocation 一块较大的内存,所以很可能造成 OOMing 问题。再看下这次改动的地方。
1: diff --git a/pkg/throttle/throttle.go b/pkg/throttle/throttle.go 2: new file mode 100644 3: index 0000000..f29c4a2 4: --- /dev/null 5: +++ b/pkg/throttle/throttle.go 6: @@ -0,0 +1,18 @@ 7: +package throttle 8: + 9: +// Fixed is a simple channel based concurrency limiter. It uses a fixed 10: +// size channel to limit callers from proceeding until there is a value avalable 11: +// in the channel. If all are in-use, the caller blocks until one is freed. 12: +type Fixed chan struct{} 13: + 14: +func New(limit int) Fixed { 15: + return make(Fixed, limit) 16: +} 17: + 18: +func (t Fixed) Take() { 19: + t <- struct{}{} 20: +} 21: + 22: +func (t Fixed) Release() { 23: + <-t 24: +} 25: diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go 26: index 3926f93..d7880cc 100644 27: --- a/tsdb/engine/tsm1/wal.go 28: +++ b/tsdb/engine/tsm1/wal.go 29: @@ -18,6 +18,7 @@ import ( 30: 31: "github.com/golang/snappy" 32: "github.com/influxdata/influxdb/models" 33: + "github.com/influxdata/influxdb/pkg/throttle" 34: ) 35: 36: const ( 37: @@ -90,7 +91,8 @@ type WAL struct { 38: LoggingEnabled bool 39: 40: // statistics for the WAL 41: - stats *WALStatistics 42: + stats *WALStatistics 43: + throttle throttle.Fixed 44: } 45: 46: func NewWAL(path string) *WAL { 47: @@ -103,6 +105,7 @@ func NewWAL(path string) *WAL { 48: logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags), 49: closing: make(chan struct{}), 50: stats: &WALStatistics{}, 51: + throttle: throttle.New(10), 52: } 53: } 54: 55: @@ -277,6 +280,12 @@ func (l *WAL) LastWriteTime() time.Time { 56: } 57: 58: func (l *WAL) writeToLog(entry WALEntry) (int, error) { 59: + // limit how many concurrent encodings can be in flight. Since we can only 60: + // write one at a time to disk, a slow disk can cause the allocations below 61: + // to increase quickly. If we're backed up, wait until others have completed. 
62: + l.throttle.Take() 63: + defer l.throttle.Release() 64: + 65: // encode and compress the entry while we're not locked 66: bytes := getBuf(walEncodeBufSize) 67: defer putBuf(bytes) 68: diff --git a/tsdb/store.go b/tsdb/store.go 69: index 5889577..3076d01 100644 70: --- a/tsdb/store.go 71: +++ b/tsdb/store.go 72: @@ -16,6 +16,7 @@ import ( 73: 74: "github.com/influxdata/influxdb/influxql" 75: "github.com/influxdata/influxdb/models" 76: + "github.com/influxdata/influxdb/pkg/throttle" 77: ) 78: 79: var ( 80: @@ -145,7 +146,7 @@ func (s *Store) loadShards() error { 81: err error 82: } 83: 84: - throttle := newthrottle(runtime.GOMAXPROCS(0)) 85: + t := throttle.New(runtime.GOMAXPROCS(0)) 86: 87: resC := make(chan *res) 88: var n int 89: @@ -171,8 +172,8 @@ func (s *Store) loadShards() error { 90: for _, sh := range shards { 91: n++ 92: go func(index *DatabaseIndex, db, rp, sh string) { 93: - throttle.take() 94: - defer throttle.release() 95: + t.Take() 96: + defer t.Release() 97: 98: start := time.Now() 99: path := filepath.Join(s.path, db, rp, sh) 100: @@ -514,7 +515,7 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error { 101: err error 102: } 103: 104: - throttle := newthrottle(runtime.GOMAXPROCS(0)) 105: + t := throttle.New(runtime.GOMAXPROCS(0)) 106: 107: resC := make(chan res) 108: var n int 109: @@ -523,8 +524,8 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error { 110: n++ 111: 112: go func(sh *Shard) { 113: - throttle.take() 114: - defer throttle.release() 115: + t.Take() 116: + defer t.Release() 117: 118: if err := fn(sh); err != nil { 119: resC <- res{err: fmt.Errorf("shard %d: %s", sh.id, err)} 120: @@ -914,20 +915,3 @@ func measurementsFromSourcesOrDB(db *DatabaseIndex, sources ...influxql.Source) 121: 122: return measurements, nil 123: } 124: - 125: -// throttle is a simple channel based concurrency limiter. 
It uses a fixed 126: -// size channel to limit callers from proceeding until there is a value avalable 127: -// in the channel. If all are in-use, the caller blocks until one is freed. 128: -type throttle chan struct{} 129: - 130: -func newthrottle(limit int) throttle { 131: - return make(throttle, limit) 132: -} 133: - 134: -func (t throttle) take() { 135: - t <- struct{}{} 136: -} 137: - 138: -func (t throttle) release() { 139: - <-t 140: -} 141:
以 encoding wal 为例,这里增加了一个 throttle.take()
动作,并且在完成操作后,执行 throttle.release()
方法, throttle
里面放了固定数量的值,当调用 take 方法的时候,如果 throttle
中没有资源,那么调用者就会 block
,直到 throttle
中有了新的资源,也就是其他调用者完成自己动作,并调用 release
方法。
然后将这部分的改动,合并到 v0.10.2 的代码中,并进行测试。结果如下。
Requests [total, rate] 1500000, 5000.00 Duration [total, attack, wait] 5m0.000129097s, 4m59.999799858s, 329.239µs Latencies [mean, 50, 95, 99, max] 1.227473ms, 230.474µs, 1.237975ms, 33.100274ms, 251.524774ms Bytes In [total, mean] 0, 0.00 Bytes Out [total, mean] 59994600, 40.00 Success [ratio] 99.99% Status Codes [code:count] 204:1499865 0:135 Requests [total, rate] 1500000, 5000.00 Duration [total, attack, wait] 5m0.000380718s, 4m59.999799905s, 580.813µs Latencies [mean, 50, 95, 99, max] 1.477462ms, 239.262µs, 2.878066ms, 36.939105ms, 328.367048ms Bytes In [total, mean] 0, 0.00 Bytes Out [total, mean] 60000000, 40.00 Success [ratio] 100.00% Status Codes [code:count] 204:1500000 Requests [total, rate] 1500000, 5000.00 Duration [total, attack, wait] 5m0.000769812s, 4m59.999799915s, 969.897µs Latencies [mean, 50, 95, 99, max] 1.957877ms, 242.364µs, 3.483565ms, 41.994757ms, 638.448146ms Bytes In [total, mean] 0, 0.00 Bytes Out [total, mean] 59999880, 40.00 Success [ratio] 100.00% Status Codes [code:count] 204:1499997 0:3
这已经是跟 v1.0 版本的结果差不多了。