初始Kafka
发布与订阅消息系统
在正式讨论Apache Kafka (以下简称Kafka)之前,先来了解发布与订阅消息系统的概念, 并认识这个系统的重要性。数据(消息)的发送者(发布者)不会直接把消息发送给接收 者,这是发布与订阅消息系统的一个特点。发布者以某种方式对消息进行分类,接收者 (订阅者)订阅它们,以便接收特定类型的消息。发布与订阅系统一般会有一个 broker,也就是发布消息的中心点。
发布与订阅消息系统的大部分应用场景都是从一个简单的消息队列或一个进程间通信开始的。比如电商系统中,包含会员模块、订单模块、商品模块、推荐模块、配送物流模块等,多个模块(子系统)间涉及消息的传递。
最早的应用解决方案就是采用(子系统间)直连的方式,使得很多子系统交错复杂。这种点对点的连接方式,形成网状的连接,弊端很多,不一一赘述。
后来,为了解决子系统间直连交错的问题,出现了队列系统。下图所示的架构包含了 3 个独立的发布与订阅系统。
这种方式比直接使用点对点的连接要好得多,但这里有太多重复的地方。你的公司因此要为数据队列维护多个系统,每个系统又有各自的缺陷和不足。而且,接下来可能会有更多的场景需要用到消息系统。 此时,你真正需要的是一个单一的集中式系统,它可以用来发布通用类型的数据,其规模可以随着公司业务的增长而增长。这时Kafka登场了。
Kafka登场
Kafka就是为了解决上述问题而设计的一款基于发布与订阅的消息系统。它一般被称为 “分布式提交日志”或者“分布式流平台”。文件系统或数据库提交日志用来提供所有事务 的持久记录 , 通过重放这些日志可以重建系统的状态。同样地, Kafka 的数据是按照 一定顺序持久化保存的,可以按需读取 。 此外, Kafka 的数据分布在整个系统里,具备数据故障保护和性能伸缩能力。
消息和批次
Kafka的数据单元被称为消息。如果你在使用 Kafka之前已经有数据库使用经验,那么可 以把消息看成是数据库里的一个“数据行”或一条“记录”。消息由字节数组组成,所以 对于 Kafka来说,消息里的数据没有特别的格式或含义。消息可以有一个可选的元数据, 也就是键(key)。键也是一个字节数组,与消息一样,对于 Kafka来说也没有特殊的含义。 当消息以一种可控的方式写入不同的分区时,会用到键。最简单的例子就是为键生成一个一致 性散列值,然后使用散列值对主题分区数进行取模,为消息选取分区 。这样可 以保证具有 相同键的消息总是被写到相同的分区上。
为了提高效率,消息被分批次写入 Kafka。 批次就是一组消息,这些消息属于同一个主题 和分区。如果每一个消息都单独穿行于网络,会导致大量的网络开销,把消息分成批次传 输可以减少网络开销。不过,这要在时间延迟和吞吐量之间作出权衡;批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。批次数据会被压缩,这样可以提升 数据的传输和存储能力,但要做更多的计算处理。
主题(topic)和分区(partition)
Kafka 的悄息通过 主题进行分类。主题就好比数据库的表,或者文件系统里的文件夹。主题可以被分为若干个分区 , 一个分区就是一个提交日志。消息以追加的方式写入分区,然后以先入先出的顺序读取。要注意,由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。下图 所示的主题有 4 个分区,消息被迫加写入每个分区的尾部。 Kaflca通过分区来实现数据冗余和伸缩性。分区可以分布在不同的服务器上,也就是说, 一个主题可以横跨多个服务器,以此来提供比 单个服务器更强大的性能。
我们通常会使用流这个词来描绘Kafka这类系统对数据。很多时候,人们把一个主题的数据看成一个流,不管它有多少个分区。流是一组从生产者移动到消费者的数据。当我们讨 论流式处理时,一般都是这样描述消息的。 Kaflca Streams、 Apache Samza 和 Storm 这些框 架以实时的方式处理消息,也就是所谓的流式处理。我们可以将流式处理与离线处理进行比较,比如 Hadoop 就是被设计用于在稍后某个时刻处理大量的数据。
生产者和消费者
Kafka 的客户端就是 Kafka 系统的用户,它们被分为两种基本类型 : 生产者和消费者。除此之外,还有其他高级客户端 API——用于数据集成的 Kaflca Connect API 和用于流式处理 的 Kaflca Streams。这些高级客户端 API 使用生产者和消费者作为内部组件,提供了高级的 功能。
生产者创建消息。在其他发布与订阅系统中,生产者可能被称为发布者或写入者。一般情 况下,一个消息会被发布到一个特定的主题(topic)上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生 成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到 同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分 区。下一章将详细介绍生产者。
消费者读取消息。在其他发布与订阅系统中,消费者可能被称为订阅者或读者 。 消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移盘来区 分已经读取过的消息。 偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息 时, Kafka 会把它添加到消息里。在给定的分区里,每个悄息的偏移量都是唯 一 的。消费 者把每个分区最后读取的悄息偏移量保存在 Zookeeper或 Kafka上,如果悄费者关闭或重 启,它的读取状态不会丢失。
消费者是消费者群组的一部分,也就是说,会有一个或多个消费者共同读取一个主题。 群组保证每个分区只能被一个消费者使用 。下图所示的群组中,有 3 个消费者同时读取一 个主题。其中的两个消费者各自读取一个分区,另外一个消费者读取其他两个分区。消费 者与分区之间的映射通常被称为悄费者对分区的所有权关系 。
通过这种方式,消费者可以消费包含大量消息的主题。而且,如果一个消费者失效,群组 里的其他消费者可以接管失效悄费者的工作。第 4章将详细介绍消费者和悄费者群组。
broker和集群
一个独立的 Kafka服务器被称为 broker。 broker接收来自 生产者的消息,为消息设置偏移 量,并提交消息到磁盘保存。 broker 为消费者提供服务,对读取分区的请求作出响应,返 回已经提交到磁盘上的消息。根据特定的硬件及其性能特征,单个 broker可以轻松处理数 千个分区以及每秒百万级的消息量。
Broker可以看作是消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群。
broker是集群的组成部分。每个集群都有一个 broker 同时充当了集群控制器的角色(自动 从集群的活跃成员中选举出来)。控制器负责管理工作,包括将分区分配给 broker和监控 broker. 在集群中, 一个分区从属于一个 broker, i亥 broker被称为分区的首领。一个分区 可以分配给多个 broker,这个时候会发生分区复制(见下图)。这种复制机制为分区提供 了消息冗余,如果有一个 broker失效,其他 broker可以接管领导权。不过,相关的消费者 和生产者都要重新连接到新的首领。
保留消息(在一定期限内)是 Kafka的一个重要特性。 Kafka broker默认的消息保留策略 是这样的:要么保留一段时间(比如 7天),要么保留到消息达到一定大小的字节数(比 如 1GB)。当消息数量达到这些上限时,旧消息就会过期井被删除,所以在任何时刻, 可 用消息的总量都不会超过配置参数所指定的大小。主题可以配置自己的保留策略,可以将 悄息保留到不再使用它们为止。例如,用于跟踪用户活动的数据可能需要保留几天,而应 用程序的度量指标可能只需要保留几个小时。可以通过配置把主题当作 紧凑型日志, 只有 最后一个带有特定键的消息会被保留下来。这种情况对于变更日志类型的数据来说比较适 用,因为人们只关心最后时刻发生的那个变更。
为什么选择 Kafka
多个生产者
Kafka 可以无缝地支持多个生产者,不管客户端在使用单个 主题还是多个主题。所以它很 适合用来从多个前端系统收集数据,并以统一的格式对外提供数据。例如, 一个包含了 多 个微服务的网站,可以为页面视图创建一个单独的主题,所有服务都以相同的消息格式向 该主题写入数据。消费者应用程序会获得统一的页面视图,而无需协调来自不同生产者的 数据流。
多个消费者
除了支持多个生产者外, Kafka也支持多个消费者从一个单独的消息流上读取数据,而且 消费者之间直不影响。这与其他队列系统不同,其他队列系统的消息一旦被一个客户端读 取,其他客户端就无法再读取它。另外,多个消费者可以组成一个群组,它们共享一个消息流,并保证整个群组对每个给定的消息只处理一次。
基于磁盘的数据存储
Kafka不仅支持多个消费者,还允许消费者非实时地读取消息,这要归功于 Kafka的数据 保留特性。?肖息被提交到磁盘,根据设置的保留规则进行保存。每个主题可以设置单独的 保留规则,以便满足不同消费者的需求,各个主题可以保留不同数量的消息。消费者可能 会因为处理速度慢或突发的流量高峰导致无陆及时读取消息,而持久化数据可以保证数据 不会丢失。?肖费者可以在进行应用程序维护时离线一小段时间,而无需担心消息丢失或堵 塞在生产者端。 消费者可以被关闭,但消息会继续保留在 Kafka里。消费者可以从上次中 断的地方继续处理消息。
伸缩性
为了能够轻松处理大量数据, Kafka 从一开始就被设计成一个具有灵活伸缩性的系统。用 户在开发阶段可以先使用单个 broker,再扩展到包含 3 个 broker 的小型开发集群,然后随 着数据盐不断增长,部署到生产环境的集群可能包含上百个 broker。对在线集群进行扩展 丝毫不影响整体系统的可用性。也就是说, 一个包含多个 broker的集群,即使个别 broker 失效,仍然可以持续地为客户提供服务。要提高集群的容错能力,需要配置较高的复制系 数。
高性能
上面提到的所有特性,让 Kafka成为了一个高性能的发布与订阅消息系统。通过横向扩展 生产者、消费者和 broker, Kafka可以轻松处理巨大的消息流。在处理大量数据的同时, 它还能保证亚秒级的消息延迟。
Kafka生产者-向Kafka写入数据
不管是把 Kafka 作为消息队列、消息、总线还是数据存储平台来使用 ,总是需要有一个可以往 Kafka 写入数据的生产者和一个可以从 Kafka读取数据的消费者,或者一个兼具两种角 色的应用程序。
例如,在一个信用卡事务处理系统里,有一个客户端应用程序,它可能是一个在线商店, 每当有支付行为发生时,它负责把事务发送到 Kafka上。另一个应用程序根据规则引擎检 查这个事务,决定是批准还是拒绝。 批准或拒绝的响应消息被写回 Kafka,然后发送给发起事务的在线商店。第三个应用程序从 Kafka上读取事务和审核状态,把它们保存到数据 库, 随后分析师可以对这些结果进行分析,或许还能借此改进规则引擎 。
开发者们可以使用 Kafka 内置的客户端 API开发 Kafka应用程序。
在这一章,我们将从 Kafra生产者的设计和组件讲起,学习如何使用 Kafka生产者。我们将展示如何创建 KafkaProducer和 ProducerRecords对象、如何将记录发送给 Kafka,以及如何处理从 Kafka 返回的错误,然后介绍用干控制生产者行为的重要配置选项,最后深入 探讨如何使用不同的分区方法和序列化器,以及如何自定义序列化器和分区器 。
在下一章,我们将会介绍 Kafra的悄费者客户端,以及如何从 Kafka读取消息。
生产者概览
一个应用程序在很多情况下需要往 Kafka 写入消息 : 记录用户的活动(用于审计和分析 )、 记录度量指标、保存日志、消息、记录智能家电的信息、与其他应用程序进行异步通信、 缓冲即将写入到数据库的数据,等等。
多样的使用场景意味着多样的需求:是否每个消息都很重要?是否允许丢失 一 小部分消息?偶尔出现重复消息是否可以接受?是否有严格的延迟和吞吐量要求?
在之前提到的信用卡事务处理系统里,消息丢失或消息重复是不允许的,可以接受的延迟最大为 500ms,对吞吐量要求较高,我们希望每秒钟可以处理一百万个消息。
保存网站的点击信息是另 一种使用场景。在这个场景里,允许丢失少量的消息或出现少量 的消息重复,延迟可以高一些,只要不影响用户体验就行。换句话说,只要用户点击链接 后可以马上加载页面,那么我们并不介意消息要在几秒钟之后才能到达 Kafka 服务器。 吞 吐量则取决于网站用户使用网站的频度。
不同的使用场景对生产者 API 的使用和配置会有直接的影响。
尽管生产者 API 使用起来很简单 ,但消息的发送过程还是有点复杂的。下图展示 了向Kafka 发送消息的主要步骤。
Kafka 生产者组件图
我们从创建 一个 ProducerRecord 对象开始, ProducerRecord 对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。在发送 ProducerRecord对象时,生产者要先把键和 值对象序列化成字节数组,这样它们才能够在网络上传输 。
接下来,数据被传给分区器。如果之前在 ProducerRecord对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区 ,那么分区器会根据 ProducerRecord对象的键来选择一个分区 。选好分区以后 ,生产者就知道该往哪个主题和分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。
服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回 一 个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入 失败, 就会返回 一个错误 。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。
创建Kafka生产者
要往 Kafka写入消息,首先要创建一个生产者对象,井设置一些属性。
下面的代码片段展示了如何创建一个新的生产者,这里只指定了必要的属性,其他使用默认设置。
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.seializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
Kafka生产者有 3个必选的属性
bootstrap.servers
该属性指定 broker 的地址清单,地址的格式为 host:port。清单里不需要包含所有的broker地址,生产者会从给定的 broker里查找到其他 broker的信息。不过建议至少要提供两个 broker的信息, 一旦其中一个宕机,生产者仍然能够连接到集群上。
key.serializer
broker希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因此可以把 Java对象作为键和值发送给 broker。这样的代码具有良好的可读性,不过生产者需要知道如何把这些 Java对象转换成字节数组。 key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。 Kafka 客户端默认提供了ByteArraySerializer(这个只做很少的事情)、 StringSerializer和 IntegerSerializer,因此,如果你只使用常见的几种 Java对象类型,那么就没必要实现自己的序列化器 。要注意, key.serializer是必须设置的,就算你打算只发送值内容。
value.serializer
与 key.serializer一样, value.serializer指定的类会将值序列化。如果键和值都是字符串,可以使用与 key.serializer 一样的序列化器。如果键是整数类型而值是字符扇 , 那么需要使用不同的序列化器。
发送消息主要有3种方式:
1、发送并忘记( fire-and-forget):我们把消息发送给服务器,但井不关心它是否正常到达。大多数情况下,消息会正常到达,因为 Kafka是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。
2、同步发送:我们使用send()方怯发送消息, 它会返回一个Future对象,调用get()方法进行等待, 就可以知道悄息是否发送成功。
3、异步发送:我们调用 send() 方怯,并指定一个回调函数, 服务器在返回响应时调用该函数。
在下面的几个例子中 , 我们会介绍如何使用上述几种方式来发送消息,以及如何处理可能 发生的异常情况。
本章的所有例子都使用单线程,但其实生产者是可以使用多线程来发送消息的。刚开始的 时候可以使用单个消费者和单个线程。如果需要更高的吞吐量,可以在生产者数量不变的 前提下增加线程数量。如果这样做还不够 , 可以增加生产者数量。
发送消息到Kafka
最简单的同步发送消息方式如下所示 :
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
producer.send(record);
} catch(Exception e) {
e.printStack();
}
生产者的 send() 方住将 ProducerRecord对象作为参数,它需要目标主题的名字和要发送的键和值对象,它们都是字符串。键和值对象的类型必须与序列化器和生产者对象相匹配。
我们使用生产者的 send() 方越发送 ProducerRecord对象。从生产者的架构图里可以看到,消息先是被放进缓冲区,然后使用单独的线程发送到服务器端。 send() 方法会返回一个包含 RecordMetadata 的 Future对象,不过因为我们会忽略返回值,所以无法知道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。比如,记录 Twitter 消息日志,或记录不太重要的应用程序日志。
我们可以忽略发送消息时可能发生的错误或在服务器端可能发生的错误,但在发送消息之前,生产者还是有可能发生其他的异常。这些异常有可能是 SerializationException (说明序列化消息失败)、 BufferExhaustedException 或 TimeoutException (说明缓冲区已满),又或者是 InterruptException (说明发送线程被中断)。
同步发送消息
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
producer.send(record).get();
} catch(Exception e) {
e.printStack();
}
在这里, producer.send() 方住先返回一个 Future对象,然后调用 Future对象的 get() 方法等待 Kafka 响应。如果服务器返回错误, get()方怯会抛出异常。如果没有发生错误,我们会得到一个 RecordMetadata对象,可以用它获取消息的偏移量。如果在发送数据之前或者在发送过程中发生了任何错误 ,比如 broker返回 了一个不允许重发消息的异常或者已经超过了重发的次数 ,那么就会抛出异常。我们只是简单地把异常信息打印出来。
如何处理从Kafka生产者返回的错误
KafkaProducer一般会发生两类错误。其中一类是可重试错误 ,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主(noleader)” 错误则可 以通过重新为分区选举首领来解决。 KafkaProducer可以被配置成自动重试,如果在多次重试后仍无能解决问题,应用程序会收到一个重试异常。另一类错误无出通过重试解决 ,比如“消息太大”异常。对于这类错误, KafkaProducer不会进行任何重试,直接抛出异常。
异步发送消息
假设消息在应用程序和 Kafka集群之间一个来回需要 10ms。如果在发送完每个消息后都等待回应,那么发送 100个消息需要 1秒。但如果只发送消息而不等待响应,那么发送100个消息所需要的时间会少很多。大多数时候,我们并不需要等待响应——尽管 Kafka 会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志,或者把消息写入 “错误消息”文件以便日后分析。
为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持 。下面是使用异步发送消息、回调的一个例子。
生产者的配置
到目前为止 , 我们只介绍了生产者的几个必要配置参数——bootstrap.servers API 以及序列化器。
生产者还有很多可配置的参数,在 Kafka文档里都有说明,它们大部分都有合理的默认值 , 所以没有必要去修改它们 。不过有几个参数在内存使用、性能和可靠性方面对生产者影响比较大,接下来我们会一一说明。
1. acks
acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
这个参数对消息丢失的可能性有重要影响。 该参数有如下选项。 • 如果 acks=0, 生产者在成功写入悄息之前不会等待任何来自服务器的响应。也就是说, 如果当中出现了问题 , 导致服务器没有收到消息,那么生产者就无从得知,消息也就丢 失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大 速度发送消息,从而达到很高的吞吐量。
• 如果 acks=1,只要集群的首领节点收到消息,生产者就会收到 一个来自服务器的成功 响应。如果消息无撞到达首领节点(比如首领节点崩愤,新的首领还没有被选举出来), 生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个 没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是 同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future对象 的 get()方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用异步回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生 产者在收到服务器响应之前可以发送多少个消息)。
• 如果 acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行(第 5 章将讨论更多的细节)。不过,它的延迟比 acks=1时更高,因为我们要等待不只一个服务器节点接收消息。
2. buffer.memory
该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果 应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候, send()方法调用要么被阻塞,要么抛出异常,取决于如何设置 block.on.buffe.full 参数 (在0.9.0.0版本里被替换成了max.block.ms,表示在抛出异常之前可以阻塞一段时间)。
3. compression.type
默认情况下,消息发送时不会被压缩。该参数可以设置为 snappy、 gzip 或 lz4,它指定了消息被发送给 broker之前使用哪一种压缩算法进行压缩。 snappy 压缩算怯由 Google巳发明, 它占用较少 的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。 gzip压缩算法一般会占用较多的 CPU,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka发送消息的瓶颈所在。
4. retries
生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下, retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 1OOms,不过可以通过 retries.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前, 先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间), 让总的重试时间比 Kafka集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办怯通过重试来解决(比如“悄息太大”错误)。一般情 况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。 你只需要处理那些不可重试的错误或重试次数超出上限的情况。
5. batch.size
当有多个消息需要被发送到同一个分区时,生产者会把它们放在放一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者井不一定都会等到批次被填满才发送,半捕 的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大, 也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
6. linger.ms
该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。 KafkaProducer 会在批次填满或 linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程, 生产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms设置成比0大的数, 让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次 。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。
7. client.id
该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。
8. max.in.flight.requests.per.connection
该参数指定了生产者在收到服务器晌应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。 把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
9. timeout.ms、 request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms指定了生产者在发送数据时等待服务器返回响应的时间,metadata.fetch.timeout.ms指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回 一个错误 (抛出异常或执行回调)。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配一一如果在指定时间内没有收到同步副本的确认,那么 broker就会返回 一个错误 。
10. max.block.ms
该参数指定了在调用 send() 方法或使用 parttitionFor() 方能获取元数据时生产者的阻塞 时间。当生产者的发送缓冲区已捕,或者没有可用的元数据时,这些方屈就会阻塞。在阻塞时间达到 max.block.ms时,生产者会抛出超时异常。
11 . max.request.size
该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000个消息,每个消息大小为 1KB 。另外, broker对可接收的消息最大值也有自己的限制( message.max.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker拒绝 。
12. receive.buffer.bytes 和 send.buffer.bytes
这两个参数分别指定了 TCP socket接收和发送数据包的缓冲区大小 。 如果它们被设为 -1 , 就使用操作系统的默认值。如果生产者或消费者与 broker处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
顺序保证
Kafka可以保证同一个分区里的消息是有序的。也就是说,如果生产者按照一定的顺序发送消息, broker就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。在某些情况下 , 顺序是非常重要的。如果把retries 设为非零整数,同时把 max.in.flight.requests.per.connection 设为比 1大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功, broker会重试写入第一个批次。如果此时第一个批次也写入成功,那 么两个批次的顺序就反过来了。
一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是 很关键的,所以不建议把顺序是非常重要的。如果把retries 设为 0。可以把 max.in.flight.requests.per.connection设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给 broker。不过这样会严重影响生产者的吞吐量 ,所以只有在 对消息的顺序有严格要求的情况下才能这么做。
分区
ProducerRecord 对象包含了目标主题、键和值。 Kafka 的消息是 一个个 键值对, ProducerRecord对象可以只包含目标主题和值,键可以设置为默认的 null,不过大多数应用程序会用到键。键有两个用途 :可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的悄息将被写到同一个分区。 也就是说,如果一个进程只从一个主题的分区读取数据(第 4章会介绍更多细节),那么具有相 同键的所有记录都会被该进程读取。要创建一个包含键值的记录,只需像下面这样创建 ProducerRecord 对象:
如果键值为 null, 井且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。
如果键不为空,并且使用了默认的分区器,那么Kafka会对键进行散列(使用 Kafka 自己的散列算法,即使升级Java版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上。这里的关键之处在于 ,同一个键总是被映射到同一个分区上 ,所以在进 行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区 。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。但这种情况很少发生。我们将在第 6章讨论 Kafka 的复制功能和可用性。
只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变 。举个例子,在分区数量保持不变的情况下,可以保证用户 045189 的记录总是被写到分区 34。在从分区读取数据肘,可以进行各种优化。不过,一旦主题增加了新的分区,这些就无法保证 了——旧数据仍然留在分区 34,但新的记录可能被写到其他分区上 。 如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。
实现自定义分区策略
我们已经讨论了默认分区器的特点,它是使用次数最多的分区器。不过 ,除了散列分区之 外,有时候也需要对数据进行不一样的分区。假设你是一个 B2B 供应商,你有 一 个大客 户,它是手持设备 Banana 的制造商。 Banana 占据了你整体业务 10% 的份额。如果使用默 认的散列分区算怯, Banana 的账号记录将和其他账号记录一起被分配给相同的分区,导致 这个分区比其他分区要大一些。服务器可能因此出现存储空 间不足、处理缓慢等问题。我 们需要给 Banana 分配单独的分区,然后使用散列分区算住处理其他账号 。
下面是一个自定义分区器的例子 :
Kafka消费者-从Kafka读取数据
应用程序使用 KafkaConsumer向 Kafka 订阅主题,并从订阅的主题上接收消息 。 从 Kafka 读取数据不同于从其他悄息系统读取数据,它涉及一些独特的概念和想法。如果不先理解 这些概念,就难以理解如何使用消费者 API。所以我们接下来先解释这些重要的概念,然 后再举几个例子,横示如何使用消费者 API 实现不同的应用程序。
消费者和消费者群组
假设我们有一个应用程序需要从-个 Kafka主题读取消息井验证这些消息,然后再把它们 保存起来。应用程序需要创建一个消费者对象,订阅主题并开始接收消息,然后验证消息 井保存结果。过了 一阵子,生产者往主题写入消息的速度超过了应用程序验证数据的速 度,这个时候该怎么办?如果只使用单个消费者处理消息,应用程序会远跟不上消息生成 的速度。显然,此时很有必要对消费者进行横向伸缩。就像多个生产者可以向相同的 主题 写入消息一样,我们也可以使用多个消费者从同一个主题读取消息,对消息进行分流。
Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者 接收主题一部分分区的消息。
假设主题 T1 有 4 个分区,我们创建了消费者 C1 ,它是群组 G1 里唯 一 的消费者,我们用 它订阅主题 T1。消费者 Cl1将收到主题 T1全部 4个分区的消息,如图 4-1 所示。
如果在群组 G1 里新增一个消费者 C2,那么每个消费者将分别从两个分区接收消息。我 假设消费者 C1接收分区 0 和分区 2 的消息,消费者 C2 接收分区 1 和分区 3 的消息,如图 4-2 所示。
如果群组 G1 有 4 个消费者,那么每个消费者可以分配到 一个分区,如图 4-3 所示。
如果我们往群组里添加更多的消费者,超过主题的分区数量,那么多出的消费者就会被闲置,不会接收到任何消息。
往群组里增加消费者是横向伸缩消费能力的主要方式。 Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。不过要性意,不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。
除了通过增加消费者来横向伸缩单个应用程序外,还经常出现多个应用程序从同一个主题读取数据的情况。实际上, Kafka 设计的主要目标之一 ,就是要让 Kafka 主题里的数据能够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息, 而不只是其中的 一部分。只要保证每个应用程序有自己的消费者群组,就可以让它们获取到主题所有的消息。不同于传统的消息系统,横向伸缩 Kafka消费者和消费者群组并不会对性能造成负面影响。
在上面的例子里,如果新增一个只包含一个消费者的群组 G2,那么这个消费者将从主题 T1 上接收所有的消息,与群组 G1 之间互不影响。群组 G2 可以增加更多的消费者,每个消费者可以消费若干个分区,就像群组 G1 那样,如图 4-5 所示。总的来说,群组 G2 还是会接收到所有消息,不管有没有其他群组存在。
简而言之,为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者群组, 然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每个消费者只处理一部分消息。
消费者群组和分区再均衡
我们已经从上一个小节了解到,群组里的消费者共同读取主题的分区。一个新的消费者加 入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时 , 比如管理员添加了新的分区,会发生分区重分配。
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要, 它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者), 不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另 一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存 ,在它重新恢复状态之前会拖慢应用程序。我们将在本章讨论如何进行安全的再均衡,以及如何避免不必要的再均衡。
消费者通过向被指派为 群组协调器的 broker (不同的群组可以有不同的协调器)发送 心跳 来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息 (为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
如果一个消费者发生崩溃,井停止读取消息,群组协调器(broker)会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。在本章的后续部分,我们将讨论一些用于控制发送心跳频率和会话过期时间的配置参数,以及如何根据实际需要来配置这些参数 。
分配分区是怎样的一个过程
当消费者要加入群组时,它会向群组协调器发送 一 个 JoinGroup 请求。第 一 个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列 表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的), 并负责给每一个消费者分配分区。它使用 一个实现了 PartitionAssignor接口的类来决定哪些分 区应该被分配给哪个消费者 。
Kafka 内置了两种分配策略,在后面的配置参数小节我们将深入讨论。分配完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群 主知道群组 里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。
创建 Kafka消费者
在读取消息之前,需要先创建 一个 KafkaConsumer对象 。 创建 KafkaConsumer 对象与创建 KafkaProducer对象非常相似——把想要传给消费者的属性放在 Properties 对象里。本章 后续部分会深入讨论所有的属性。在这里,我们只需要使用 3个必要的属性: bootstrap.servers、 key.deserializer、 value.deserializer。
下面代码演示了如何创建一个KafkaConsumer对象:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092, broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
deserializer使用指定的类(反序列化器)把字节数组转成 Java对象。
group.id指定了KafkaConsumer 属于哪一个消费者群组。 group.id不是必需的,不过我们现在姑且认为它是必需的。它指定了 KafkaConsumer 属于哪一个消费者群组。创建不属于任何一个群组的消费者也是可以的,只是这样做不太常见。
订阅主题
创建好消费者之后,下一步可以开始订阅主题了。subscribe()方法接受一个主题列表作为参数
consumer.subscribe(Collections.singletonList("customerCountries"));
在这里我们创建了一个包含单个元素的列表,主题的名字叫作“customerCountries”,我们也可以在调用subscribe()方法时传入一个正则表达式,正则表达式可以匹配多个主题如果有人创建了新的主题,并且主题名与正则表达式匹配,那么会立即触发一次再均衡,消费者就可以读取新添加的主题。如果应用程序需要读取多个主题,并且可以处理不同类型的数据,那么这种订阅方式就很管用。在Kafka和其他系统之间复制数据时,使用正则表达式的方式订阅多个主题时很常见的做法。
要订阅所有test相关的主题,可以这样做:consumer.subscribe(“test.*”);
轮询
消息轮询是消费者 API 的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题 ,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据, 开发者只需要使用一组简单的 API 来处理从分区返回的数据。消费者代码的主要部分如下所示 :
轮询不只是获取数据那么简单。在第一次调用新消费者的 poll() 方法时,它会负责查找 GroupCoordinator, 然后加入群组,接受分配的分区。 如果发生了再均衡,整个过程也是在轮询期间进行的。当然 ,心跳也是从轮询里发迭出去的。所以,我们要确保在轮询期间所做的任何处理工作都应该尽快完成。
线程安全
在同一个群组中,我们无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。按照规则,一个消费者使用一个线程。如果要在同一个消费者群组里运行多个消费者,需要让每个消费者运行在自己的线程里。最好是把消费者的逻辑封装在自己的对象里,然后使用Java的ExecutorService启动多个线程,使每个消费者运行在自己的线程上。
消费者的配置
到目前为止,我们学习了如何使用消费者 API,不过只介绍了几个配置属’性一一如bootstrap.servers、 key.deserializer、 value.deserializer、group.id。 Kafka的文档列出了所有与消费者相关的配置说明。大部分参数都有合理的默认值,一般不需要修改它们,不过有一些参数与消费 者的性能和可用性有很大关系。接下来介绍这些重要的属性。
1. fetch.min.bytes
该属性指定了消费者从服务器获取记录的最小字节数。 broker 在收到消费者的数据请求时, 如果可用的数据量小于 fetch.min.bytes指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。
2. fetch.max.wait.ms
我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 fetch.max.wait.ms则用于指定 broker的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致500ms的延迟。 如果要降低潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。如果 fetch.max.wait.ms被设 为 100ms,并且 fetch.min.bytes 被设为 1MB,那么 Kafka在收到消费者的请求后,要么返 回 1MB 数据,要么在 100ms 后返回所有可用的数据 , 就看哪个条件先得到满足。
3. max.parition.fetch.bytes
该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB,也 就是说, KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.parition.fetch.bytes 指定的字节。如果一个主题有 20个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因 为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。 max.parition.fetch.bytes 的值必须比 broker能够接收的最大消息的字节数(通过 max.message.size属 性配置 )大, 否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。 消费者需要频繁调用 poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况, 可以把 max.parition.fetch.bytes 值改小 ,或者延长会话过期时间。
4. session.timeout.ms
该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者 。该属性与 heartbeat.interval.ms紧密相关。heartbeat.interval.ms 指定了poll()方法向协调器 发送心跳的频 率, session.timeout.ms 则指定了消费者可以多久不发送心跳。所以, 一般需要同时修改这两个属性, heartbeat.interval.ms 必须比 session.timeout.ms 小, 一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 ls。 把 session.timeout.ms 值设 得比默认值小,可以更快地检测和恢 复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置得大一些,可以减少意外的再均衡 ,不过检测节点崩溃需要更长的时间。
5. auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时井被删除)该作何处理。它的默认值是latest, 意 思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者 启动之 后生成的记录)。另一个值是 earliest,意思是说,在偏移量无效的情况下,消费者将从 起始位置读取分区的记录。
6. enable.auto.commit
我们稍后将介绍 几种 不同的提交偏移量的方式。该属性指定了消费者是否自动提交偏移量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。如果把它设为 true,还可以通过配置 auto.commit.interval.mls 属性来控制提交的频率。
7. partition.assignment.strategy
我们知道,分区会被分配给群组里的消费者。 PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。 Kafka 有两个默认的分配策略 。
- Range
该策略会把主题的若干个连续的分区分配给消费者。假设悄费者 C1 和消费者 C2 同时 订阅了主题 T1 和主题 T2,井且每个主题有 3 个分区。那么消费者 C1 有可能分配到这 两个主题的分区 0 和 分区 1,而消费者 C2 分配到这两个主题 的分区 2。因为每个主题 拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了 Range策略,而且分区数量无法被消费者数量整除,就会出现这种情况。
- RoundRobin
该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 C1 和消费者 C2分配分区,那么消费者 C1 将分到主题 T1 的分区 0和分区 2以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 l 以及主题T2 的分区 0和分区 2。一般 来说,如果所有消费者都订阅相同的主题(这种情况很常见), RoundRobin策略会给所 有消费者分配相同数量 的分区(或最多就差一个分区)。
可以通过设置 partition.assignment.strategy 来选择分区策略。默认使用的是 org. apache.kafka.clients.consumer.RangeAssignor, 这个类实现了 Range策略,不过也可以 把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor。我们还可以使用自定 义策略,在这种情况下 , partition.assignment.strategy 属性的值就是自定义类的名字。
8. client.id
该属性可以是任意字符串 , broker用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。
9. max.poll.records
该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。
10. receive.buffer.bytes 和 send.buffer.bytes
socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为-1,就使用操作系统的默认值。如果生产者或消费者与 broker处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有 比较高的延迟和比较低的带宽 。
提交和偏移量
KafkaConsumer(消费者)每次调用 poll()方法,它总是返回由生产者写入 Kafka但还没有被消费者读取过的记录, 我们因 此可以追踪到哪些记录是被群组里的哪个消费者读取的。之前已经讨论过, Kafka 不会像其他 JMS 队列那样需要得到消费者的确认,这是 Kafka 的一个独特之处。相反,消 费者可以使用 Kafka来追踪消息在分区里的位置(偏移量)。
我们把更新分区当前位置的操作叫作提交。
那么消费者是如何提交偏移量的呢?消费者往一个 叫作 _consumer_offset 的特殊主题发送 消息,消息里包含每个分区的偏移量。 如果消费者一直处于运行状态,那么偏移量就没有 什么用处。不过,如果悄费者发生崩溃或者有新 的消费者加入群组,就会触发再均衡,完 成再均衡之后,每个消费者可能分配到新 的分区,而不是之前处理的那个。为了能够继续 之前的工作,消费者需要读取每个分区最后一次提交 的偏移量,然后从偏移量指定的地方 继续处理。
如果提交的偏移量小于客户端处理 的最后一个消息的偏移量 ,那么处于两个偏移量之间的 消息就会被重复处理,如图 4-6所示。
如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的 消息将会丢失,如图 4-7 所示。
所以,处理偏移量的方式对客户端会有很大的影响。 KafkaConsumer API提供了很多种方式来提交偏移量。
自动提交
最简单的提交方式是让悄费者自动提交偏移量。如果enable.auto.commit被设为 true,那么每过5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西 一样,自动提交也是在轮询(poll() )里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那 么就会提交从上一次轮询返回的偏移量。
不过,在使用这种简便的方式之前,需要知道它将会带来怎样的结果。 假设我们仍然使用默认的 5s提交时间间隔,在最近一次提交之后的 3s发生了再均衡,再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无也完全避免的 。
在使用自动提交时 ,每次调用轮询方怯都会把上一次调用返 回的偏移量提交上去,它并不 知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回 的消息 都已经处理完毕(在调用 close() 方法之前也会进行自动提交)。 一般情况下不会有什么问 题,不过在处理异常或提前退出轮询时要格外小心 。
自动提交虽然方便 , 不过并没有为开发者留有余地来避免重复处理消息。
提交当前偏移量
大部分开发者通过控制偏移量提交时间来消除丢失消息的可能性,井在发生再均衡时减少 重复消息的数量。消费者 API提供了另一种提交偏移量的方式 , 开发者可以在必要的时候 提交当前偏移盘,而不是基于时间间隔。
取消自动提交,把 auto.commit.offset 设为 false,让应用程序决定何时提交 偏 移量。使用 commitSync() 提交偏移量最简单也最可靠。这个 API会提交由 poll() 方法返回 的最新偏移量,提交成 功后马上返回,如果提交失败就抛出异常。
要记住, commitSync() 将会提交由 poll() 返回的最新偏移量 , 所以在处理完所有记录后要 确保调用了 commitSync(),否则还是会有丢失消息的风险。如果发生了再均衡,从最近一 批消息到发生再均衡之间的所有消息都将被重复处理。
下面是我们在处理完最近一批消息后使用 commitSync() 方法提交偏移量的例子。
异步提交
同步提交有一个不足之处,在 broker对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡, 会增加重复消息的数量。
这个时候可以使用异步提交 API。我们只管发送提交请求,无需等待 broker的响应。
在成功提交或碰到无怯恢复的错误之前, commitSync() 会一直重试(应用程序也一直阻塞),但是 commitAsync() 不会,这也是 commitAsync() 不好的 一个地方。它之所以不进行重试,是因为在它收到 服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量 2000,这个时候发生了短暂的通信问题 ,服务器收不到请求,自然也不会 作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync() 重新尝试提交偏移量 2000,它有可能在偏移量 3000之后提交成功。这个时 候如果发生再均衡,就会出现重复消息。
我们之所以提到这个问题的复杂性和提交顺序的重要性,是因为 commitAsync()也支持回 调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标, 不 过如果你要用它来进行重试, 一定要注意提交的顺序。
重试异步提交
我们可以使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏 移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调 的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试。如果序列号比较大,说明有一个新的提交已经发送出去了,应该停止重试。
同步和异步组合提交
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败 是 因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或 再均衡前的最后一次提交,就要确保能够提交成功。
因此,在消费者关闭前一般会组合使用 commitAsync()和 commitSync()。它们的工作原理如下(后面讲到再均衡监听器时,我们会讨论如何在发生再均衡前提交偏移量):
提交特定的偏移量
提交偏移量的频率与处理消息批次的频率是一样的。但如果想要更频繁地提交出怎么办?如果 poll() 方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办?这种情况无法通过调用 commitSync()或 commitAsync() 来实现,因为它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。
幸运的是,消费者 API 允许在调用 commitSync()和 commitAsync()方法时传进去希望提交 的分区和偏移量的 map。假设你处理了半个批次的消息, 最后一个来自主题“customers” 分区 3 的消息的偏移量是 5000, 你可以调用 commitSync() 方法来提交它。不过,因为消费者可能不只读取一个分区, 你需要跟踪所有分区的偏移量,所以在这个层面上控制偏移 量 的提交会让代码变复杂。
下面是提交特定偏移量的例子 :
再均衡监听器
在提交偏移量一节中提到过,消费者在退出和进行分区再均衡之前,会做一些清理工作。
你会在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量。如果消费 者准备了 一 个缓冲区用于处理偶发的事件,那么在失去分区所有权之前, 需要处理在缓冲 区累积下来的记录。你可能还需要关闭文件句柄、数据库连接等。
在为消费者分配新分区或移除旧分区时,可以通过消费者 API执行 一 些应用程序代码,在调用 subscribe()方法时传进去一个ConsumerRebalancelistener实例就可以了。 ConsumerRebalancelistener有两个需要实现的方法。
(1) public void onPartitionsRevoked(Collection partitions)方法会在 再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接 管分区 的消费者就知道该从哪里开始读取了。
(2) public void onPartitionsAssigned(Collection partitions)方法会在 重新分配分区之后和消费者开始读取消息之前被调用。
下面的例子将演示如何在失去分区所有权之前通过 onPartitionsRevoked()方法来提交偏移量。在下一节,我们会演示另一个同时使用了 onPartitionsAssigned()方法的例子。
从特定偏移量处开始处理记录
到目前为止,我们知道了如何使用 poll() 方法从各个分区的最新偏移量处开始处理消息。 不过,有时候我们也需要从特定的偏移量处开始读取消息。
如果你想从分区的起始位置开始读取消息,或者直接跳到分区的末尾开始读取消息, 可以使 用 seekToBeginning(Collection tp) 和 seekToEnd(Collection tp) 这两个方法。
不过, Kafka也为我们提供了用 于查找特定偏移量的 API。 它有很多用途,比如向 后回退 几个消息或者向前跳过几个消息(对时间比较敏感的应用程序在处理滞后的情况下希望能 够向前跳过若干个消息)。在使用 Kafka 以外的系统来存储偏移量时,它将给我们 带来更 大的惊喜。
试想一下这样的场景:应用程序从 Kafka读取事件(可能是网站的用户点击事件流 ),对 它们进行处理(可能是使用自动程序清理点击操作井添加会话信息),然后把结果保 存到 数据库、 NoSQL 存储引擎或 Hadoop。假设我们真的不想丢失任何数据,也不想在数据库 里多次保存相同的结果。
这种情况下,消费者的代码可能是这样的 :
在这个例子里,每处理一条记录就提交一次偏移量。尽管如此, 在记录被保存到数据库之后以及偏移量被提交之前 ,应用程序仍然有可能发生崩溃,导致重复处理数据,数据库里就会出现重复记录。
如果保存记录和偏移量可以在一个原子操作里完成,就可以避免出现上述情况。记录和偏 移量要么 都被成功提交,要么都不提交。如果记录是保存在数据库里而偏移量是提交到 Kafka 上,那么就无法实现原子操作。
不过 ,如果在同一个事务里把记录和偏移量都写到数据库里会怎样呢?那么我们就会知道 记录和偏移量要么都成功提交,要么都没有,然后重新处理记录。
现在的问题是:如果偏移量是保存在数据库里而不是 Kafka里,那么消费者在得到新分区 时怎么知道该从哪里开始读取?这个时候可以使用 seek() 方法。在消费者启动或分配到新 分区时 ,可以使用 seek()方法查找保存在数据库里的偏移量。
下面的例子大致说明了如何使用这个 API。 使用 ConsumerRebalancelistener和 seek() 方 战确保我们是从数据库里保存的偏移量所指定的位置开始处理消息的。
通过把偏移量和记录保存到同 一个外部系统来实现单次语义可以有很多种方式,不过它们 都需要结合使用 ConsumerRebalancelistener和 seek() 方法来确保能够及时保存偏移量, 井保证消费者总是能够从正确的位置开始读取消息。
如何退出
在之前讨论轮询时就说过,不需要担心消费者会在一个无限循环里轮询消息,我们会告诉消费者如何优雅地退出循环。
如果确定要退出循环,需要通过另一个线程调用 consumer.wakeup()方法。如果循环运行 在主线程里,可以在 ShutdownHook里调用该方法。要记住, consumer.wakeup() 是消费者 唯一一个可以从其他线程里安全调用的方法。调用 consumer.wakeup()可以退出 poll(), 并抛出 WakeupException异常,或者如果调用 cconsumer.wakeup() 时线程没有等待轮询, 那 么异常将在下一轮调用 poll()时抛出。我们不需要处理 WakeupException,因为它只是用于跳出循环的一种方式。不过, 在退出线程之前调用 consumer.close()是很有必要的, 它 会提交任何还没有提交的东西 , 并向群组协调器(broker)发送消息,告知自己要离开群组,接下来 就会触发再均衡 ,而不需要等待会话超时。
下面是运行在主线程上的消费者退出线程的代码 。