广告计费系统Kafka Stream的实践

流式计算

什么是流式计算

流式计算一般和批量计算做比较,批量计算往往有一个固定的数据集作为输入并计算结果。

而流式计算输入往往是”无界“的、持续输入的、即永远拿不到全量数据去做的计算。同时,流式计算是持续输出的,只能拿到某一个时刻的结果,而不是最终的结果。

(PS:批量计算是全量的,那一批数据计算一个结果;流式计算是增量的,数据持续输入,持续计算最新的结果)

流式计算和实时计算

流式计算的实时性较高,有时候容易和实时计算混淆。流式计算对比的对象应该是批量计算,而实时计算对应离线计算,流式计算强调的是计算的方式,而事实计算强调计算结果的响应时间。

为什么要有Kafka Streams

当前已经有非常多的不错流式处理系统,开源的流式处理系统有Spark Streaming、Apache Storm、Apache Flink 应用广泛,既然有那么多成熟流处理系统为什么还需要Kafka Streams呢?

  1. 开发调试,Spark、Storm和FLink 都是流处理框架,而Kafka Streams提供的是一个基于Kafka的流处理类库。框架要求开发者按照特定的方式开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高。而Kafka Streams 作为流处理类库,直接提供具体的类给开发者使用,整个应用的运行方式主要由开发者控制,方便使用、调试和监控。
  2. 运维,这些流处理框架的部署相对复杂,而Kafka Streams作为类库,可以非常方便的嵌入应用程序中。更为重要的是Kafka Streams利用Kafka的分区机制和Consumer的Rebalance机制,使得Kafka Streams可以非常方便的水平扩展。
  3. 易用,Kafka基本上市主流的流式计算系统的标准数据源,大部分公司都部署了Kafka,因此使用Kafka Streams的成本非常低。
  4. 重新计算与并行度,由于Kafka本周提供数据持久化Kafka Streams就提供了重新计算能力,由于Kafka Consumer Rebalance机制,Kafka Streams可以在调整并行度。

Kafka Streams的特点

Kafka Streams是什么

Kafka Streams是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Streams的特点

  • Kafka Streams提供了简单的、轻量级的客户端类库,能被集成到任何Java应用中
  • 除了Kafka之前没有任何额外的依赖,利用Kafka的分区模型支持水平扩容和保证顺序性
  • 通过可容错的状态存储实现高效的状态操作(如windowed、join、aggregation)
  • 支持exactly-once语义(只处理一次)
  • 提供记录级的处理能力及实现毫秒级的延迟
  • 提供高层抽象的Stream DSL 和 底层原语的Processor API

Kafka Streams模型

Stream

stream是Kafka Streams中重要的抽象概念,代表一个无界、持续更新的数据集。stream是有序的、可重放的、容错的、不可变数据记录的序列,其中的数据记录为键值对类型。

Processor Topology

processor topology定义了应用程序的流处理计算逻辑,即输入数据如何转化为输出数据,topology 是由 stream processors 连接 streams 以及 shared state stores组成的一个图。

Stream Processor

stream processor 是 processor topology中的节点,代表一个处理步骤,通过接收上游的processor的输入,应用计算逻辑,产生一个或多个校友的processor。

Kafka Streams 提供了两个API来定义流处理器:

  1. Kafka Streams DSL(高级抽象),提供了基础的、通用的数据操作,比如map、filter、join、aggregation。
  2. Processor API(底层原语),定义和连接用户自定义的processor,并且和state store交互,比DSL更灵活。

Stream Processing Application

流处理应用程序,可以定义一个或者多个这样的processor topology,开发人员可以通过高层抽象的Stream DSL 和 底层原语的Processor API 来定义topology。可以单独运行或者在机器运行。

streams-architecture-topology

Time

流处理中一个关键的方面就是时间的概念,以及它如何建模和整合,例如 windowing操作就是基于时间边界定义的。从Kafka 0.10开始,每条记录除了Key和Value之外还增加了timestamp属性。目前Kafka Streams支持三种时间:

  • Event time:事件发生的时间,产生在”客户端“。
  • Processing time:流处理应用处理时的时间,Processing time 可能落后于 Event time几毫秒、几分钟、几小时、或者几天都有可能。
  • Ingestion time:数据存储到Kafka Topic的时间,同样落后于Event time。

