Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

Yveltals Blog

Protobuf vs FlatBuffers

FlatBuffers

FlatBuffers在序列化时计算了各字段在数据体的偏移量,并存储在数据体中。因此,反序列化时,先读取字段的偏移量再读取数据即可。因为反序列化过程没有内存拷贝、数据解码等耗时操作,所以速度非常快,但是数据量比原数据有所增加。此外,生成的代码量较少,CPU占用较低

场景:延迟和性能要求高,特别是在数据不需要全部加载到内存中的场景

Protobuf

压缩后的数据包更小,但序列化与反序列化都比较重度,生成的代码量较大,CPU占用较高,内存占用较多。语言支持更广,解决方案更广泛、成熟

场景:适用于需要高效传输大量数据的分布式应用,如网络通信和数据存储

技巧:消息的字段数量不超过15、正数使用uint而非int、尽量控制int32或int64型数据在0~127

Proto 文件中定义字段的顺序与最终编码结果的字段顺序无关,两者有可能相同也可能不同。当消息被编码时,Protobuf 无法保证消息的顺序,消息的顺序可能随着版本或者不同的实现而变化。任何 Protobuf 的实现都应该保证字段以任意顺序编码的结果都能被读取。

  • 序列化后的消息字段顺序是不稳定的。
  • 对同一段字节流进行解码,不同实现或版本的 Protobuf 解码得到的结果不一定完全相同(bytes 层面)。只能保证相同版本相同实现的 Protobuf 对同一段字节流多次解码得到的结果相同。
  • 假设有一条消息foo,以下关系可能不成立:
1
2
3
4
foo.SerializeAsString() == foo.SerializeAsString()
Hash(foo.SerializeAsString()) == Hash(foo.SerializeAsString())
CRC(foo.SerializeAsString()) == CRC(foo.SerializeAsString())
FingerPrint(foo.SerializeAsString()) == FingerPrint(foo.SerializeAsString())
  • 假设有两条逻辑上相等消息,但是序列化之后的内容(bytes 层面)不相同,部分可能的原因有:
    • 其中一条消息可能使用了较老版本的 protobuf,不能处理某些类型的字段,设为 unknwon。
    • 使用了不同语言实现的 Protobuf,并且以不同的顺序编码字段。
    • 消息中的字段使用了不稳定的算法进行序列化。
    • 某条消息中有 bytes 类型的字段,用于储存另一条消息使用 Protobuf 序列化的结果,而这个 bytes 使用了不同的 Protobuf 进行序列化。
    • 使用了新版本的 Protobuf,序列化实现不同。
    • 消息字段顺序不同。

RPC

GRPC

gRPC是一种高性能 RPC 框架。使用Protocol Buffers作为二进制序列化,并使用HTTP/2协议进行传输。gRPC基于服务的思想:定义一个服务,描述这个服务的方法以及入参出参,服务器端有这个服务的具体实现,客户端保有一个存根,提供与服务端相同的服务。

  • 基于 ProtoBuf 接口定义语言:它可以定义服务的方法和消息类型,并生成各种语言的代码桩,实现跨平台。
  • HTTP/2作为底层传输协议:支持二进制传输效率高、双向流、消息头压缩、单 TCP 的多路复用、服务端推送等。
  • 支持流式数据:gRPC支持流式传输,即服务可以持续发送数据,而客户端可以持续接收数据。这在传输大量数据时非常有用。
  • gRPC同时支持同步调用和异步调用,同步RPC调用时会一直阻塞直到服务端处理完成返回结果,异步RPC是客户端调用服务端时不等待服务段处理完成返回,而是服务端处理完成后主动回调客户端告诉客户端处理完成

gRPC与HTTP/2的关系:HTTP/2为长连接、实时的通信流提供了基础。gRPC建立在这个基础之上,具有连接池、健康语义、高效使用数据帧和多路复用以及KeepAlive,gRPC的通讯基石就是HTTP/2

