Blog

Keep up to date with the latest news

kafka基础使用指南

0.前言

虽然现如今的互联网行业已经步入衰退期,java应用开发的火热程度已经远不及当年,但是瘦死的骆驼比马大,java开发在编程领域还是占有很大的市场的。今天我们说的主角kafka就是用java开发的一款开源软件,通常用作java应用的消息中间件,并且也是目前最好用的消息中间件之一,今天我们就来部署一个kafka,并且看下kafka有哪些常见的用法。

1.准备工作

还是准备一台rocky9 linux操作系统的虚拟机,配置信息如下:

IP地址CPU内存192.168.159.1674核8G

当然,其他配置或者操作系统的虚拟机也可以。

2.部署单机版kafka

部署kafka通常需要搭配一个zookeeper(新版本也支持kraft模式),本身kafka自带zookeeper,但为了方便管理,还是单独部署一个zookeeper给kafka使用。

kafka的安装包可以到官网下载:https://kafka.apache.org/downloads

2.1 安装包准备

本次示例我们的安装包如下:

kafka安装包:kafka_2.12-3.2.0.tgz

zookeeper安装包:zookeeper-3.4.5.tar.gz

注意:kafka启动依赖java,需要虚拟机有jdk

安装包准备好之后,根据个人喜好,上传到虚拟机指定目录。

2.2 部署kafka

因为我们下载的是源码包,所以部署就非常简单了,只需要解压安转包存放到指定目录即可。

(1)部署zookeeper命令如下:

cd ~/install/kafka

tar xf zookeeper-3.4.5.tar.gz

mv zookeeper-3.4.5 /usr/local/zookeeper

(2)部署kafka命令如下:

cd ~/install/kafka

tar xf kafka_2.12-3.2.0.tgz

mv zookeeper-3.4.5 /usr/local/kafka

执行完以上几条命令,kafka就算部署完成了。

2.3 配置修改

接着看一下zookeeper和kafka的配置文件,并可以根据需要修改一些配置。

(1)zookeeper的配置文件

mv /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg

vim /usr/local/zookeeper/conf/zoo.cfg

# The number of milliseconds of each tick

tickTime=2000

# The number of ticks that the initial

# synchronization phase can take

initLimit=10

# The number of ticks that can pass between

# sending a request and getting an acknowledgement

syncLimit=5

# the directory where the snapshot is stored.

# do not use /tmp for storage, /tmp here is just

# example sakes.

dataDir=/data/zookeeper/data

# the port at which the clients will connect

clientPort=2181

#

# Be sure to read the maintenance section of the

# administrator guide before turning on autopurge.

#

# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance

#

# The number of snapshots to retain in dataDir

#autopurge.snapRetainCount=3

# Purge task interval in hours

# Set to "0" to disable auto purge feature

#autopurge.purgeInterval=1

zookeeper主要需要关注的是dataDir这个配置,是zookeeper存放数据的目录,原本是在/tmp目录下,线上环境最好指定专门的目录。

(2)kafka配置文件

vim /usr/local/kafka/config/server.properties

broker.id=0

listeners=PLAINTEXT://192.168.159.167:9092

advertised.listeners=PLAINTEXT://192.168.159.167:9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/data/kafka/log

num.partitions=1

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=localhost:2181

zookeeper.connection.timeout.ms=18000

group.initial.rebalance.delay.ms=0

如果需要从其他服务器访问kafka,需要修改listeners、advertised.listeners配置,指定服务器IP地址,另外可以修改下kafka日志存放目录log.dirs。

2.4 启动服务

配置修改完成之后,就可以启动服务,需要先启动zookeeper,再启动kafka。

(1)启动zookeeper

cd /usr/local/zookeeper/bin/

./zkServer.sh start

(2)启动kafka

cd /usr/local/kafka/bin/

./kafka-server-start.sh -daemon ../config/server.properties

(3)验证服务

ps -ef |grep zookeeper |grep zoo.cfg

如果看到zookeeper进程正常运行,说明zookeeper启动成功。

ps -ef |grep kafka |grep server.properties

如果看到kafka进程正常运行,说明kafka启动成功。

到此,单机版本的kafka就部署完成了。

3.kafka常用术语

kafka涉及到的术语比较多,这里列举一些常用的,说明一下。

(1)broker:kafka集群包括一个或多个服务器,这种服务器叫做broker。broker接受来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的请求作出相应,返回已经提交到磁盘上的信息。

(2)topic(主题):每条发布到kafka的消息都有一个类别,这个类别叫做topic。topic就好比数据库的表或者文件系统中的文件夹。一个主题可以分为多个分区,一个分区就是一个提交日志。消息以追加的方式写入分区,然后以先入先出的顺序读取。注意,由于一个主题可以多个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。其实就是队列存储。

(3)message(消息) :kafka的数据单元。

批次:就是一组消息,这些消息属于同一个主题和分区。为了提高效率,消息被分批次写入kafka。

(4)partition(分区):物理上的概念,一个topic通常有多个partition,类似队列,提高读写的并发。

(5)offset(偏移量):一种元数据,它是一个不断递增的整数值,在创建消息时,kafka会把它添加到消息里。kafka为每条在分区的消息保存这个offset,这也是消费者在分区的位置。比如一个偏移量为10的消费者,表示它已经消费了0-9偏移量的消息,下一个要消费的消息是偏移量为10的。kafka 0.9版本之前存在zookeeper,0.9之后存在kafka。

(6)producer(生产者):消息的生产者,负责发送指定topic的消息到broker。默认情况下把消息均衡地分布到主题的所有分区上。

(7)consumer(消费者):消息读取客户端,通过订阅一组topic的消息从broker拉取消息。

(8)consumer group(消费者群组):消费者是消费者群组的一部分,就是说,会有一个或者多个消费者共同读取一个topic。群组保证每个分区只能被一个消费者使用。可以为消费者指定group name,若不指定则属于默认的group。

(9)rebalance(重平衡):消费者组内某个消费者实例挂了之后,其它消费者实例自动重新分配订阅主题分区的过程。rebalance是kafka消费端实现高可用的重要手段。

4.基础功能测试

kafka部署完毕,基础知识也掌握了一些,接下来就来动手试试kafka生产者如何产生消息,消费者如何消费消息。

4.1 主题相关

(1)创建主题

./kafka-topics.sh --create --bootstrap-server 192.168.159.167:9092 --topic mytest

(2)查看主题

./kafka-topics.sh --bootstrap-server 192.168.159.167:9092 --list --exclude-internal

(3)删除主题

./kafka-topics.sh --bootstrap-server 192.168.159.167:9092 --delete --topic test

注意:如果发现删除主题后又重新出现了,需要在配置文件添加delete.topic.enable=true

4.2 消息相关

(1)生产消息

./kafka-console-producer.sh --bootstrap-server 192.168.159.167:9092 --topic test

基于test主题生产消息,命令会卡住,并提示输入消息,如下所示:

我们可以输入一串消息进去,比如:hello,world,hello world

(2)消费消息

./kafka-console-consumer.sh --bootstrap-server 192.168.159.167:9092 --topic test --from-beginning

可以看到消费者会消费消息,并打印消费的消息。

5.总结

kafka作为一个老牌消息中间件,有这免费、性能好、社区活跃等优点,目前依然是最常用的消息中间件之一,今天只是讲解了一下kafka的基础知识,还有很多好玩有趣的功能等着大家探索。