微博怎么注册 微博是干什么用的( 三 )



微博怎么注册 微博是干什么用的



在了解转换过程之前还需了解 Pulsar Message 协议 。协议中一部分信息专注于元数据 , message payload 字段中包含实际数据,每个 message 中有多条消息,与 RecordBatch 类似 。单条消息还有自己的元数据 。

微博怎么注册 微博是干什么用的



从 Kafka 请求转换为 Pulsar 消息要做协议转换 。当 entryFormat=kafka(取值为 kafka) 时,主要会设置 publish time、num messages、properties(标识 message 的 entryFormat 类型 , 解码阶段需要),最后 payload 部分就是将整个 RecordBatch 通过 Persistent Topic 组件发送到 Broker 。这里 Pulsar 的客户端无法识别解析 RecordBatch 。
如果要用 KoP 将 Kafka 集群数据迁移到 Pulsar,就需要用到 entryFormat=pulsar 。它会遍历 Kafka 的 RecordBatch 和内部的 Record 信息一一对应设置能够对应的 Message Metadata 和 Single Message Metadata,从而转换为 Pulsar 消息发布到 bookie 。
要解决兼容性问题就要专注于 EntryFormat,根据生产者和消费者的版本情况进行消息的转换 。转换会出现性能损耗,此处注意消费者版本较高时可以将转换过程交给消费者处理来节省性能 。
新特性改进介绍:元数据事件管理器
引入原因一:元数据不一致
微博怎么注册 微博是干什么用的



上图是一个两节点的 KoP 集群,客户端生产的 topic 的分区 0,位于 broker1 中 。客户端的引导地址是 broker1 和 broker2 。现在客户端要发送元数据请求给 broker2,broker2 会响应 metadata response 。在 KoP 之前的处理逻辑中复用了 Topic lookup 机制 , broker2 返回的 response 中不会包含自身的信息,只有分区所在的 broker1 的信息 。然后客户端会向 broker1 分区的 leader 节点发送生产者请求 。
Broker1 挂掉后 , 分区 0 会容错到 Broker2 上 。于是 broker2 成为分区 0 的 owner 。这时客户端向 broker1 发送元数据请求失败,又因为自身没有 broker2 的处理逻辑,所以元数据就无法路由到 broker2 上,出现元数据超时问题 。

微博怎么注册 微博是干什么用的



引入原因二:Group 残留无效 topic 状态
微博怎么注册 微博是干什么用的



如上图,通过 KoP 消费 topic 时,消费的组元数据信息都会记到 coordinator 中,用 ./bin/kafka-consumer-groups.sh –bootstrap-server localhost:9092 –group my-group –describe可以看到描述的消费信息 。当 topic 下线并删除时(./bin/kafka-topics.sh –bootstrap-server localhost:9092 —topic test —delete),再去描述组信息就会返回原数据超时异常 。因为 admin 客户端执行删除命令时,请求到达 KoP Cluster,KoP broker 会通过 PulsarAdmin 删除 topic 。Pulsar Cluster 处理删除请求时 , 会发送到所有分区的 owner broker 上,后者负责删除 topic 信息并移除 topic 。
但因为 Group 元数据信息位于 coordinator 中,其 owner broker 和 topic owner broker 不在一起,所以删除后者时无法清除前者 , 就会出现残留 。问题发生原因是 Group Coordinator 里面有 Group 元数据信息记录了消费分区 , 客户端在获取分区时 commit offset 会记录 Lag 值,Kafka 当前生产的消息位移 。之后获取 topic 信息,但是由于 topic 已经删除,因此会一直返回 onload partition 错误 。命令工具不断重复尝试获取元数据直到 Request Timeout 超时并暴露超时 。

微博怎么注册 微博是干什么用的

推荐阅读