Shane Xu's Home

Life is too short for so much sorrow.

InfluxDB负载测试设计

重新看一遍tsm1的DESIGN.md

数据写入

Write throughput is bounded by the time to process the write on the CPU (parsing, sorting, etc..), adding and evicting to the Cache and appending the write to the WAL. The first two items are CPU bound and can be tuned and optimized if they become a bottleneck. The WAL write can be tuned such that in the worst case every write requires at least 2 IOPS (write + fsync) or batched so that multiple writes are queued and fsync’d in sizes matching one or more disk blocks. Performing more work with each IO will improve throughput

Write latency is minimal for the WAL write since there are no seeks. The latency is bounded by the time to complete any write and fsync calls.

数据查询

Query throughput is directly related to how many blocks can be read in a period of time. The index structure contains enough information to determine if one or multiple blocks can be read in a single IO.

Query latency is determine by how long it takes to find and read the relevant blocks. The in-memory index structure contains the offsets and sizes of all blocks for a key. This allows every block to be read in 2 IOPS (seek + read) regardless of position, structure or size of file.

我粗鄙的结论

我一直认为influxdb的瓶颈不在写,而是在于读。

压力测试工具选择

前几天看到ES的测试工具 rally 中用了Actor模型,于是突发奇想,找一个基于Akka的测试工具,于是找到了 gatling 。我的想法是如果底层使用Akka的话, 利用Akka-cluster应该很容易实现分布式测试。然而这个功能在商业版中才有。https://github.com/gatling/gatling/issues/415

Gatling简单的使用方法

使用Zip bundle

https://repo1.maven.org/maven2/io/gatling/highcharts/gatling-charts-highcharts-bundle/2.2.2/gatling-charts-highcharts-bundle-2.2.2-bundle.zip 下载zip包。并解压到某处,然后在 user-files/simulations/computerdatabase/ 目录里面增加test case。 用这种方式,有个弊端,就是我想添加依赖的jar包的话,就只能手动将包放到 lib 下面。

或者使用SBT Plugin

   git clone https://github.com/gatling/gatling-sbt-plugin-demo.git

从 github clone 官方提供的,使用 gatling-sbt 插件的 demo,然后在 src/it/scala 中增加test case就可以了。 在写完test case 之后,运行 sbt gatling-it:testOnly a.b.c(完整类名) 就可以了。

简单的test case

Gatling自己有一套完整的http调用的api,并且压力测试还需依赖于通过这套api获得的请求结果,所以 influxdb-java 的client在这里没有用武之地。但是这个包的完整的influxdb的dto,却是非常有用。于是在工程里添加依赖。

  libraryDependencies += "org.influxdb" % "influxdb-java" % "2.3"

一个简单的query test case

 1:    package org.xusheng.influxdbloadtest
 2: 
 3:    import io.gatling.core.Predef._
 4:    import io.gatling.http.Predef._
 5:    import scala.concurrent.duration._
 6:    
 7:    class InfluxQuerySimulation extends Simulation {
 8:      val httpConf = http
 9:        .baseURL("http://127.0.0.1:8086")
10:    
11:      val h = http("query").get("/query").queryParam("db", "mytest").queryParam("q", "select sum(value) from cpu where time > now() - 1m")
12:    
13:      val scn = scenario("query").exec(h).pause(1 minute)
14:    
15:      setUp(scn.inject(atOnceUsers(1))).protocols(httpConf)
16:    }

influxdb的查询就是一个普通的带query parameter的get请求,所以这里没什么好说的。

一个简单的write test case

 1:    package org.xusheng.influxdbloadtest
 2: 
 3:    import io.gatling.core.Predef._
 4:    import io.gatling.http.Predef._
 5:    import java.util.concurrent.TimeUnit
 6:    import scala.concurrent.duration._
 7:    import org.influxdb.dto._
 8:    
 9:    class InfluxWriteSimulation extends Simulation {
10:      val httpConf = http
11:        .baseURL("http://127.0.0.1:8086")
12:    
13:      val p = Point.measurement("cpu").addField("value", scala.util.Random.nextInt(100)).tag("partner", "hello").time(System.nanoTime(), TimeUnit.NANOSECONDS).build()
14:    
15:      val h = http("write").post("/write").body(StringBody(p.lineProtocol())).queryParam("db", "mytest")
16:    
17:      val scn = scenario("write")
18:        .exec(h).pause(1 second)
19:    
20:      setUp(scn.inject(atOnceUsers(1))).protocols(httpConf)
21:    }

这里调用了influxdb-java中dto下面的Point来build的了一个点,然后调用lineProtocol方法获得文本,发送给influxdb。

一个Query 和 Write 组合的test case

 1:    package org.xusheng.influxdbloadtest
 2:    
 3:    import io.gatling.core.Predef._
 4:    import io.gatling.http.Predef._
 5:    import scala.concurrent.duration._
 6:    
 7:    class InfluxWriteQuerySimulation extends Simulation {
 8:    
 9:      val httpConf = http.baseURL(Generator.url)
10:    
11:      val hw = http("write")
12:        .post("/write")
13:        .body(StringBody { session =>
14:                Generator.genBatchPoints(session.userId, isWriter = true).lineProtocol()
15:              }).queryParam("db", Generator.db)
16:    
17:      val scnw = scenario("write").exec(forever{
18:                                          exec(hw).pace(1 second, 2 seconds)
19:                                        })
20:    
21:      val hr = http("query").
22:        get("/query").
23:        queryParam("db", Generator.db).
24:        queryParam("q", { session =>
25:                     Generator.getQueryStringByUserId(session.userId, isWriter = false)
26:                   })
27:    
28:      val scnr = scenario("query").
29:        exec(forever{
30:               exec(hr).pause{session => Generator.getIntervalByUserId(session.userId, isWriter = false)}
31:             })
32:    
33:      setUp(
34:        scnw.inject(rampUsers(Generator.userCount).over(Generator.duringSeconds seconds)),
35:        scnr.inject(rampUsers(Generator.userCount).over(Generator.duringSeconds seconds))
36:      ).protocols(httpConf)
37:    }

稍微赘言几句,这个test case里面定义了两套动作,一是write,以1到2秒的停顿时间,生成一个包含1到5000个点的BatchPoint,发送write请求到influxdb;二是query,以1分钟一次或是10分钟一次的速度,向influxdb请求1分钟或是10分钟或是30分钟或是1小时或是1天或是7天的数据。这其实是模拟了公司的一个需求。我们想知道的是到底单台influxdb能够容纳多少个用户实例。

Comments

comments powered by Disqus