Implementing a Distributed KV Store

2020 Oct 28


The business urgently needed a KV (key-value) middleware, with the following requirements:

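So why not just use Redis Cluster? They already were. The only reason not to keep going was money: there was no budget for large-memory machines, and when memory is tight, Redis makes little sense.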

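So, standing on the shoulders of giants, I finally built a wheel of my own. It has been very stable in production ever since: after a few months there are nearly 100 million keys spread across 12 shards, server load is low, and it has never let us down.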

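The approach is to give business teams a KV client. The client API is shown below. It follows Couchbase's interface conventions and, in addition to synchronous calls, also offers asynchronous ones, which is very practical: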

public interface ThorBucket<T> {

  T load(String key);

  Map<String, T> loads(final Collection<String> keys);

  Long ttl(String key);

  Boolean set(String key, int expirationInSeconds, T value);

  Boolean touch(String key, int expirationInSeconds);

  Long incr(String key, long delta, long initialValue, int expirationInSeconds);

  Long decr(String key, long delta, long initialValue, int expirationInSeconds);

  GoblinFuture<Boolean> asyncSet(String key, int expirationInSeconds, T value);

  GoblinFuture<Boolean> asyncTouch(String key, int expirationInSeconds);

  GoblinFuture<Long> asyncIncr(String key, long delta, long initialValue, int expirationInSeconds);

  GoblinFuture<Long> asyncDecr(String key, long delta, long initialValue, int expirationInSeconds);
}
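
To make the counter and async methods above a little more concrete, here is a minimal in-memory sketch. It is not the real thor client. InMemoryBucket is a hypothetical stand-in, CompletableFuture stands in for GoblinFuture (whose API is not shown here), and two behaviours are assumptions rather than confirmed facts: an expiration of 0 means "never expires", and incr on a missing key seeds it with initialValue without applying delta.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a ThorBucket<String>; not the real client.
class InMemoryBucket {
  // Stored value plus absolute expiry in epoch millis (0 = never expires, an assumption).
  private static final class Entry {
    final String value;
    final long expireAtMillis;

    Entry(String value, long expireAtMillis) {
      this.value = value;
      this.expireAtMillis = expireAtMillis;
    }
  }

  private final Map<String, Entry> data = new ConcurrentHashMap<>();

  private static long expiry(int expirationInSeconds) {
    return expirationInSeconds <= 0 ? 0L : System.currentTimeMillis() + expirationInSeconds * 1000L;
  }

  private static boolean live(Entry e) {
    return e != null && (e.expireAtMillis == 0L || e.expireAtMillis > System.currentTimeMillis());
  }

  String load(String key) {
    Entry e = data.get(key);
    return live(e) ? e.value : null;
  }

  Boolean set(String key, int expirationInSeconds, String value) {
    data.put(key, new Entry(value, expiry(expirationInSeconds)));
    return Boolean.TRUE;
  }

  // Assumed counter semantics: a missing (or expired) key is seeded with initialValue,
  // and delta is not applied on that first call. Non-numeric values will throw.
  Long incr(String key, long delta, long initialValue, int expirationInSeconds) {
    Entry updated = data.compute(key, (k, old) -> {
      long next = live(old) ? Long.parseLong(old.value) + delta : initialValue;
      return new Entry(Long.toString(next), expiry(expirationInSeconds));
    });
    return Long.parseLong(updated.value);
  }

  // CompletableFuture stands in for GoblinFuture here.
  CompletableFuture<Boolean> asyncSet(String key, int expirationInSeconds, String value) {
    return CompletableFuture.supplyAsync(() -> set(key, expirationInSeconds, value));
  }
}
```

Under these assumptions, the first incr("c", 5, 100, 0) on a fresh key returns 100 and the second returns 105, and asyncSet(...).join() blocks until the write has been applied.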

It is extremely simple for business teams to use:

1. Client configuration file

Only ZooKeeper needs to be specified (in production, ZooKeeper is actually a cluster):

 "thor": {
    "client": {
    },
    "registry": {
      "zookeeper": "127.0.0.1",
      "encoder": "hessian2"
    }
  }

2. Client sample code (Kotlin):

    val client = ThorClientBuilder.INSTANCE.getBucket(databaseName, bucketName)
    val i = RandomUtils.nextInt(10, 11)
    client.set("$bucketName-key-$i", 0, "aaa-$i")
    ...

A bucket is identified along two dimensions: typically a business department maps to a databaseName, and a specific service within it maps to a bucketName.

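The thor client looks up the list of thor servers through the ZooKeeper service registry and maintains a service routing table. For each request it picks one of the thor servers, establishes a connection, and sends the request over TCP.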

This relies on the company's shared transport library, which covers the compression protocol, the serialization protocol, and so on.
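
The routing step can be sketched roughly as follows. RoutingTable is a hypothetical name, and so are its methods. The post only says that one server is chosen per request, so the round-robin policy here is an assumption. The registry watcher and the transport library are left out.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical routing table; the real thor client and transport library are not shown.
class RoutingTable {
  // Replaced wholesale whenever the registry reports a new server list.
  private volatile List<String> servers = List.of();
  private final AtomicInteger cursor = new AtomicInteger();

  // Would be called from a registry watcher, for example a ZooKeeper child watch.
  void refresh(List<String> latest) {
    servers = List.copyOf(latest);
  }

  // Round-robin choice; this selection policy is an assumption.
  String pick() {
    List<String> snapshot = servers; // read once so a concurrent refresh cannot race the index
    if (snapshot.isEmpty()) {
      throw new IllegalStateException("no thor server registered");
    }
    int i = Math.floorMod(cursor.getAndIncrement(), snapshot.size());
    return snapshot.get(i);
  }
}
```

Swapping the whole list through a volatile field keeps pick() lock-free, and floorMod keeps the index valid even after the counter overflows.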

When the thor server receives a request, it selects the database by databaseName and the collection by bucketName and memhash(key):

@DocumentConnection(configName = "magneto")
@DocumentDatabase(database = "xxx-magneto-{}", dynamic = true)   // database name template
@DocumentCollection(collection = "thor_{}_{}", dynamic = true)   // collection name template
class ThorData : Serializable {
  companion object {
    // Must compile to a static final long for Java serialization to honor it.
    private const val serialVersionUID: Long = 7067361027867576239L
  }

  @DocumentId(autoGenerator = DocumentIdAutoGenerator.NONE)
  @DocumentField("id")
  var key: String? = null

  @DocumentField("val")
  var value: String? = null

  @DocumentField("rev")
  var revision: Int? = null

  @DocumentField("et")
  var expireAt: Date? = null

  @DocumentField("ct")
  var createAt: Date? = null

  @DocumentField("ut")
  var updateAt: Date? = null
}

The database and collection sharding implementation in ThorDataDao:

  @Override
  protected String calculateDatabase(String template, ThorData document) {
    MongoNamespace.checkDatabaseNameValidity(this.databaseName);
    return StringUtils.formatMessage(template, this.databaseName);
  }

  @Override
  protected String calculateCollection(String template, ThorData document) {
    MongoNamespace.checkCollectionNameValidity(this.bucketName);
    PersistenceConnectionSupplier connectionSupplier = PersistenceConnectionSupplier.generate(getClass());
    if (connectionSupplier == null) {
      connectionSupplier = () -> getMongoDocument().getMongoClient();
    }
    String mongoClient = connectionSupplier.config();
    MongoShardClientPool pool = MongoShardClientPoolManager.getInstance().getMongoShardClientPool(mongoClient);
    MongoShardCalculator calculator = getShardCalculator(pool.getShardCount());
    int index = calculator.calculate(getDocumentId(document));
    return StringUtils.formatMessage(template, this.bucketName, index);
  }
  
  protected MongoShardCalculator getShardCalculator(int shardSize) {
    MongoShardCalculatorFactory calculatorFactory = MongoShardCalculatorFactory.getInstance();
    return calculatorFactory.getCalculator(shardSize);
  }  
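
Stripped of the framework classes, the naming scheme above comes down to filling two templates. The sketch below is self-contained and uses only the JDK. ShardNaming and its methods are hypothetical names, format is a stand-in for StringUtils.formatMessage, and CRC32 is a stand-in for memhash, so the shard indexes it produces will not match the real system.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical helper mirroring the naming templates above; not the real ThorDataDao.
class ShardNaming {
  // Fills each "{}" placeholder in order; a stand-in for StringUtils.formatMessage.
  static String format(String template, Object... args) {
    StringBuilder out = new StringBuilder();
    int from = 0;
    for (Object arg : args) {
      int at = template.indexOf("{}", from);
      if (at < 0) {
        break; // more arguments than placeholders: ignore the rest
      }
      out.append(template, from, at).append(arg);
      from = at + 2;
    }
    return out.append(template.substring(from)).toString();
  }

  // CRC32 is a stand-in for memhash; any stable hash of the key would serve here.
  static int shardIndex(String key, int shardCount) {
    CRC32 crc = new CRC32();
    crc.update(key.getBytes(StandardCharsets.UTF_8));
    return (int) (crc.getValue() % shardCount); // getValue() is non-negative
  }

  // One database per databaseName.
  static String database(String databaseName) {
    return format("xxx-magneto-{}", databaseName);
  }

  // One collection per (bucketName, shard index of the key).
  static String collection(String bucketName, String key, int shardCount) {
    return format("thor_{}_{}", bucketName, shardIndex(key, shardCount));
  }
}
```

With 12 shards, ShardNaming.collection("orders", "user:42", 12) yields a name of the form thor_orders_N with N between 0 and 11. The same key always maps to the same collection, which is what lets a lookup go straight to one collection.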

conclusions
