Kafka Schema Registry工作原理

Schema Registry简介
Schema Registry的由来和解决的问题
Confluent 公司为了能让 Kafka 支持 Avro 序列化,创建了 Kafka Schema Registry 项目,项目地址为 https://github.com/confluentinc/schema-registry 。对于存储大量数据的 kafka 来说,使用 Avro 序列化,可以减少数据的存储空间提高了存储量,减少了序列化时间提高了性能。 Kafka 有多个topic,里面存储了不同种类的数据,每种数据都对应着一个 Avro schema 来描述这种格式。Registry 服务支持方便的管理这些 topic 的schema,它还对外提供了多个 restful 接口,用于存储和查找。
Schema Registry在Kafka生态中的作用
- Schema存储与版本管理。Schema Registry为每个Topic的Key和Value提供了架构信息的版本化仓库, Producer和Consumer可以存取不同版本的Schema。
- 数据格式验证。在消息进入Kafka之前,Schema Registry会验证消息结构是否符合预定义的Schema。这保证了数据质量。
- 客户端Schema信息共享。Producer和Consumer可以通过Schema Registry获取对应Topic的Schema定义,实现schema信息的共享和重用。
- 支持schema的兼容性演变。Schema Registry支持schema的兼容性规则设置,允许schema演变而不破坏客户端,比如新增字段。
- 统一的数据序列化和反序列化。结合Avro等,Schema Registry提供了从byte到语义消息的转换能力。
- 运行时数据解析。Consumer可以利用Schema Registry在运行时解析序列化数据,而不需要了解所有的应用级schema信息。
- Schema操作审计。Schema Registry提供了所有schema操作的审计日志,用于 governance、compliance等目的。
- 增强对多语言支持。Schema Registry使用标准webhookcallback及Schema定义,支持各种语言的Client。
Schema Registry的工作原理简介
Schema Registry 是Avro Schemas 的一个分布式存储层,它具有以下特点:
- 为每个注册的schema 提供一个全局唯一ID,分配的ID 保证单调递增,但不一定是连续的。
- kafka 为Schema Registry 的状态和它包含的schema 提供一个持久的后端
- Schema Registry 是一个分布式的单主架构,由kafka 或者zookeeper 协调主节点的选举。
- 生产者在发送含schema 的消息时,会先判断schema 的消息是否在本地内存中,如果不在本地内存中,说明schema 尚未注册到Schema Registry,则需要进行注册,否则,无需注册
- 消费者在反序列化消息时,会先判断schema 是否在本地内存中,如果不在本地内存中,则需要从Schema Registry 中获取schema,否则,无需获取。
Schema Registry实战应用
Producer集成
- Producer在发送消息前,会先向Schema Registry校验消息格式
- 通过Schema Registry获取消息对应的最新Schema ID
- Producer将Schema ID包含在消息中,并序列化消息
- 如果Schema发生变更,Producer可以获取新的Schema
public class SchemaProducer {
public static void main(String[] args) throws Exception {
String kafkaHost = "xxx.xxx.xxx.xxx:9092";
String topic = "schema-tutorial";
String schameFilename = "user.json";
String registryHost = "http://xxx.xxx.xxx.xxx:8081";
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 指定Value的序列化类,KafkaAvroSerializer
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
// 指定 registry 服务的地址
// 如果 Schema Registry 启动了高可用,那么这儿的配置值可以是多个服务地址,以逗号隔开
props.put("schema.registry.url", registryHost);
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
String key = "Alyssa key";
Schema schema = new Schema.Parser().parse(new File(schameFilename));
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "Alyssa");
avroRecord.put("favorite_number", 256);
// 发送消息
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, key, avroRecord);
producer.send(record);
producer.flush();
producer.close();
}
}
Kafka Registry 原理
我们知道 Kafka 的消息由 Key 和 Value 组成,这两部分的值可以有不同的数据格式。而这些数据格式都会保存在 Registry 服务端,客户端需要指定数据格式的名称(在 Registry 中叫做 subject),才能获取到。如果我们要获取当前消息 Key 这部分的数据格式,它对于的 subject 名称为 -key,如果要获取 Value 这部分的数据格式,它对应的 subject 名称为 -value(topic 为该消息所在的 topic 名称)。
KafkaProducer 在第一次序列化的时候,会自动向 Registry 服务端注册。服务端保存数据格式后,会返回一个 id 号。KafkaProducer发送消息的时候,需要附带这个 id 号。这样 KafkaConsumer 在读取消息的时候,通过这个 id 号,就可以从 Registry 服务端 获取。
Registry 客户端负责向服务端发送 http 请求,然后会将结果缓存起来,以提高性能。
public class CachedSchemaRegistryClient implements SchemaRegistryClient {
// Key 为数据格式的名称, 里面的 Value 为 Map类型,它对于的 Key 为数据格式,Value 为对应的 id 号
private final Map<String, Map<Schema, Integer>> schemaCache;
// Key 为数据格式的名称,里面的 Value 为 Map类型,它对于的 Key 为 id 号,Value 为对应的数据格式
// 这个集合比较特殊,当 Key 为 null 时,表示 id 到 数据格式的缓存
private final Map<String, Map<Integer, Schema>> idCache;
@Override
public synchronized int register(String subject, Schema schema, int version, int id)
throws IOException, RestClientException {
// 从schemaCache查找缓存,如果不存在则初始化空的哈希表
final Map<Schema, Integer> schemaIdMap =
schemaCache.computeIfAbsent(subject, k -> new HashMap<>());
// 获取对应的 id 号
final Integer cachedId = schemaIdMap.get(schema);
if (cachedId != null) {
// 检查 id 号是否有冲突
if (id >= 0 && id != cachedId) {
throw new IllegalStateException("Schema already registered with id "
+ cachedId + " instead of input id " + id);
}
// 返回缓存的 id 号
return cachedId;
}
if (schemaIdMap.size() >= identityMapCapacity) {
throw new IllegalStateException("Too many schema objects created for " + subject + "!");
}
// 如果缓存没有,则向服务端发送 http 请求
final int retrievedId = id >= 0
? registerAndGetId(subject, schema, version, id)
: registerAndGetId(subject, schema);
// 缓存结果
schemaIdMap.put(schema, retrievedId);
idCache.get(null).put(retrievedId, schema);
return retrievedId;
}
}
Consumer集成
Spring Boot集成案例
@Configuration
public class KafkaConsumerConfig {
@Bean
public SchemaRegistryClient schemaRegistryClient() {
return new CachedSchemaRegistryClient(
"http://localhost:8081", 100);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
JsonDeserializer<String> deserializer = new JsonDeserializer<>();
deserializer.setRemoveTypeHeaders(false);
deserializer.setSchemaRegistryClient(schemaRegistryClient());
return new DefaultKafkaConsumerFactory<>(
consumerConfigs(),
new StringDeserializer(),
deserializer
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public KafkaConsumer<String, String> kafkaConsumer() {
return new KafkaConsumer<>(consumerConfigs());
}
}
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test")
public void listen(ConsumerRecord<String, String> record) {
// 获取反序列化的消息
String message = record.value();
}
}
与Avro结合使用
// SchemaRegistry配置
@Bean
public SchemaRegistryClient schemaRegistryClient() {
return new CachedSchemaRegistryClient("http://localhost:8081");
}
// Avro反序列化器
@Bean
public AvroDeserializer avroDeserializer() {
AvroDeserializer deserializer = new AvroDeserializer(schemaRegistryClient());
return deserializer;
}
// ConsumerFactory
@Bean
public ConsumerFactory<String, User> consumerFactory() {
return new DefaultKafkaConsumerFactory(
consumerConfigs(),
new StringDeserializer(),
avroDeserializer()
);
}
// Listener容器
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
// Listener方法
@KafkaListener(topics = "test")
public void listen(ConsumerRecord<String, User> record) {
User user = record.value(); // 直接获取反序列化的对象
}
