纵有疾风起
人生不言弃

Kafka及Spring Cloud Stream


安装

下载kafka http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz 

kafka最为重要的三个配置依次为:broker.id、log.dirs、zookeeper.connect

在kafka server端 config/server.properties中设置

 

必须要配置:

advertised.listeners=PLAINTEXT://192.168.3.201:9092    # 公布访问地址和端口 

 

启动kafka 

 ./kafka-server-start.sh ../config/server.properties   # 在 kafka 的 bin 目录下执行

检测是否启动 

netstat -tunlp | egrep "(2181|9092)"

或 lsof -i:9092

测试发送信息和消费消息

创建主题

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

生产者 

 ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

消费者 

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

如果想在外部访问kafka,必须将 9092 端口加入到防火墙放行列表

firewall-cmd --list-ports # 查询所有放行端口
firewall-cmd --add-port=9092/tcp # 临时端口放行
firewall-cmd --add-port=9092/tcp --permanent # 永久放行
firewall-cmd --reload # 重新载入放行列表

 

简单API的应用 

引入依赖

        <!-- Spring for Apache Kafka. No <version> is declared here; presumably it is managed by the parent POM/BOM (confirm). -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

编写生产者

package com.example.springkafka.api;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Minimal producer written directly against the Kafka client API.
 *
 * <p>Sends a single String key/value record to partition 0 of the
 * {@code message} topic and then closes the producer.
 *
 * <p>Created 2018/11/6 20:25.
 */
public class KafkaProducerDemo {

    /**
     * Builds the producer configuration, sends one record and exits.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.3.221:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        String topic = "message";                      // target topic
        Integer partition = 0;                         // explicit target partition
        long timeMillis = System.currentTimeMillis();  // record timestamp: current time in epoch milliseconds
        String key = "key-message";                    // record key
        String value = "value-message";                // record value

        // The record to publish.
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>(topic, partition, timeMillis, key, value);

        // try-with-resources closes the producer even when send() throws,
        // so the client is never leaked.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            kafkaProducer.send(producerRecord);
        }
    }
}

 

编写消费者

package com.example.springkafka.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal consumer written directly against the Kafka client API.
 *
 * <p>Subscribes to the {@code message} topic as part of consumer group
 * {@code group-1} and prints every record it receives, forever.
 *
 * <p>Created 2018/11/6 20:25.
 */
public class KafkaConsumerDemo {

    /**
     * Builds the consumer configuration and runs the poll loop.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.3.221:9092");
        properties.setProperty("group.id", "group-1");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());

        // try-with-resources: the loop below only ends by exception, and the
        // consumer must still be closed in that case.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            // Subscribe to the topic the producer demo writes to.
            kafkaConsumer.subscribe(Arrays.asList("message"));

            while (true) {
                // NOTE(review): poll(long) is deprecated in newer kafka-clients releases in
                // favour of poll(Duration); switch once the client version in use is confirmed.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("========offset = %d, key = %s, value = %s\n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}

 spring kafka

依赖

        <!-- Spring Cloud Stream Kafka binder. No <version> is declared here; presumably it is managed by the parent POM/BOM (confirm). -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

生产者与消费者配置

# Producer and consumer configuration
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers: 192.168.3.221:9092
    consumer: # consumer settings
      group-id: gerry-1
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Custom property, referenced from the Java code as ${kafka.topic}
kafka:
  topic: gerry

生产者代码

package com.example.springcloudkafka.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;/** * @Date: 2018/11/6 21:03 * @Description: */@RestControllerpublic class KafkaProducerController {    public final KafkaTemplate<String, String> kafkaTemplate;    private final String topic;    public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,                                   @Value("${kafka.topic}") String topic) {        this.kafkaTemplate = kafkaTemplate;        this.topic = topic;    }    @PostMapping("message/send") // 这种方式只支持post    public boolean sendMessage(@RequestParam String message) {        kafkaTemplate.send(topic,message);        return true;    }}

消费者代码

package com.example.springcloudkafka.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Listener bean that prints every message received on the topic named by the
 * {@code kafka.topic} property.
 *
 * <p>Created 2018/11/6 21:20.
 */
@Component
public class KafkaConsumerListener {