基于http2协议的特性:gRPC允许定义如下四类服务方法

  1. 一元RPC:客户端发送一次请求,等待服务端响应结构,会话结束,就像一次普通的函数调用这样简单
  2. 服务端流式RPC:客户端发起一起请求,服务端会返回一个流,客户端会从流中读取一系列消息,直到没有结果为止
  3. 客户端流式RPC:客户端提供一个数据流并写入消息发给服务端,一旦客户端发送完毕,就等待服务器读取这些消息并返回应答
  4. 双向流式RPC:客户端和服务端都一个数据流,都可以通过各自的流进行读写数据,这两个流是相互独立的,客户端和服务端都可以按其希望的任意顺序独写

名称解析器和负载平衡器:解析器将名称转换为地址,然后将这些地址交给负载均衡器。负载均衡器负责从这些地址创建连接,并在连接之间对rpc进行负载均衡。

当连接失败时,负载均衡器将开始使用最后已知的地址列表重新连接。同时,解析器将开始尝试重新解析主机名列表,告知负载均衡器。

对于失效连接:

  • 对方主动关闭、超时计时器情况:TCP的 FIN 语义即可完成 HTTP2/GRPC 连接的关闭
  • 端点死亡、挂起而不通知客户端:TCP得长时间重试,gRPC 使用 HTTP2 语义中的 PING 帧绕过流量控制,判断连接是否有效

BRPC

特点

  1. 性能好,M:N (bthread:pthread)高效轻量的线程模型
  2. 自带性能监控和分析功能:bvar,一般的RPC框架没有提供热点分析、延时统计的工具,brpc通过自带的 bvar 能够监控并采集到多种性能相关的数据,便于快速确定热点、定位瓶颈。

Bthread

线程模型:1:1,一个内核线程里只有一个用户线程。利于调度到其他核上,多核扩展性好。缺点:线程锁、线程调度到另一个CPU时,data要从L1 cache同步到其他cpu,锁住总线。

协程模型:n:1,n个用户线程跑在一个CPU上。无多线程竞争,数据的locality好,但难以扩展到多核,一个线程阻塞会阻塞整个内核线程。

Bthread 通过实现上层 M:N 线程绑定减小了线程间的来回调度,减小了cache buncing;通过内部的多个队列和 stealing worker 机制,保证了整体的并发度QPS*latency最大;

Bthread的机制核心是 work stealing,即 pthread 之间可以偷bthread来执行。每个bthread两种调度方式,既可以只在某一个内核线程里调度(locality不错),也可以在该bthread被卡住时,允许被偷到其他内核线程去运行,只要有空闲的worker(pthread)在,就不会有bthread被阻塞,保证了多核在任意时刻只要有bthread被创建出来就可以去跑任务。

内存管理

BRPC通过IOBuf(非连续零拷贝缓存)方式减少数据传递处理过程中的拷贝,提高了数据传输效率。此外,各种ThreadLocal的ResourcePool、Object Pool等优化了定长的短生命周期的申请和释放,减少了内存分配和释放的开销。

执行队列

BRPC使用多生产者单消费者无锁队列作为执行队列,实现了多生产者单消费者之间的的高效通信。这种队列设计避免了锁竞争和线程同步的问题,提高了系统的吞吐量和并发性能。在实现单TCP连接复用、高效发送-接收数据等方面发挥了很重要的作用。


StreamLoader

1
2
3
4
class CompoundLoader : StreamLoaderBase {
map<int32, vector<PerDbData>> table_db_map_
map<string, LoaderProcessor*> loaders; // hive, kafka, btq...
}

退出方式:

  1. per_db->stop = true,Processor::ThreadWork 循环中检测退出
  2. StreamLoaderBase::ThreadWorkBase 退出,此时所有 ThreadWork 线程已 join
  3. StreamLoaderBase 唤醒 CondVar,通过 BeforeUnsubscribe() 停止consumer消费(线程还在写数据,等写完一条通过flag退出了,才唤醒cv,执行后续 unsubscribe)

