纵有疾风起
人生不言弃

Kafka集群搭建


一、Kafka集群搭建

1、环境构建

 安装kafka集群之前,确保zookeeper服务已经正常运行,这里3台zookeeper准备工作都已完成,三台主机分别为:192.168.3.220,192.168.3.221,192.168.3.222

wget http://mirror.bit.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz ##拉取压缩包到本地进行解压安装

tar -xf kafka_2.11-2.1.1.tgz ##解压文件

mv kafka-2.1.1-src /usr/local/kafka ##移动到/usr/local/kafka目录下面

2、编辑配置

cd /usr/local/kafka/config

vim server.properties

修改配置文件的以下属性

## 强调这个ID在集群中必须是唯一否则会出现ID冲突问题broker.id=0 ## 配置kafka的服务监听端口## 如果配置0.0.0.0则绑定全部网卡,如果默认像下面这样,kafka会绑定默认的所有网卡ip,一般在机器中hosts,hostname都要正确配置,这里默认即可;然后下面的port默认9092不用配置,如果自定义端口号需要设置和listeners的一致,这个是kafka服务监听的端口号. listeners=PLAINTEXT://192.168.3.220:9092## 配置日志目录,建议在kafka的根目录创建一个日志目录log.dirs=/usr/local/kafka/logs## 配置分区个数num.partitions=1## 配置Zookeeper集群字符串zookeeper.connect=192.168.3.220:2181,192.168.3.221:2181,192.168.3.222:2181

 

把刚的配置远程拷贝到其他的2台机器上面

scp -r /usr/local/kafka 192.168.3.221:/usr/local/

scp -r /usr/local/kafka 192.168.3.222:/usr/local/

并修改上面配置文件的属性

broker.id和listeners就OK

3、启动kafka集群

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

4、测试kafka集群

4.1、创建和查看消息主题

连接zookeeper,创建一个名为test-topic的topic

sh kafka-topics.sh --create --zookeeper 192.168.3.220:2181,192.168.3.221:2181,192.168.3.222:2181 --replication-factor 3 --partitions 3 --topic test-topic Created topic "test-topic".

 

查看此topic属性

sh kafka-topics.sh --describe --zookeeper 192.168.3.220:2181,192.168.3.221:2181,192.168.3.222:2181 --topic test-topicTopic:test-topic    PartitionCount:3    ReplicationFactor:3 Configs:    Topic: test-topic       Partition: 0    Leader: 2       Replicas: 2,1,3 Isr: 2,1    Topic: test-topic       Partition: 1    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1    Topic: test-topic       Partition: 2    Leader: 1       Replicas: 1,3,2 Isr: 1
 

查看已经创建的topic列表

sh kafka-topics.sh --list --zookeeper 192.168.3.220:2181,192.168.3.221:2181,192.168.3.222:2181test-topic

4.2、创建消息生产者发送消息

sh kafka-console-producer.sh --broker-list 192.168.3.220:9092,192.168.3.221:9092,192.168.3.222:9092 --topic test-topic

4.3、创建消息消费者接收消息

sh kafka-console-consumer.sh --bootstrap-server 192.168.3.220:9092,192.168.3.221:9092,192.168.3.222:9092 --topic test-topic --from-beginning

 

★错误记录:-bash: ./kafka-server-start.sh: Permission denied

在执行启动启动kafka集群的时候,报错没有权限操作该文件,直接通过 chmod 777 zookeeper-server-start.sh命令给无权限的文件更改权限。