Windowing

流式数据是在时间上无界的数据,而聚合操作只能作用在特定的数据集,即有界的数据集上。因此需要通过某种方式从无界的数据集上按特定的语义选取出有界的数据。窗口是一种非常常用的设定计算边界的方式。

Kafka Streams支持的Windowing如下:

1.Hopping Time Widow 该窗口定义如下图。它有两个属性,一个是Window size,一个是Advance interval。Window size 指定了窗口的大小,也即每次计算的数据集大小。而Advance interval定义输出的诗句间隔。一个典型的场景:每个5秒钟输出过去1小时内访问网站的PV或者UV。

![Hopping Time Window](../images/Hopping Time Window.gif)

2.Tumbling Time Window 该窗口定义如下图,可以认为它是Hopping Time Window的一种特例。同样有两个属性,一个是Window size,一个是Advance interval。

![Tumbling Time Window](../images/Tumbling Time Window.gif)

3.Sliding Window 该窗口用于两个KStream进行Join计算。该窗口定义了Join两侧KStream的数据记录被认为在同一个窗口的最大时间差。

4.Session Window 该窗口对于Key做Group后的聚合操作中。它需要对Key进行分组,然后对组内的数据根据业务需求定义一个窗口的起始点和结束点。

Kafka Streams架构

streams-architecture-overview

Stream Partitions

Kafka消息层为了进行存储和传输数据进行分区,Kafka Streams为了处理数据而分区。在两种场景下,分区保证了数据的可扩展性、容错性、高性能。

Kafka Streams使用了基于Topic Partition的Partitions和Tasks概念作为并行模型中的逻辑单元,在并发环境Kafka Streams和Kafka有赞紧密的联系:

  • 每个stream partition是顺序的数据记录集合,并且被映射到一个topic partition
  • stream中的每个data record对应topic中的一条消息
  • 数据记录中的keys决定了Kafka和Kafka Streams中的数据分区。

Tasks

流处理应用的processor topology 通过拆分成功多个task来完成扩容。更具体的,Kafka Streams根据输入的 stream partitions 创建固定的task,每个task分配来自stream的一个分区列表。分配结果不会变更,所以tasks是应用程序固定的并行单元。Tasks可以根据分配的分区初始化自己的processor topology。分区和tasks的分配关系不会变更,如果应用实例宕机,实例分配的任务会自动在其他实例上重启并从相同的stream partition开始消费数据。

Threading Model

Kafka Streams允许用户配置应用实例中类库可以用于并行处理的线程数。每个线程可以执行一个或者多个task。下图中一个线程执行了两个stream task:

streams-architecture-threads

启动多个stream线程或者实例,仅仅只是增加了topology,使得它们并行处理不同的分区。值得注意的树这些线程之间不共享状态,无需协调内部线程。Kafka topic partition的分配通过Kafka的下跳棋完成,对Kafka Streams是透明的。

State Store

流式处理中部分操作是无状态的,例如过滤操作(DSL中用filter方法实现)。而部分操作是有状态的,需要记录中间状态,如Window操作和聚合计算。State Store被用来存储中间状态。它可以是一个持久化的Key-Value存储,也可以是内存的HashMap或者数据库。Kafka提供了基于Topic的状态存储。

Topic中存储的数据记录本身是Key-Value形式的,同时Kafka的logcompaction机制可对历史数据compact操作,保留每个Key对应的最后一个Value,从而在保证Key不丢失的前提下,减少总数据量,从而提供查询效率。

构造KTable时,需要指定其State Store Name。默认情况下,该名字也即用于存储该KTable的状态的Topic的名字,遍历KTable的过程,实际就是遍历它对应的State Store,或者说遍历Topic的所有Key,并取每个Key最新值得过程。

streams-architecture-states

Fault Tolerance

Kafka Streams的容错依赖于Kafka自身的容错能力,Kafka 的 partition提供了高可用复制的能力,所以如果将Kafka Streams的数据存储在partition中那自然实现了容错,Kafka Streams中的task的容错实际上是依赖Kafka Consumer 的容错能力,如果task所在机器故障,Kafka Streams 自动的在可用的应用上重启task。