关于offset:

  • 框架由db传入,若db没记录则从 info_file 获取
  • 框架定期调用 GetSubscribeInfo 将 offset 持久化到文件中

StreamLoaderBase

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void Subscribe() {
SetOptions()
AfterSubscribe() {
loaders->AfterSubscribe()
}
work_threads.emplace_back(&StreamLoaderBase::ThreadWorkBase)
}
void Unsubscribe() {
loaders->BeforeUnsubscribe()
}
void ThreadWorkBase() {
CompoundLoader::ThreadWork() {
threads.emplace_back(&LoaderProcessor::ThreadWork)
threads.join();
}
cv_.SignalAll();
}

void PauseSubscribe() { per_db.pause = true; }
void ResumeSubscribe() {per_db.pause = false; per_db.pause_notifier_.Notify(); }
bool LagNearLatest() { return Processor::PerDbLagNearLatest(); }

Kafka Processor

工具函数

1
2
3
4
5
6
7
8
9
// 调 Kafka client,  根据 timestamp 查找offset (-1 latest, -2 earliset)
bool QueryOffset(shared_ptr<RawConsumer> &consumer, string &kafka_cluster_id,
string &kafka_topic, int partition, int64_t timestamp, int64_t *kafka_offset);

// last_offset 最后消费到的offset (多个取最早), latest_offset Kafka中最新的 offset
void GetOffset(PerDbData *per_db, int64_t *last_offset, int64_t *latest_offset);

// 是否丢数据:当前 offset < kafka earliset offset
bool IsOffsetGapExcessive()

功能函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void AfterSubscribe(ExternStateDB *db, PerDbData *per_db) {
// 计算 offset, 获取kafka client consumer开始消费
offset = max(earliset_offset, to_consume_offset);
consumer = GetRawConsumerByLogicalTopicId();
consumer->Start(cluster_id, topic_name, partition, offset);
// 记录当前db消费offset
per_db->topic_last_offset = offset;
// 过于落后则 remove DB
}

// consumer->Stop() 释放kafka client
void BeforeUnsubscribe(ExternStateDB *db, PerDbData *per_db);

// max_latest_offset - last_offset > FLAG ?
bool PerDbLagNearLatest(ExternStateDB *db, PerDbData *per_db, int max_lag_offset);

// need sync (ckpt)?
void PerDbOffsetExpired(ExternStateDB *db, PerDbData *per_db,
const std::unordered_map<std::string, std::string> &offset, bool *need_sync);

void ThreadWork(ExternStateDB *db, PerDbData *per_db, vector<PerDbData> *table_dbs, int thread_id) {
while (true) {
if (per_db->stop) break;
if (per_db->pause) wait(FLAGS_kafka_pause_s);
pre_db->topic_latest_offsets = QueryOffset() // kafka latest offset
msg = per_db->consumer->Consume()
per_db->topic_last_offset = msg->offset(); // consume offset
per_db->db->StreamLoaderPut(msg->payload(), msg->len(), topic, msg->timestamp(), msg->offset());
}
}

Hive Processor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void ThreadWork(ExternStateDB *db, PerDbData *per_db, vector<PerDbData> *table_dbs, int thread_id) {
while (true) {
for (hdfs_path : paths) {
read_paths = GetReadPath(hdfs_path)
for (file_path : read_paths) {
file = Open(file_path)
buffer_file = BufferedFile(file)
per_db->db->StreamLoaderFileStart()
StreamLoaderPutByBlocks()
per_db->db->StreamLoaderFileEnd()
}
}
}
}
void StreamLoaderPutByBlocks(BufferedFile &buffer_file,) {
while (true) {
string block = buffer_file.ReadLine()
per_db->db->StreamLoaderPut(block.data(), block.size(), ...)
}
}