以下建议是在我之前的问题 How to test a ConsumerAwareRebalanceListener? 的答案中给我的,我能够使用 Spring Kafka 2.1.x 设置一个 ConsumerAwarereBalanceListener

但是,下面的代码

Map<TopicPartition, Long> queries = new HashMap<>(); 
final Long rewindTime = getRewindTime(); 
for (TopicPartition partition : partitions) { 
    queries.put(partition, rewindTime); 
} 
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(queries); 
result.forEach((key, value) -> consumer.seek(key, value.offset())); 

如果主题查询结果为空,则抛出 NullPointerException。在本例中,valuenull。堆栈跟踪如下。

java.lang.NullPointerException: null 
    at org.rcardin.MyConsumerAwareRebalancedListener.lambda$onPartitionsAssigned$0(MyConsumerAwareRebalancedListener.java:40) ~[classes/:na] 
    at java.util.HashMap.forEach(HashMap.java:1288) ~[na:1.8.0_144] 
    at org.rcardin.MyConsumerAwareRebalancedListener.onPartitionsAssigned(MyConsumerAwareRebalancedListener.java:40) ~[classes/:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(KafkaMessageListenerContainer.java:596) ~[spring-kafka-2.1.10.RELEASE.jar:2.1.10.RELEASE] 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264) [kafka-clients-1.0.2.jar:na] 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) [kafka-clients-1.0.2.jar:na] 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) [kafka-clients-1.0.2.jar:na] 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) [kafka-clients-1.0.2.jar:na] 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) [kafka-clients-1.0.2.jar:na] 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) [kafka-clients-1.0.2.jar:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) [spring-kafka-2.1.10.RELEASE.jar:2.1.10.RELEASE] 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144] 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144] 
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144] 

我正在尝试编写一个测试,其目的是验证此行为。但是,由于异常发生在与主线程不同的单独线程上,因此我无法检索此类异常。

有什么建议吗?

非常感谢,

请您参考如下方法:

根据 offsetsForTimes() Javadocs,这看起来像是代码中的错误:

 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater 
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no 
 *         such message. 

因此,您绝对应该使用 null 检查来保护您的 value.offset():

result.entrySet() 
            .stream() 
            .filter(e -> e.getValue() != null) 
            .forEach(e -> consumer.seek(e.getKey(), e.getValue().offset())); 


评论关闭
IT干货网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!