Here is a small Kafka Java demo I wrote, recorded here for reference.

1. Create a Maven project

The directory layout is as follows:

[Screenshot of the Maven project structure omitted]

2. The pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
  <modelVersion>4.0.0</modelVersion> 
  <groupId>Kafka-Maven</groupId> 
  <artifactId>Kafka-Maven</artifactId> 
  <version>0.0.1-SNAPSHOT</version> 
  <dependencies> 
    <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka_2.11</artifactId> 
      <version>0.10.1.1</version> 
    </dependency> 
    <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-common</artifactId> 
      <version>2.2.0</version> 
    </dependency> 
    <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-hdfs</artifactId> 
      <version>2.2.0</version> 
    </dependency> 
    <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-client</artifactId> 
      <version>2.2.0</version> 
    </dependency> 
    <dependency> 
      <groupId>org.apache.hbase</groupId> 
      <artifactId>hbase-client</artifactId> 
      <version>1.0.3</version> 
    </dependency> 
    <dependency> 
      <groupId>org.apache.hbase</groupId> 
      <artifactId>hbase-server</artifactId> 
      <version>1.0.3</version> 
    </dependency> 
    <dependency> 
      <groupId>jdk.tools</groupId> 
      <artifactId>jdk.tools</artifactId> 
      <version>1.7</version> 
      <scope>system</scope> 
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> 
    </dependency> 
    <dependency> 
      <groupId>org.apache.httpcomponents</groupId> 
      <artifactId>httpclient</artifactId> 
      <version>4.3.6</version> 
    </dependency> 
  </dependencies> 
  <build> 
    <plugins> 
      <plugin> 
        <groupId>org.apache.maven.plugins</groupId> 
        <artifactId>maven-compiler-plugin</artifactId> 
        <configuration> 
          <source>1.7</source> 
          <target>1.7</target> 
        </configuration> 
      </plugin> 
    </plugins> 
  </build> 
</project>

3. The Kafka producer, KafkaProduce:

package com.lijie.producer;

import java.io.InputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProduce {
  private static Properties properties;

  static {
    properties = new Properties();
    // Load kafka.properties from the classpath root
    try (InputStream is = KafkaProduce.class
        .getResourceAsStream("/kafka.properties")) {
      properties.load(is);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Send a message
   *
   * @param topic
   * @param key
   * @param value
   */
  public void sendMsg(String topic, byte[] key, byte[] value) {

    // Instantiate the producer; in production code you would reuse a
    // single instance instead of creating one per message
    KafkaProducer<byte[], byte[]> kp = new KafkaProducer<byte[], byte[]>(
        properties);

    // Wrap the message in a record
    ProducerRecord<byte[], byte[]> pr = new ProducerRecord<byte[], byte[]>(
        topic, key, value);

    // Send asynchronously with a completion callback
    kp.send(pr, new Callback() {
      @Override
      public void onCompletion(RecordMetadata metadata,
          Exception exception) {
        if (exception != null) {
          // metadata may be null on failure, so only report the error
          System.out.println("send failed: " + exception.getMessage());
        } else {
          System.out.println("record written at offset: "
              + metadata.offset());
        }
      }
    });

    // Close the producer; this flushes any pending records
    kp.close();
  }
}
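To try the producer out, a minimal caller might look like the sketch below. This class is not part of the original demo: the topic name lijietest is taken from the consumer in the next section, and the JSON payload fields are invented for illustration.

package com.lijie.producer;

// Hypothetical driver class (not in the original demo): sends one
// JSON-encoded message to the topic the consumer below subscribes to
public class ProducerDemo {

  public static void main(String[] args) {
    KafkaProduce produce = new KafkaProduce();

    // The payload fields here are made up for illustration
    String json = "{\"name\":\"lijie\",\"age\":24}";

    produce.sendMsg("lijietest", "key1".getBytes(), json.getBytes());
  }
}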

4. The Kafka consumer, KafkaConsume:

package com.lijie.consumer;

import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.lijie.pojo.User;
import com.lijie.utils.JsonUtils;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsume {

  private final static String TOPIC = "lijietest";

  private static Properties properties;

  static {
    properties = new Properties();
    // Load kafka.properties from the classpath root
    try (InputStream is = KafkaConsume.class
        .getResourceAsStream("/kafka.properties")) {
      properties.load(is);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Fetch messages (old ZooKeeper-based high-level consumer API)
   *
   * @throws Exception
   */
  public void getMsg() throws Exception {
    ConsumerConfig config = new ConsumerConfig(properties);

    ConsumerConnector consumer = kafka.consumer.Consumer
        .createJavaConsumerConnector(config);

    // Request one stream (one consuming thread) for the topic
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(TOPIC, Integer.valueOf(1));

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(
        new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
        .createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);

    ConsumerIterator<String, String> it = stream.iterator();

    // hasNext() blocks until the next message arrives
    while (it.hasNext()) {
      String json = it.next().message();
      User user = (User) JsonUtils.JsonToObj(json, User.class);
      System.out.println(user);
    }
  }
}
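The User POJO and the JsonUtils helper imported above are not shown in the original demo. Below is a minimal sketch of what they could look like, assuming Jackson's ObjectMapper is available on the classpath; the User fields are invented for illustration.

package com.lijie.utils;

import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch of the JSON helper referenced by KafkaConsume; the real
// implementation is not shown in the original demo
public class JsonUtils {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Deserialize a JSON string into an instance of the given class
  public static <T> T JsonToObj(String json, Class<T> clazz)
      throws Exception {
    return MAPPER.readValue(json, clazz);
  }
}

package com.lijie.pojo;

// Hypothetical User POJO; the real field list is not shown in the demo
public class User {

  private String name;
  private int age;

  // Jackson requires a no-arg constructor
  public User() {
  }

  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
  public int getAge() { return age; }
  public void setAge(int age) { this.age = age; }

  @Override
  public String toString() {
    return "User [name=" + name + ", age=" + age + "]";
  }
}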

5. The kafka.properties file

## producer
bootstrap.servers=192.168.80.123:9092
producer.type=sync
request.required.acks=1
# serializer.class is only read by the old Scala producer API; the new
# KafkaProducer uses key.serializer / value.serializer below
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

## consumer (old ZooKeeper-based high-level consumer)
zookeeper.connect=192.168.80.123:2181
group.id=lijiegroup
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=smallest
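
With kafka.properties on the classpath (for example under src/main/resources), a hypothetical entry point for the consumer could be:

package com.lijie.consumer;

// Hypothetical runner (not in the original demo): starts the consumer
// loop, which blocks and prints every decoded User message
public class ConsumerDemo {

  public static void main(String[] args) throws Exception {
    new KafkaConsume().getMsg();
  }
}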