[2019-02-28 22:00:23,657] WARN [Producer clientId=console-producer] Connection to node 1 (/192.168.3.220:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

这个是因为防火墙没有放行端口

firewall-cmd –add-port=9092/tcp –permanent

5、Java连接生成消息和发送消息

引入依赖

<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka_2.12</artifactId>    <version>2.1.1</version></dependency>

 

编写生产者代码

import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties;​​public class KafkaProducer {     public static void main(String[] args) throws InterruptedException {        Properties props = new Properties();        //kafka服务器地址        props.put("bootstrap.servers", "192.168.3.220:9092,192.168.3.221:9092,192.168.3.222:9092");        //ack是判断请求是否为完整的条件(即判断是否成功发送)。all将会阻塞消息,这种设置性能最低,但是最可靠。        props.put("acks", "1");        //retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。        props.put("retries", 0);        //producer缓存每个分区未发送消息,缓存的大小是通过batch.size()配置设定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有一个缓冲区)        props.put("batch.size", 16384);        //默认缓冲区可立即发送,即便缓冲区空间没有满;但是,如果你想减少请求的数量,可以设置linger.ms大于0.这将指示生产者发送请求之前等待一段时间        //希望更多的消息补填到未满的批中。这类似于tcp的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger时间为1ms,然后,如果我们        //没有填满缓冲区,这个设置将增加1ms的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是linger.ms=0。        //不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。        props.put("linger.ms", 1);        //buffer.memory控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值        //通过max.block.ms设定,之后他将抛出一个TimeoutExecption。        props.put("buffer.memory", 33554432);        //key.serializer和value.serializer示例:将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerizlizaer或StringSerializer处理简单的byte和String类型.        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        //设置kafka的分区数量        props.put("kafka.partitions", 12);                Producer<String, String> producer = new KafkaProducer<>(props);        for (int i = 0; i < 50; i++){            System.out.println("key-->key"+i+"  value-->vvv"+i);            producer.send(new ProducerRecord<String, String>("aaa", "key"+i, "vvv"+i));            Thread.sleep(1000);        }        producer.close();    }}

 

用于建立消费者的相关参数说明及其默认值参见producerconfigs,此处对代码中用到的几个参数进行解释:

bootstrap.servers:用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2;

acks:生产者需要server端在接收到消息后,进行反馈确认的尺度,主要用于消息的可靠性传输;acks=0表示生产者不需要来自server的确认;acks=1表示server端将消息保存后即可发送ack,而不必等到其他follower角色的都收到了该消息;acks=all(or acks=-1)意味着server端将等待所有的副本都被接收后才发送确认。

retries:生产者发送失败后,重试的次数

batch.size:当多条消息发送到同一个partition时,该值控制生产者批量发送消息的大小,批量发送可以减少生产者到服务端的请求数,有助于提高客户端和服务端的性能。

linger.ms:默认情况下缓冲区的消息会被立即发送到服务端,即使缓冲区的空间并没有被用完。可以将该值设置为大于0的值,这样发送者将等待一段时间后,再向服务端发送请求,以实现每次请求可以尽可能多的发送批量消息。

batch.size和linger.ms是两种实现让客户端每次请求尽可能多的发送消息的机制,它们可以并存使用,并不冲突。

buffer.memory:生产者缓冲区的大小,保存的是还未来得及发送到server端的消息,如果生产者的发送速度大于消息被提交到server端的速度,该缓冲区将被耗尽。

key.serializer,value.serializer说明了使用何种序列化方式将用户提供的key和vaule值序列化成字节。

编写消费端代码

import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays;import java.util.Properties;spublic class KafkaConsumer {     public KafkaConsumer<String, String>  getConsmer() {        Properties props = new Properties();        //设置kafka服务器        props.put("bootstrap.servers", "192.168.3.220:9092,192.168.3.221:9092,192.168.3.222:9092");        //消费者群组ID,发布-订阅模式,即如果一个生产者,多个消费者都要消费,那么需要定义自己的群组,同一个群组内的消费者只有一个能消费到消息        props.put("group.id", "test");        //true,消费者的偏移量将在后台定期提交;false关闭自动提交位移,在消息被完整处理之后再手动提交位移        props.put("enable.auto.commit", "true");        //如何设置为自动提交(enable.auto.commit=true),这里设置自动提交周期        props.put("auto.commit.interval.ms", "1000");        //session.timeout.ms:在使用kafka的组管理时,用于检测消费者故障的超时        props.put("session.timeout.ms", "30000");        //key.serializer和value.serializer示例:将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerizlizaer或StringSerializer处理简单的byte和String类型.        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);        return consumer;    }             public static void main(String[] args) {        KConsumer kconsumer =  new KConsumer();        KafkaConsumer<String, String> consumer = kconsumer.getConsmer();                consumer.subscribe(Arrays.asList("aaa"));        while (true) {            ConsumerRecords<String, String> records = consumer.poll(100);            for (ConsumerRecord<String, String> record : records)                System.out.println("offset =  "+record.offset()+", key = "+record.key()+", value = "+ record.value());        }    }}

 

 

 

 

文章转载于:https://www.cnblogs.com/lm970585581/p/13398341.html

原著是一个有趣的人,若有侵权,请通知删除

未经允许不得转载:起风网 » Kafka集群搭建

分享到: 生成海报
avatar

评论 抢沙发

评论前必须登录!

立即登录   注册

切换注册

登录

忘记密码 ?

切换登录

注册

我们将发送一封验证邮件至你的邮箱, 请正确填写以完成账号注册和激活