kafka_2.12-2.4.x-producer生产者-java原生api实现 作者:马育民 • 2022-09-26 10:41 • 阅读:10059 # 创建maven工程 略 # pom.xml ``` org.apache.kafka kafka-clients 2.4.1 com.fasterxml.jackson.core jackson-databind 2.10.1 org.slf4j slf4j-api 1.7.30 org.slf4j slf4j-log4j12 1.7.30 ``` # log4j.properties 在 `resources` 目录下创建 `log4j.properties` 文件,内容如下: ``` # DEBUG 级别,即:所有信息都会输出 log4j.rootLogger=WARN, stdout # 配置 stdout,输出到控制台 log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n ``` # java 代码 ### 主类 ``` package top.malaoshi.kafka; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Date; import java.util.Properties; public class Producer { final static String TOPIC = "kafka2flink"; public static void main(String[] args) throws InterruptedException { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092,hadoop2:9092,hadoop3:9092"); // key,value 序列化(必须):key.serializer,value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // StringSerializer.class.getName() 等于 org.apache.kafka.common.serialization.StringSerializer properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") ; // 3. 创建 kafka 生产者对象 KafkaProducer kafkaProducer = new KafkaProducer(properties); // 4. 调用 send 方法,发送消息 String event1 = EventBuilder.buildJson("1",36.1); kafkaProducer.send(new ProducerRecord(TOPIC ,event1 )); // 休眠,模拟发送数据延迟 Thread.sleep(2000); String event2 = EventBuilder.buildJson("2",33.1); kafkaProducer.send(new ProducerRecord(TOPIC,event2 )); // 休眠,模拟发送数据延迟 Thread.sleep(5000); String event3 = EventBuilder.buildJson("3",38.1); kafkaProducer.send(new ProducerRecord(TOPIC,event3 )); // 5. 关闭资源 kafkaProducer.close(); } } ``` ### Event ``` package top.malaoshi.kafka; import com.fasterxml.jackson.annotation.JsonFormat; import java.util.Date; public class Event { private String id; private double temp; @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") private Date time; public Event(String id, double temp, Date time) { this.id = id; this.temp = temp; this.time = time; } public String getId() { return id; } public void setId(String id) { this.id = id; } public double getTemp() { return temp; } public void setTemp(double temp) { this.temp = temp; } public Date getTime() { return time; } public void setTime(Date time) { this.time = time; } } ``` ### EventBuilder ``` package top.malaoshi.kafka; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Date; public class EventBuilder { public static Event build(String id, double temp){ Date date = new Date(); Event ret = new Event(id,temp,date); return ret; } public static String buildJson(String id, double temp){ Event event = build(id,temp); ObjectMapper om = new ObjectMapper(); try { String json = om.writeValueAsString(event); return json; } catch (JsonProcessingException e) { throw new RuntimeException(e); } } } ``` # kafka 服务 ### 启动zk 登录 hadoop1,执行下面命令: ``` /program/bin/zk.sh start ``` ### 启动kafka 登录 hadoop1,执行下面命令: ``` /program/bin/kafka.sh start ``` ### 创建主题 ``` kafka-topics.sh --bootstrap-server hadoop1:9092 --create --replication-factor 3 --partitions 3 --topic kafka2flink ``` ### 启动消费者、指定主题 ``` kafka-console-consumer.sh --bootstrap-server hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic kafka2flink ``` ### 运行 java 代码 略 ### 执行结果 消费者控制台执行结果如下: ``` {"id":"1","temp":36.1,"time":"2022-09-26 10:35:25"} {"id":"1","temp":36.1,"time":"2022-09-26 10:35:27"} {"id":"1","temp":36.1,"time":"2022-09-26 10:35:32"} ``` ### 参考 https://blog.csdn.net/weixin_45847167/article/details/123022786 原文出处:http://malaoshi.top/show_1IX47LK63jjR.html