对于每个State Store,保持一个可复制的change log Kafka Topic用于跟踪state的任何变更,这些 change log Topic同样是被分区的。change log的topic 是开启压缩的,所以历史数据会被清除,这避免的数据无线增值。如果一个task所在的机器发生故障,task转移到另一个机器,Kafka Streams将通过 change log 重建Local State Store。整个失败处理过程对用户来说是透明的。

task初始化(或重新初始化)的耗时,通常主要取决于通过重播 change log来恢复 state store 的时间。为了减少恢复时间,用户可以配置他们的应用有一个备用的local states的副本(也就是说,一个state的腐败的完全拷贝)。当发生了一个task迁移,Kafka Streams视图将task分配到一个应用程序的实例上,这个实例上以及存在一个备用的副本最小task初始化的时间消耗。

Duality of Streams and Tables

Table 的简单形式是键值对的集合,如下图:

streams-table-duality-01

流表二元性描述了 streams 和 tables 的关系:

  • Stream as Table:流可以被认为表的变更日志,其中流的每条数据记录都捕获了表的变更。因此可以认为流是变相的表。通过从头到尾重放变更日志可以重建表。我们通常对stream 进行聚合则得到 table 记录。
  • Table as Stream:表可以认为是在某个时间点,流中每个key的最新值的快照。因此表示变相的流。

表不同时间点的变化,可以表示为一个changelog stream:

streams-table-duality-02

由于流表二元性,可以使用相同的流来重新构建原始表:

streams-table-duality-03

Kafka Stream DSL

Kafka Streams DSL 是构建在 Streams Processor API之上的高级抽象,大多数据处理少量的DSL代码即可。与Processor API相比,只有DSL支持:

  • KStream、KTable 和 GlobalKTable 形式的 流与表内置抽象。
  • 具有无状态转换 (如 map、filter) 以及 有状态转换聚合(如 count、reduce )、连接(join) 和 窗口化(session window)。

KStream

KStream是只有Kafka Streams DSL拥有的概念,KStream是对记录流的抽象,其中每个数据记录代表无界数据集中的一个独立的数据。可以认为所有记录都通过Insert only的方式插入到这个数据流里。

KTable

KTable是只有Kafka Streams DSL拥有的概念,KTable是 changelog stream的抽象,其中每个记录代表一个更新。数据记录中的值可以理解为同一key最后一次值的更新。有点类似数据操作 upsert,具有相同key 记录会被覆盖。

Aggregation

具体化操作可应用于KStream和KTable,当聚合发生在KStream上时必须指定窗口,从而限定计算的目标数据集。聚合操作的结果肯定是KTable,因为KTable是可更新的,可以在晚到的数据到来时更新结果KTable(乱序场景)。

需要说明的是,Kafka Streams并不会对晚到的数据都重新计算并更新结果集,而是让用户设置一个retention period,将每个窗口的结果在内存中保留一定时间,该窗口的数据晚到时,直接合并计算并更新KTable。

Join

Kafka Stream 由于包含 KStream 和 KTable两种数据集,因此提供了如下Join计算

  • KTable Join KTable 结果仍为KTable。任意一边更新,结果KTable都会更新。
  • KStream Join KStream 结果为 KStream。必须带时间窗口操作,否则会造成Join操作一直不结束。
  • KStream Join KTable / GlobalTable 结果为 KStream。 只有当KStream中有新数据时,才会触发Join计算并输出结果。

Windowing

windowing 可以使得有相同 key 的数据进行有状态操作,比如 Aggregation 或者 Join。

参考文献

  1. https://docs.confluent.io/platform/current/streams/architecture.html
  2. https://developer.confluent.io/learn-kafka/kafka-streams/
  3. http://www.jasongj.com/kafka/kafka_stream/
  4. http://ifeve.com/%E5%88%9D%E6%8E%A2kafka-streams/
  5. https://www.orchome.com/335
  6. https://cloud.tencent.com/developer/article/1702391
  7. https://docs.confluent.io/platform/current/streams/concepts.html
  8. https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/
0%