加入收藏 | 设为首页 | 会员中心 | 我要投稿 航空爱好网 (https://www.dakongjun.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Linux > 正文

Kafka之消费与激情

发布时间:2021-05-30 22:41:20 所属栏目:Linux 来源:互联网
导读:1、Kafka消费 首先,我们来看看消费。Kafka提供了非常简单的消费API,使用者只需初始化Kafka的Broker Server地址,然后实例化KafkaConsumer类即可拿到Topic中的
副标题[/!--empirenews.page--]

首先,我们来看看消费。Kafka提供了非常简单的消费API,使用者只需初始化Kafka的Broker Server地址,然后实例化KafkaConsumer类即可拿到Topic中的数据。一个简单的Kafka消费实例代码如下所示:

public class JConsumerSubscribe extends Thread {  

    public static void main(String[] args) {        JConsumerSubscribe jconsumer = new JConsumerSubscribe();        jconsumer.start();    }    /** 初始化Kafka集群信息. */    private Properties configure() {        Properties props = new Properties();        props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址  

        props.put("group.id", "ke");// 指定消费者组  

        props.put("enable.auto.commit", "true");// 开启自动提交  

        props.put("auto.commit.interval.ms", "1000");// 自动提交的时间间隔  

        // 反序列化消息主键        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  

        // 反序列化消费记录        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  

        return props;  

    }    /** 实现一个单线程消费者. */    @Override    public void run() {        // 创建一个消费者实例对象        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure());        // 订阅消费主题集合        consumer.subscribe(Arrays.asList("test_kafka_topic"));  

        // 实时消费标识        boolean flag = true;  

        while (flag) {  

            // 获取主题消息数据            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  

            for (ConsumerRecord<String, String> record : records)  

                // 循环打印消息记录                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());  

        }        // 出现异常关闭消费者对象        consumer.close();  

    }}  

上述代码我们就可以非常便捷地拿到Topic中的数据。但是,当我们调用poll方法拉取数据的时候,Kafka Broker Server做了那些事情。接下来,我们可以去看看源代码的实现细节。核心代码如下:

org.apache.kafka.clients.consumer.KafkaConsumer

private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {  

        acquireAndEnsureOpen();        try {  

(编辑:航空爱好网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!