kafka0.8升级0.10测试计划

kafka0.8升级0.10测试计划

升级测试 10.0.41.135~10.0.41.137 kafka集群测试机

一、升级主流程

1、拷贝0.8版本配置文件,修改添加0.10版本配置

1
2
3
4
5
6
7
8
9
#修改项
log.dirs=/opt/kafka_2.10-0.8.2.1/kafka-logs,/opt/kafka_2.10-0.10.1.0/kafka-logs

#添加项
auto.create.topics.enable=false
num.replica.fetchers=3
unclean.leader.election.enable=false
inter.broker.protocol.version=0.8.2.0
log.message.format.version=0.8.2.0

2、逐一停止0.8版本kafka,启动0.10版本kafka

1
2
3
4
5
6
7
#停止该服务器上所有0.8kafka进程
netstat -anp | grep :9092 | grep LISTEN | grep -v 2.200 | awk '{print $7}' | awk -F'/' '{print $1}' | xargs kill –SIGTERM
#启动0.10kafka
/opt/kafka_2.10-0.10.1.0/start

#可选命令
/opt/kafka_2.10-0.10.1.0/bin/kafka-preferred-replica-election.sh --zookeeper 10.0.41.135:2181(可选)

3、修改配置,并在此逐一启动kafka

1
2
3
4
5
6
7
8
9
10
#修改0.10配置
inter.broker.protocol.version=0.10.1.0

#停止该服务器上所有0.10kafka进程
netstat -anp | grep :9092 | grep LISTEN | grep -v 2.200 | awk '{print $7}' | awk -F'/' '{print $1}' | xargs kill -s TERM
#启动0.10kafka
/opt/kafka_2.10-0.10.1.0/start

#可选命令
/opt/kafka_2.10-0.10.1.0/bin/kafka-preferred-replica-election.sh --zookeeper 10.0.41.135:2181(可选)

二、测试

topic准备

  • wangzhe:用于测试升级过程自动leader均衡和手动leader均衡
  • wangzhe_long_running :测试升级过程中topic生产消费是否出问题

wangzhe

测试0.8和0.10版本客户端对0.10版本kafka集群的生产消费许可状况

分别测试未均衡前、自动均衡后和手动均衡后的leader、replicas和lsr

自动均衡时间过久(大概要一天),只测试手动均衡和未均衡之前的状态

wangzhe_long_running

在测试开始前执行持续开启写入命令和持续开启消费命令,在测试过程中查看是否报错

测试过程

1、准备测试初试环境

之前测试已经将0.10的kafka升级完成,将0.8和0.10配置均恢复为未升级前,将kafka集群停止,恢复到0.8kafka集群

2、创建新topic

1
2
3
bin/kafka-topics.sh --create --zookeeper 10.0.41.135:2181 --replication-factor 3 --partitions 3 --topic wangzhe

bin/kafka-topics.sh --create --zookeeper 10.0.41.135:2181 --replication-factor 3 --partitions 3 --topic wangzhe_long_running

3、预置topic数据

1
2
3
4
5
6
7
8
9
10
bin/kafka-console-producer.sh --broker-list 10.0.41.135:9092 --topic wangzhe
kafka_08_1
kafka_08_2
kafka_08_3
kafka_08_4
kafka_08_5
kafka_08_6
kafka_08_7
kafka_08_8
kafka_08_9

4、拷贝0.8版本配置并添加修改

1
2
3
4
5
6
7
8
9
10
server.properties
#修改项
log.dirs=/opt/kafka_2.10-0.8.2.1/kafka-logs,/opt/kafka_2.10-0.10.1.0/kafka-logs

#添加项
auto.create.topics.enable=false
num.replica.fetchers=3
unclean.leader.election.enable=false
inter.broker.protocol.version=0.8.2.0
log.message.format.version=0.8.2.0

5、开启持续写入/消费

在10.0.41.132测试机上开启持续写入和持续消费,测试wangzhe_long_running

1
2
3
4
5
#开启持续写入
bin/kafka-producer-perf-test.sh --messages 1000000000 --message-size 5 --batch-size 10 --topics wangzhe_long_running --threads 1 --broker-list slave135:9092,slave136:9092,slave137:9092

#开启持续消费
bin/kafka-console-consumer.sh --zookeeper 10.0.41.135:2181 --topic wangzhe_long_running

