第六章Kafka拦截器
Kafka 拦截器分为生产者拦截器和消费者拦截器。生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。
拦截器(Interceptor)是在kafka 0.10.0.0引入的,主要用来实现client端的一些定制化控制逻辑。
6.1生产者拦截器
对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会
对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor
按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是
org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
1. configure(configs) 获取配置信息和初始化数据时调用。
2. onSend(ProducerRecord):该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。
3. onAcknowledgement(RecordMetadata, Exception):该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用,并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。
4. close:关闭 interceptor,主要用于执行一些资源清理工作。
如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
生产者拦截器的接口是org.apache.kafka.clients.producer.ProducerInterceptor,它包含了以下方法:
1. void configure(Map configs);
该方法继承Configurable接口,获取配置信息和初始化数据时调用
2. ProducerRecord onSend(ProducerRecord record);
将消息序列化和计算分区之前调用,用户可以在该方法中对消息进行任何操作,但最好不要修改消息的topic、key、partition等信息。
3. void onAcknowledgement(RecordMetadata metadata, Exception exception); 在消息被应答之前或消息发送失败时调用.
4. void close();关闭拦截器时执行一些资源清理的工作。
示例代码:
public class ProducerInterceptorPrefix implements ProducerInterceptor {
private volatile long sendSuccess = 0;
private volatile long sendFail = 0;
@Override
public ProducerRecord onSend(ProducerRecord record) {
String value = "test-"+record.value();
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), value, record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception==null){
sendSuccess++;
}else {
sendFail++;
}
}
@Override
public void close() {
System.out.println("发送成功:"+sendSuccess+",失败:"+sendFail);
}
@Override
public void configure(Map configs) {
}
}
实现自定义的ProducerInterceptor后需要配置kafka参数interceptor.classes;
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
如果生产了5条消息,这5条消息都会增加前缀“test-”,在消息发送完成后客户端会打印“发送成功:5,失败:0”。
KafkaProducer还可以配置多个拦截器形成拦截器链,拦截器链会按照interceptor.classes参数配置的拦截器顺序执行。在拦截器链中,如果某个而拦截器执行失败,那么下一个拦截器会接着上一个而执行成功的拦截器继续执行
6.2 消费者拦截器
消费者拦截器的接口org.apache.kafka.clients.consumer.ConsumerInterceptor,该接口包含以下方法:
1. ConsumerRecords onConsume(ConsumerRecords records);
2. poll()方法返回前调用
void onCommit(Map offsets); 提交万消费位移之后调用
3. void close();
关闭拦截器时执行一些资源清理的工作。
示例代码,下面代码实现了过滤掉消息时间戳与当前时间超过10秒的消息:
public class ConsumerInterceptorTimestamp implements ConsumerInterceptor {
private static final long EXPIRE_TIME = 10*1000;
@Override
public ConsumerRecords onConsume(ConsumerRecords records) {
long now = System.currentTimeMillis();
HashMap>> newRecords = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List> tpRecordList = records.records(partition);
ArrayList> newTpRecordList = new ArrayList<>();
for (ConsumerRecord record : tpRecordList) {
// 判断比较时间
if (now- record.timestamp()(newRecords);
}
@Override
public void onCommit(Map offsets) {
offsets.forEach((t,o)->{
System.out.println(t+"---"+o.offset());
});
}
@Override
public void close() {}
@Override
public void configure(Map configs) {}
}
同样的实现自定义拦截器后需要配置参数interceptor.classes;
properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTimestamp.class.getName());
消费者同样有拦截器链的概念,同样按配置顺序执行,如果某个拦截器执行失败,同样下一个拦截器接着上一个执行成功的拦截器继续执行。