    /**
     * Handles one message from the configured topic by writing it to standard output.
     *
     * @param message the received message payload
     */
    @KafkaListener(topics = {"${kafka.topic}"})
    public void getMessage(String message) {
        System.out.println("kafka 消费者监听,接收到消息:" + message);
    }
}

Spring Cloud Stream 

官方定义三个接口
Source=> 发送者 Producer、Publisher
Sink=> 接收器 Consumer、Subscriber
Processor=> 对上游而言是 Sink、对下游而言是 Source

Spring Cloud Stream Binder: Kafka 

引入依赖:

        <!-- Kafka binder for Spring Cloud Stream. No <version> is declared here; presumably it is managed by the parent POM/BOM (confirm). -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

配置:

# Kafka connection and Spring Cloud Stream channel bindings
spring:
  kafka:
    bootstrap-servers: 192.168.3.221:9092
  cloud:
    stream:
      bindings:
        output:
          destination: ${kafka.topic}
        input:
          destination: ${kafka.topic}
# Custom property shared by the output and input bindings above
kafka:
  topic: cloud-stream

 

生产者:

package com.example.springcloudstreamkafkademo.producer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Source;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Component@EnableBinding(Source.class)public class MessageProducerBean {    @Autowired    @Qualifier(Source.OUTPUT)    private MessageChannel messageChannel;    @Autowired    private Source source;    /**     * 发送信息     * @param message     */    public void send(String message) {        // 通过消息管道发送消息        // messageChannel.send(MessageBuilder.withPayload(message).build());        source.output().send(MessageBuilder.withPayload(message).build());    }}

消费者

package com.example.springcloudstreamkafkademo.consumer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.cloud.stream.messaging.Sink;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.messaging.SubscribableChannel;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component@EnableBinding(value={Sink.class})public class MessageConsumerBean {    @Autowired    @Qualifier(Sink.INPUT)    private SubscribableChannel subscribableChannel;    //1、 当subscribableChannel注入完成后完成回调    @PostConstruct    public void init() {        subscribableChannel.subscribe(message->{            System.out.println(message.getPayload());        });    }    // 2、@ServiceActivator    @ServiceActivator(inputChannel=Sink.INPUT)    public void message(String message) {        System.out.println("@ServiceActivator:"+message);    }    //3、@StreamListener    @StreamListener(Sink.INPUT)    public void onMessage(String message) {        System.out.println("@StreamListener:"+message);    }}

 

Spring Cloud Stream Binder: RabbitMQ 

 引入依赖

        <!-- RabbitMQ binder for Spring Cloud Stream. No <version> is declared here; presumably it is managed by the parent POM/BOM (confirm). -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>

配置

# Spring Cloud Stream channel bindings and RabbitMQ connection settings
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: ${rabbit.queue}
        input:
          destination: ${rabbit.queue}
  rabbitmq:
    host: 192.168.3.221
    port: 5672
    username: rabbit
    password: rabbit
# Custom property shared by the output and input bindings above
rabbit:
  queue: cloud-stream-queue

代码同kafka

 

完整代码详见:https://gitee.com/lm970585581/cloud-config/tree/master/Spring%20Cloud%20Stream%20

文章转载于:https://www.cnblogs.com/lm970585581/p/9920978.html

原著是一个有趣的人,若有侵权,请通知删除

未经允许不得转载:起风网 » Kafka及Spring Cloud Stream

分享到: 生成海报
avatar

评论 抢沙发

评论前必须登录!

立即登录   注册

切换注册

登录

忘记密码 ?

切换登录

注册

我们将发送一封验证邮件至你的邮箱, 请正确填写以完成账号注册和激活