6、逐一停止0.8kafka节点并对应的启动0.10节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#停止0.8节点
netstat -anp | grep :9092 | grep LISTEN | grep -v 2.200 | awk '{print $7}' | awk -F'/' '{print $1}' | xargs kill -SIGTERM

#启动0.10节点
/opt/kafka_2.10-0.10.1.0/start

#查看topic状态
bin/kafka-topics.sh --describe --zookeeper 10.0.41.135:2181 --topic wangzhe

#第二遍测试手动均衡前后topic状态
#手动均衡
/opt/kafka_2.10-0.10.1.0/bin/kafka-preferred-replica-election.sh --zookeeper 10.0.41.135:2181

bin/kafka-topics.sh --describe --zookeeper 10.0.41.135:2181 --topic wangzhe

7、修改配置,并再次逐一重启

1
2
3
4
5
6
7
8
9
10
11
12
#修改0.10配置
inter.broker.protocol.version=0.10.1.0

#停止该服务器上所有0.10kafka进程
netstat -anp | grep :9092 | grep LISTEN | grep -v 2.200 | awk '{print $7}' | awk -F'/' '{print $1}' | xargs kill -s TERM
#启动0.10kafka
/opt/kafka_2.10-0.10.1.0/start

#第二遍测试手动均衡前后topic状态
/opt/kafka_2.10-0.10.1.0/bin/kafka-preferred-replica-election.sh --zookeeper 10.0.41.135:2181

bin/kafka-topics.sh --describe --zookeeper 10.0.41.135:2181 --topic wangzhe

9、测试0.8和0.10客户端生产消费0.10版本kafka集群

测试0.8和0.10客户端生产消费0.10版本kafka集群
生产者和消费者为10.0.41.132测试机,位于kafka集群之外

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#0.8的生产者
/opt/kafka_2.10-0.8.2.1/bin/kafka-console-producer.sh --broker-list 10.0.41.135:9092 --topic wangzhe

#0.8的消费者
/opt/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper 10.0.41.135:2181 --topic wangzhe --from-beginning

#0.10的消费者
/opt/kafka_2.10-0.10.1.0/bin/kafka-console-consumer.sh --zookeeper 10.0.41.135:2181 --topic wangzhe --from-beginning

#0.10的生产者
/opt/kafka_2.10-0.10.1.0/bin/kafka-console-producer.sh --broker-list 10.0.41.135:9092 --topic wangzhe

#0.8的消费者
/opt/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper 10.0.41.135:2181 --topic wangzhe --from-beginning

#0.10的消费者
/opt/kafka_2.10-0.10.1.0/bin/kafka-console-consumer.sh --zookeeper 10.0.41.135:2181 --topic wangzhe --from-beginning

至此,测试流程完成

命令参考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#启动kafka
bin/kafka-server-start.sh config/server.properties
/opt/kafka_2.10-0.10.1.0/start

#创建topic
bin/kafka-topics.sh --create --zookeeper 10.0.41.135:2181 --replication-factor 3 --partitions 3 --topic wangzhe

#删除topic
bin/kafka-topics.sh --delete --zookeeper 10.0.41.135:2181 --topic wangzhe

#查看所有topic
bin/kafka-topics.sh --list --zookeeper 10.0.41.135:2181

#查看topic状态
bin/kafka-topics.sh --describe --zookeeper 10.0.41.135:2181 --topic wangzhe

#启动生产者(启动成功进入命令行阻塞状态,可以输入数据,回车发送)
bin/kafka-console-producer.sh --broker-list 10.0.41.135:9092 --topic wangzhe_long_running

#启动消费者(启动后命令行处于阻塞状态,生产者发布的消息会在此显示)
bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –wangzhe_long_running

#终止节点所有kafka进程
netstat -anp | grep :9092 | grep LISTEN | grep -v 2.200 | awk '{print $7}' | awk -F'/' '{print $1}' | xargs kill -SIGTERM

#持续开启写入
bin/kafka-producer-perf-test.sh --messages 1000000000 --message-size 5 --batch-size 10 --topics wangzhe_long_running --threads 1 --broker-list slave135:9092,slave136:9092,slave137:9092

#持续开启消费
bin/kafka-console-consumer.sh --zookeeper 10.0.41.135:2181 --topic wangzhe_long_running --from-beginning