微博怎么注册 微博是干什么用的( 二 )




通过 KoP 协议可以落地 Pulsar 并原生支持新浪现有的 Kafka 客户端,也可以解决新浪 Kafka 团队在 Kafka 上的运维痛点 。于是团队开始调研和实践 KoP 组件 。在此过程中,团队也遇到了一些问题 , 其中一个主要挑战就是 KoP 低版本兼容性问题 。
部署问题与解决方案
KoP 低版本兼容性问题
新浪 Kafka 集群中一些较重要的集群仍在使用较老的 Kafka 版本(如 0.10),因此在调研与实践中需要兼容较老版本的客户端 。KoP 只支持 1.0 及以后的版本 。经过总结,团队发现了以下细节问题并给出对应解决方案 。
低版本认证不兼容
客户端需要通过认证才可以访问新浪的认证集群,与服务端交互 。在 Kafka 1.0 版本之前,客户端与服务端的认证交互是通过 V0 版本的 SaslHandshakeRequest 请求完成的,之后的 token 信息由 SASL tokens(不需要 Kafka request headers)包装,这是一些不透明的数据包 。所以团队需要在 KoP 中手动处理这些数据包才能完成认证工作 。
在 Kafka 1.0 版本之后,认证交互通过 V1 版本的 SaslHandshakeRequest 请求完成 , token 信息则由 SaslAuthenticateRequest 请求封装 。KoP 处理时会直接解析 token 的协议头 。KoP 的低版本认证不兼容问题主要出现在 token 信息这个层面 , 团队需要通过重构来避免 KoP 直接解析令牌的协议头 , 从而顺利处理旧版本的不透明数据包 。详细代码参见 GitHub 。(https://github.com/streamnative/kop/pull/676) 。
日志协议兼容性问题

微博怎么注册 微博是干什么用的



以上是 Kafka 消息协议的几个版本示意,从左至右分别为 V0、V1、V2 。Kafka 0.10 版本之前使用 V0 版消息协议,0.10 版本改用 V1 版,0.11 之后改用 V2 版 。V1 版本相比 V0 版本新增了时间戳;V2 版本改动较大,从 message set 变成了 RecordBatch,后者内部还封装了很多 Record 。上图中各个方框内都是协议中的关键字段 。V2 版本开始内部消息都使用相对位移,RecordBatch 的元数据部分只需放置起始的绝对位移 。
于是不同版本之间生产消费时就会存在日志协议兼容性问题 。例如一个高版本的生产者生产消息后,低版本的消费者是无法解析新版日志协议的 , 自然只会报错而无法消费 。为此需要引入跨版本消息转换功能,才能让低版本读取高版本的消息 。但如果生产者是低版本,消费者是高版本,由于协议是向下兼容的,所以数据消费不会存在问题,不需要转换 。
那么 KoP 是如何处理生产者请求的呢?Kafka 客户端发来生产者请求时 , KoP 解析请求后,Handler 线程会调用 ReplicaManager 主键,追加 Kafka 的 Records 。这个主键与 Kafka 中的副本管理器是对应的,做了映射 。
io.streamnative.pulsar.handlers.kop.storage.ReplicaManager#appendRecords复制代码
每个分区对应一个 PartitionLog,映射了 Kafka 里面的 loggingScala 类对应,每一个 KoP broker 中都有一个 PartitionLogManager 来管理 PartitionLog 。要将 Kafka Records 处理为消息写入 Bookie,这里的问题就是如何从 Records 编码成 Messages 。
io.streamnative.pulsar.handlers.kop.storage.PartitionLog#appendRecords复制代码

微博怎么注册 微博是干什么用的



PartitionLog 在追加 Kafka 的 Records 时,会执行 EntryFormatter 的 encode 过程 io.streamnative.pulsar.handlers.kop.format.EntryFormatter#encode 。编码之后会通过 Pulsar broker 的 Persistenttopic 组件 org.apache.pulsar.broker.service.persistent.PersistentTopic#publishMessage来 publishMessage 。在 EntryFormat 过程中将 Kafka Records 转换为 Pulsar 消息 。然后用存储层 ManagedLedger 将消息发布为 Bookie 可识别的 entry 写入 Bookie 。这里的关键就是 EntryFormat 编码过程 。下图引据了 entryFormat 的介绍,可以看到其取值可选 kafka、mixed_kafka 和 pulsar , 默认为 Pulsar 。

推荐阅读