一个分布式KV的实现
2020 Oct 28
See all posts
业务中急需一个KV(key-value)中间件,要求如下:
- 性能不能太拉跨,速度不能太低,不奢望能跟redis比,但是也不能差太多
- 数据最好能落盘,断电也没有影响
- 分布式可扩展,这是必须的
那他们为什么不用redis
cluster呢,当然已经在用了,为什么不继续用,只有一个原因——穷,没有大内存,内存吃紧的话,用redis毫无意义。
于是站在巨人的肩膀上,终于造好一个轮子,线上运行至今非常稳定,几个月下来有近1亿个key分在12个shard里,server负载也不高,没有掉过链子。
思路是根据向业务端提供kv client,
客户端api如下所示,与couchbase
的接口习惯保持一致,而且除了支持同步接口外,还支持异步接口,非常实用:
public interface ThorBucket<T> {
T load(String key);
Map<String, T> loads(final Collection<String> keys);
Long ttl(String key);
Boolean set(String key, int expirationInSeconds, T value);
Boolean touch(String key, int expirationInSeconds);
Long incr(String key, long delta, long initialValue, int expirationInSeconds);
Long decr(String key, long delta, long initialValue, int expirationInSeconds);
GoblinFuture<Boolean> asyncSet(String key, int expirationInSeconds, T value);
GoblinFuture<Boolean> asyncTouch(String key, int expirationInSeconds);
GoblinFuture<Long> asyncIncr(String key, long delta, long initialValue, int expirationInSeconds);
GoblinFuture<Long> asyncDecr(String key, long delta, long initialValue, int expirationInSeconds);
}
业务方使用起来超级简单:
1.客户端配置文件
只需要指定zk就行了(实际线线上zk是个集群):
"thor": {
"client": {
},
"registry": {
"zookeeper": "127.0.0.1",
"encoder": "hessian2"
}
}
2. 客户端示例代码(kotlin):
val client = ThorClientBuilder.INSTANCE.getBucket(databaseName, bucketName)
val i = RandomUtils.nextInt(10, 11)
client.set("$bucketName-key-$i", 0, "aaa-$i")
...
bucket有两个维度,通常来说一个业务部门是一个databaseName,具体某个业务是bucketName。
thor client 通过zookeeper服务注册中心查询到thor
server列表,并维护服务路由表,具体某个请求选择其中某个thor
server建立连接,tcp将请求发出去。
这其中使用了公司的公共transport库,里面包含压缩协议,序列化协议等等。
thor server收到请求,会根据databaseName进行分库, 根据bucketName,
memhash(key)进行分表:
@DocumentConnection(configName = "magneto")
@DocumentDatabase(database = "xxx-magneto-{}", dynamic = true) //库名模版
@DocumentCollection(collection = "thor_{}_{}", dynamic = true) //表名模版
class ThorData : Serializable {
companion object {
internal var serialVersionUID = 7067361027867576239L
}
@DocumentId(autoGenerator = DocumentIdAutoGenerator.NONE)
@DocumentField("id")
var key: String? = null
@DocumentField("val")
var value: String? = null
@DocumentField("rev")
var revision: Int? = null
@DocumentField("et")
var expireAt: Date? = null
@DocumentField("ct")
var createAt: Date? = null
@DocumentField("ut")
var updateAt: Date? = null
}
ThorDataDao的分库分表实现:
@Override
protected String calculateDatabase(String template, ThorData document) {
MongoNamespace.checkDatabaseNameValidity(this.databaseName);
return StringUtils.formatMessage(template, this.databaseName);
}
@Override
protected String calculateCollection(String template, ThorData document) {
MongoNamespace.checkCollectionNameValidity(this.bucketName);
PersistenceConnectionSupplier connectionSupplier = PersistenceConnectionSupplier.generate(getClass());
if (connectionSupplier == null) {
connectionSupplier = () -> getMongoDocument().getMongoClient();
}
String mongoClient = connectionSupplier.config();
MongoShardClientPool pool = MongoShardClientPoolManager.getInstance().getMongoShardClientPool(mongoClient);
MongoShardCalculator calculator = getShardCalculator(pool.getShardCount());
int index = calculator.calculate(getDocumentId(document));
return StringUtils.formatMessage(template, this.bucketName, index);
}
protected MongoShardCalculator getShardCalculator(int shardSize) {
MongoShardCalculatorFactory calculatorFactory = MongoShardCalculatorFactory.getInstance();
return calculatorFactory.getCalculator(shardSize);
}
conclusions
- 完全凭借
MongoDB
的分库分表来做扩展,DB操作上还有一层缓存,数据量大的时候毫无压力;
- 写好一个存储非常不容易,这块绕开了这个问题,大大加快了开发上线进度——我一个人两周就开发完成了;
- 用成熟数据库带来的另外一个好处是,借助MongoDB数据库工具能让查询浏览
key-value
变得非常直观容易。
一个分布式KV的实现
2020 Oct 28 See all posts业务中急需一个KV(key-value)中间件,要求如下:
那他们为什么不用redis cluster呢,当然已经在用了,为什么不继续用,只有一个原因——穷,没有大内存,内存吃紧的话,用redis毫无意义。
于是站在巨人的肩膀上,终于造好一个轮子,线上运行至今非常稳定,几个月下来有近1亿个key分在12个shard里,server负载也不高,没有掉过链子。
思路是根据向业务端提供kv client, 客户端api如下所示,与
couchbase
的接口习惯保持一致,而且除了支持同步接口外,还支持异步接口,非常实用:业务方使用起来超级简单:
1.客户端配置文件
只需要指定zk就行了(实际线线上zk是个集群):
2. 客户端示例代码(kotlin):
bucket有两个维度,通常来说一个业务部门是一个databaseName,具体某个业务是bucketName。
thor client 通过zookeeper服务注册中心查询到thor server列表,并维护服务路由表,具体某个请求选择其中某个thor server建立连接,tcp将请求发出去。
这其中使用了公司的公共transport库,里面包含压缩协议,序列化协议等等。
thor server收到请求,会根据databaseName进行分库, 根据bucketName, memhash(key)进行分表:
ThorDataDao的分库分表实现:
conclusions
MongoDB
的分库分表来做扩展,DB操作上还有一层缓存,数据量大的时候毫无压力;key-value
变得非常直观容易。