Kafka Schema Registry工作原理

Schema Registry简介

Schema Registry的由来和解决的问题

Confluent 公司为了能让 Kafka 支持 Avro 序列化,创建了 Kafka Schema Registry 项目,项目地址为 https://github.com/confluentinc/schema-registry 。对于存储大量数据的 kafka 来说,使用 Avro 序列化,可以减少数据的存储空间提高了存储量,减少了序列化时间提高了性能。 Kafka 有多个topic,里面存储了不同种类的数据,每种数据都对应着一个 Avro schema 来描述这种格式。Registry 服务支持方便的管理这些 topic 的schema,它还对外提供了多个 restful 接口,用于存储和查找。

Schema Registry在Kafka生态中的作用

  1. Schema存储与版本管理。Schema Registry为每个Topic的Key和Value提供了架构信息的版本化仓库, Producer和Consumer可以存取不同版本的Schema。
  2. 数据格式验证。在消息进入Kafka之前,Schema Registry会验证消息结构是否符合预定义的Schema。这保证了数据质量。
  3. 客户端Schema信息共享。Producer和Consumer可以通过Schema Registry获取对应Topic的Schema定义,实现schema信息的共享和重用。
  4. 支持schema的兼容性演变。Schema Registry支持schema的兼容性规则设置,允许schema演变而不破坏客户端,比如新增字段。
  5. 统一的数据序列化和反序列化。结合Avro等,Schema Registry提供了从byte到语义消息的转换能力。
  6. 运行时数据解析。Consumer可以利用Schema Registry在运行时解析序列化数据,而不需要了解所有的应用级schema信息。
  7. Schema操作审计。Schema Registry提供了所有schema操作的审计日志,用于 governance、compliance等目的。
  8. 增强对多语言支持。Schema Registry使用标准webhookcallback及Schema定义,支持各种语言的Client。

Schema Registry的工作原理简介

Schema Registry 是Avro Schemas 的一个分布式存储层,它具有以下特点:

  • 为每个注册的schema 提供一个全局唯一ID,分配的ID 保证单调递增,但不一定是连续的。
  • kafka 为Schema Registry 的状态和它包含的schema 提供一个持久的后端
  • Schema Registry 是一个分布式的单主架构,由kafka 或者zookeeper 协调主节点的选举。
  • 生产者在发送含schema 的消息时,会先判断schema 的消息是否在本地内存中,如果不在本地内存中,说明schema 尚未注册到Schema Registry,则需要进行注册,否则,无需注册
  • 消费者在反序列化消息时,会先判断schema 是否在本地内存中,如果不在本地内存中,则需要从Schema Registry 中获取schema,否则,无需获取。

Schema Registry实战应用

Producer集成

  1. Producer在发送消息前,会先向Schema Registry校验消息格式
  2. 通过Schema Registry获取消息对应的最新Schema ID
  3. Producer将Schema ID包含在消息中,并序列化消息
  4. 如果Schema发生变更,Producer可以获取新的Schema
public class SchemaProducer {

    public static void main(String[] args) throws Exception {
        
        String kafkaHost = "xxx.xxx.xxx.xxx:9092";
        String topic = "schema-tutorial";
        String schameFilename = "user.json";
        String registryHost = "http://xxx.xxx.xxx.xxx:8081";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 指定Value的序列化类,KafkaAvroSerializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        // 指定 registry 服务的地址
        // 如果 Schema Registry 启动了高可用,那么这儿的配置值可以是多个服务地址,以逗号隔开
        props.put("schema.registry.url", registryHost);
        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

        String key = "Alyssa key";
        Schema schema = new Schema.Parser().parse(new File(schameFilename));
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("name", "Alyssa");
        avroRecord.put("favorite_number", 256);

        // 发送消息
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, key, avroRecord);
        producer.send(record);
        producer.flush();
        producer.close();
    }
}

Kafka Registry 原理

我们知道 Kafka 的消息由 Key 和 Value 组成,这两部分的值可以有不同的数据格式。而这些数据格式都会保存在 Registry 服务端,客户端需要指定数据格式的名称(在 Registry 中叫做 subject),才能获取到。如果我们要获取当前消息 Key 这部分的数据格式,它对于的 subject 名称为 -key,如果要获取 Value 这部分的数据格式,它对应的 subject 名称为 -value(topic 为该消息所在的 topic 名称)。

KafkaProducer 在第一次序列化的时候,会自动向 Registry 服务端注册。服务端保存数据格式后,会返回一个 id 号。KafkaProducer发送消息的时候,需要附带这个 id 号。这样 KafkaConsumer 在读取消息的时候,通过这个 id 号,就可以从 Registry 服务端 获取。

Registry 客户端负责向服务端发送 http 请求,然后会将结果缓存起来,以提高性能。

public class CachedSchemaRegistryClient implements SchemaRegistryClient {
  // Key 为数据格式的名称, 里面的 Value 为 Map类型,它对于的 Key 为数据格式,Value 为对应的 id 号
  private final Map<String, Map<Schema, Integer>> schemaCache;
  // Key 为数据格式的名称,里面的 Value 为 Map类型,它对于的 Key 为 id 号,Value 为对应的数据格式
  // 这个集合比较特殊,当 Key 为 null 时,表示 id 到 数据格式的缓存
  private final Map<String, Map<Integer, Schema>> idCache;
        
  @Override
  public synchronized int register(String subject, Schema schema, int version, int id)
      throws IOException, RestClientException {
    // 从schemaCache查找缓存,如果不存在则初始化空的哈希表
    final Map<Schema, Integer> schemaIdMap =
        schemaCache.computeIfAbsent(subject, k -> new HashMap<>());

    // 获取对应的 id 号
    final Integer cachedId = schemaIdMap.get(schema);
    if (cachedId != null) {
      // 检查 id 号是否有冲突
      if (id >= 0 && id != cachedId) {
        throw new IllegalStateException("Schema already registered with id "
            + cachedId + " instead of input id " + id);
      }
      // 返回缓存的 id 号
      return cachedId;
    }

    if (schemaIdMap.size() >= identityMapCapacity) {
      throw new IllegalStateException("Too many schema objects created for " + subject + "!");
    }
      
    // 如果缓存没有,则向服务端发送 http 请求 
    final int retrievedId = id >= 0
                            ? registerAndGetId(subject, schema, version, id)
                            : registerAndGetId(subject, schema);
    // 缓存结果
    schemaIdMap.put(schema, retrievedId);
    idCache.get(null).put(retrievedId, schema);
    return retrievedId;
  }
}    

Consumer集成

Spring Boot集成案例

@Configuration
public class KafkaConsumerConfig {

  @Bean
  public SchemaRegistryClient schemaRegistryClient() {
    return new CachedSchemaRegistryClient(
      "http://localhost:8081", 100);
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    JsonDeserializer<String> deserializer = new JsonDeserializer<>();
    deserializer.setRemoveTypeHeaders(false);
    deserializer.setSchemaRegistryClient(schemaRegistryClient());
    return new DefaultKafkaConsumerFactory<>(
      consumerConfigs(), 
      new StringDeserializer(),
      deserializer
    );
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> 
  kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }

  @Bean
  public KafkaConsumer<String, String> kafkaConsumer() {
    return new KafkaConsumer<>(consumerConfigs());
  }

}


@Component
public class KafkaConsumer {

  @KafkaListener(topics = "test")
  public void listen(ConsumerRecord<String, String> record) {
    // 获取反序列化的消息
    String message = record.value();
  }

}

与Avro结合使用

// SchemaRegistry配置
@Bean
public SchemaRegistryClient schemaRegistryClient() {
  return new CachedSchemaRegistryClient("http://localhost:8081"); 
}

// Avro反序列化器 
@Bean
public AvroDeserializer avroDeserializer() {
  AvroDeserializer deserializer = new AvroDeserializer(schemaRegistryClient());
  return deserializer;
}

// ConsumerFactory
@Bean
public ConsumerFactory<String, User> consumerFactory() {
  return new DefaultKafkaConsumerFactory(
    consumerConfigs(), 
    new StringDeserializer(),
    avroDeserializer()
  );
}

// Listener容器
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> 
  kafkaListenerContainerFactory() {
  
  ConcurrentKafkaListenerContainerFactory factory = 
    new ConcurrentKafkaListenerContainerFactory();
  
  factory.setConsumerFactory(consumerFactory());

  return factory;
}


// Listener方法
@KafkaListener(topics = "test")
public void listen(ConsumerRecord<String, User> record) {
  User user = record.value(); // 直接获取反序列化的对象
}
0%