kafka详解--性能

kafka详解–性能

kafka性能如此之高主要是kafka针对四个方面做了性能优化

  • 将大量小io改造成少量大io
  • 利用sendfile减少数据拷贝
  • 支持snappy,gzip,lz4三种算法批量压缩消息,减少网络传输消耗
  • 采用nio网络模型,与1 acceptor thread + N processor threads的reactor线程模型

大量小io改造成少量大io

大量读写少量消息会导致性能较差,通过将消息聚合,可以减少读写次数(减少随机IO),增加单次读写数据量(增加顺序IO)

在大量文件读写的时候,基于queue的read和append只需要一次磁盘寻址,而Btree则会涉及多次。磁盘寻址过程极大降低了读写性能

利用sendfile减少数据拷贝

sendfile

在传统的文件传输里面(read/write方式),在实现上其实是比较复杂的,需要经过多次上下文的切换,我们看一下如下两行代码:

1
2
3
//传统的read/write方式进行文件到socket的传输
read(file, tmp_buf, len);
write(socket, tmp_buf, len);

当需要对一个文件进行传输的时候,其具体流程细节如下:

  1. 调用read函数,文件数据被copy到内核缓冲区
  2. read函数返回,文件数据从内核缓冲区copy到用户缓冲区
  3. write函数调用,将文件数据从用户缓冲区copy到内核与socket相关的缓冲区。
  4. 数据从socket缓冲区copy到相关协议引擎。

一般来说一个网络应用是通过读硬盘数据,然后写数据到 socket 来完成网络传输的。上面2行用代码解释了这一点,不过上面2行简单的代码掩盖了底层的很多操作。来看看底层是怎么执行上面2行代码的:

  1. 系统调用 read() 产生一个上下文切换:从 user mode 切换到 kernel mode,然后 DMA 执行拷贝,把文件数据从硬盘读到一个 kernel buffer 里。
  2. 数据从 kernel buffer 拷贝到 user buffer,然后系统调用 read() 返回,这时又产生一个上下文切换:从kernel mode 切换到 user mode。
  3. 系统调用 write() 产生一个上下文切换:从 user mode 切换到 kernel mode,然后把步骤2读到 user buffer 的数据拷贝到 kernel buffer(数据第2次拷贝到 kernel buffer),不过这次是个不同的 kernel buffer,这个 buffer 和 socket 相关联。
  4. 系统调用 write() 返回,产生一个上下文切换:从 kernel mode 切换到 user mode(第4次切换了),然后 DMA 从 kernel buffer 拷贝数据到协议栈(第4次拷贝了)。

上面4个步骤有4次上下文切换,有4次拷贝,我们发现如果能减少切换次数和拷贝次数将会有效提升性能。在kernel 2.0+ 版本中,系统调用 sendfile() 就是用来简化上面步骤提升性能的。sendfile() 不但能减少切换次数而且还能减少拷贝次数。

以上细节是传统read/write方式进行网络文件传输的方式,我们可以看到,在这个过程当中,文件数据实际上是经过了四次copy操作:

硬盘—>内核buf—>用户buf—>socket相关缓冲区—>协议引擎

而sendfile系统调用则提供了一种减少以上多次copy,提升文件传输性能的方法。Sendfile系统调用是在2.1版本内核时引进的:

1
sendfile(socket, file, len);

运行流程如下:

  1. sendfile系统调用,文件数据被copy至内核缓冲区
  2. 再从内核缓冲区copy至内核中socket相关的缓冲区
  3. 最后再socket相关的缓冲区copy到协议引擎

相较传统read/write方式,2.1版本内核引进的sendfile已经减少了内核缓冲区到user缓冲区,再由user缓冲区到socket相关 缓冲区的文件copy,而在内核版本2.4之后,文件描述符结果被改变,sendfile实现了更简单的方式,系统调用方式仍然一样,细节与2.1版本的 不同之处在于,当文件数据被复制到内核缓冲区时,不再将所有数据copy到socket相关的缓冲区,而是仅仅将记录数据位置和长度相关的数据保存到 socket相关的缓存,而实际数据将由DMA模块直接发送到协议引擎,再次减少了一次copy操作。

支持snappy,gzip,lz4三种算法批量压缩消息,减少网络传输消耗

采用nio网络模型,与1 acceptor thread + N processor threads的reactor线程模型

kafka server端采用与Mina一样的网络、线程模型。server端基于nio,采用1个acceptor线程接受tcp连接,并将连接分配给N个proccessor线程,proccessor线程执行具体的IO读写、逻辑处理操作。(注:相比较于这种模型,netty的N boss + N worker的模型更加灵活)

Kafka设计解析(一)--Kafka背景及架构介绍

Kafka设计解析(一)–Kafka背景及架构介绍

本文介绍了Kafka的创建背景,设计目标,使用消息系统的优势以及目前流行的消息系统对比。并介绍了Kafka的架构,Producer消息路由,Consumer Group以及由其实现的不同消息分发方式,Topic & Partition,最后介绍了Kafka Consumer为何使用pull模式以及Kafka提供的三种delivery guarantee。

摘要

  Kafka是由LinkedIn开发并开源的分布式消息系统,因其分布式及高吞吐率而被广泛使用,现已与Cloudera Hadoop,Apache Storm,Apache Spark集成。本文介绍了Kafka的创建背景,设计目标,使用消息系统的优势以及目前流行的消息系统对比。并介绍了Kafka的架构,Producer消息路由,Consumer Group以及由其实现的不同消息分发方式,Topic & Partition,最后介绍了Kafka Consumer为何使用pull模式以及Kafka提供的三种delivery guarantee。

背景介绍

Kafka创建背景

  Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。现在它已被多家不同类型的公司 作为多种类型的数据管道和消息系统使用。
  活动流数据是几乎所有站点在对其网站使用情况做报表时都要用到的数据中最常规的部分。活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索情况等内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。
  近年来,活动和运营数据处理已经成为了网站软件产品特性中一个至关重要的组成部分,这就需要一套稍微更加复杂的基础设施对其提供支持。   

Kafka简介

  Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输
  • 同时支持离线数据处理和实时数据处理
  • Scale out:支持在线水平扩展

为何使用消息系统

  • 解耦
      在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

  • 冗余
      有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

  • 扩展性
      因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。

  • 灵活性 & 峰值处理能力
      在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

  • 可恢复性
      系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

  • 顺序保证
      在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。

  • 缓冲
      在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。

  • 异步通信
      很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

常用Message Queue对比

  • RabbitMQ
      RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。

  • Redis
      Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。

  • ZeroMQ
      ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。

  • ActiveMQ
      ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。

  • Kafka/Jafka
      Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

Kafka架构

Terminology

  • Broker
      Kafka集群包含一个或多个服务器,这种服务器被称为broker
  • Topic
      每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
  • Partition
      Parition是物理上的概念,每个Topic包含一个或多个Partition.
  • Producer
      负责发布消息到Kafka broker
  • Consumer
      消息消费者,向Kafka broker读取消息的客户端。
  • Consumer Group
      每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
    Kafka拓扑结构

Kafka拓扑结构

如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。  

Topic & Partition

  Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1),如下图所示。


  每个日志文件都是一个log entry序列,每个log entry包含一个4字节整型数值(值为N+5),1个字节的”magic value”,4个字节的CRC校验码,其后跟N个字节的消息体。每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消息格式如下:
  message length : 4 bytes (value: 1+4+n)
  “magic” value : 1 byte
  crc : 4 bytes
  payload : n bytes
  这个log entry并非由一个文件构成,而是分成多个segment,每个segment以该segment第一条消息的offset命名并以“.kafka”为后缀。另外会有一个索引文件,它标明了每个segment下包含的log entry的offset范围,如下图所示。

  因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。

  对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置如下所示。

1
2
3
4
5
6
7
8
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

  这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka性能无关。选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。   

Producer消息路由

  Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。
  
  在发送一条消息时,可以指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Parition。Paritition机制可以通过指定Producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与Partition总数取余,该消息会被发送到该数对应的Partition。(每个Parition都会有个序号,序号从0开始)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class JasonPartitioner<T> implements Partitioner {
public JasonPartitioner(VerifiableProperties verifiableProperties) {}

@Override
public int partition(Object key, int numPartitions) {
try {
int partitionNum = Integer.parseInt((String) key);
return Math.abs(Integer.parseInt((String) key) % numPartitions);
} catch (Exception e) {
return Math.abs(key.hashCode() % numPartitions);
}
}
}

  如果将上例中的类作为partition.class,并通过如下代码发送20条消息(key分别为0,1,2,3)至topic3(包含4个Partition)。   

1
2
3
4
5
6
7
8
9
10
public void sendMessage() throws InterruptedException{
  for(int i = 1; i <= 5; i++){
   List messageList = new ArrayList<KeyedMessage<String, String>>();
   for(int j = 0; j < 4; j++){
   messageList.add(new KeyedMessage<String, String>("topic2", String.valueOf(j), String.format("The %d message for key %d", i, j));
   }
   producer.send(messageList);
}
  producer.close();
}

  则key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是通过Java程序调用Consumer后打印出的消息列表。
  
   

Consumer Group

  (本节所有描述都是基于Consumer hight level API而非low level API)。
  使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。

  这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。
  实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。下图是Kafka在Linkedin的一种简化部署示意图。

  下面这个例子更清晰地展示了Kafka Consumer Group的特性。首先创建一个Topic (名为topic1,包含3个Partition),然后创建一个属于group1的Consumer实例,并创建三个属于group2的Consumer实例,最后通过Producer向topic1发送key分别为1,2,3的消息。结果发现属于group1的Consumer收到了所有的这三条消息,同时group2中的3个Consumer分别收到了key为1,2,3的消息。如下图所示。

Push vs. Pull  

  作为一个消息系统,Kafka遵循了传统的方式,选择由Producer向broker push消息并由Consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事实上,push模式和pull模式各有优劣。
  push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据Consumer的消费能力以适当的速率消费消息。
  对于Kafka而言,pull模式更合适。pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。   

Kafka delivery guarantee

  
  有这么几种可能的delivery guarantee:

  • At most once 消息可能会丢,但绝不会重复传输
  • At least one 消息绝不会丢,但可能会重复传输
  • Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
      
      当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),这一Feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从Producer到broker是确保了At least once,可通过设置Producer异步发送实现At most once)。
      接下来讨论的是消息从broker到Consumer的delivery guarantee语义。(仅针对Kafka consumer high level API)。Consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际使用中应用程序并非在Consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
    读完消息先commit再处理消息。这种模式下,如果Consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once
    读完消息先处理再commit。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。在很多使用场景下,消息都有一个主键,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once。(笔者认为这种说法比较牵强,毕竟它不是Kafka本身提供的机制,主键本身也并不能完全保证操作的幂等性。而且实际上我们说delivery guarantee 语义是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们不应该把处理过程的特性——如是否幂等性,当成Kafka本身的Feature)
    如果一定要做到Exactly once,就需要协调offset和实际操作的输出。经典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,Consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)
      总之,Kafka默认保证At least once,并且允许通过设置Producer异步提交来实现At most once。而Exactly once要求与外部存储系统协作,幸运的是Kafka提供的offset可以非常直接非常容易得使用这种方式。

Freemarker入门

Freemarker入门

基础

标量类别

  • 字符串
  • 数字
  • 日期/时间
  • 布尔值
  1. 数据模型可以被看做是树状结构
  2. 标量存储单一的值
  3. 哈希表示存储变量和与其相关且有唯一标识名称变量的容器
  4. 序列式存储有序变量的容器。存储的变量可以通过数字索引来检索,索引通常从零开始。

模板

  • interpolations 插值 ${...}
  • FTL tags 标签

一、整体结构

1、注释:<#–注释内容–>,不会输出。
2、文本:直接输出。
3、interpolation:由 ${var} 或 #{var} 限定,由计算值代替输出。
4、FTL标记

二、指令:

freemarker指令有两种:
1、预定义指令:引用方式为<#指令名称>
2、用户定义指令:引用方式为<@指令名称>,引用用户定义指令时须将#换为@。
注意:如果使用不存在的指令,FreeMarker不会使用模板输出,而是产生一个错误消息。
freemarker指令由FTL标记来引用,FTL标记和HTML标记类似,名字前加#来加 以区分。如HTML标记的形式为

则FTL标记的形式是<#list>< /#list>(此处h1标记和list指令没有任何功能上的对应关系,只是做为说明使用一下)。
有三种FTL标记:
1)、开始标记:<#指令名称>
2)、结束标记:</#指令名称>
3)、空标记:<#指令名称/>
注意:
1) FTL会忽略标记之中的空格,但是,<#和指令 与 </#和指令 之间不能有空格。
2) FTL标记不能够交叉,必须合理嵌套。每个开始标记对应一个结束标记,层层嵌套。 如:
<#list>


  • ${数据}
    <#if 变量>
  • game over!


    </#if>

    </#list>
    注意事项:
    1)、FTL对大小写敏感。 所以使用的标记及interpolation要注意大小写。name与NAME就是不同的对象。<#list>是正确的标记,而<#List>则不是。
    2)、interpolation只能在文本部分使用,不能位于FTL标记内。如<#if ${var}>是错误的,正确的方法是:<#if var>,而且此处var必须为布尔值。
    3)、FTL标记不能位于另一个FTL标记内部,注释例外。注释可以位于标记及interpolation内部。

    Freemarker入门

    Freemarker入门

    基础

    标量类别

    • 字符串
    • 数字
    • 日期/时间
    • 布尔值
    1. 数据模型可以被看做是树状结构
    2. 标量存储单一的值
    3. 哈希表示存储变量和与其相关且有唯一标识名称变量的容器
    4. 序列式存储有序变量的容器。存储的变量可以通过数字索引来检索,索引通常从零开始。

    模板

    • interpolations 插值 ${...}
    • FTL tags 标签 以符号# 开头,用户自定义的 FTL 标签使用@符号来代替#
    • Comments 注释 用<#---->来分 隔
    • directives 指令 就是所指的 FTL 标签。这些指令在 HTML 的标签(如<table>和 </table>)和 HTML 元素(如 table 元素)中的关系是相同的。

    常用指令

    1、 if 指令 <#if condition>和</#if> condition为true,显示中间内容,condition为false,中间内容被略过;使用<#else>标签可以指定当条件为假时程序执行的内容。
    2、 list 指令

    一、整体结构

    1、注释:<#–注释内容–>,不会输出。
    2、文本:直接输出。
    3、interpolation:由 ${var} 或 #{var} 限定,由计算值代替输出。
    4、FTL标记

    二、指令:

    freemarker指令有两种:
    1、预定义指令:引用方式为<#指令名称>
    2、用户定义指令:引用方式为<@指令名称>,引用用户定义指令时须将#换为@。
    注意:如果使用不存在的指令,FreeMarker不会使用模板输出,而是产生一个错误消息。
    freemarker指令由FTL标记来引用,FTL标记和HTML标记类似,名字前加#来加 以区分。如HTML标记的形式为

    则FTL标记的形式是<#list>< /#list>(此处h1标记和list指令没有任何功能上的对应关系,只是做为说明使用一下)。
    有三种FTL标记:
    1)、开始标记:<#指令名称>
    2)、结束标记:</#指令名称>
    3)、空标记:<#指令名称/>
    注意:
    1) FTL会忽略标记之中的空格,但是,<#和指令 与 </#和指令 之间不能有空格。
    2) FTL标记不能够交叉,必须合理嵌套。每个开始标记对应一个结束标记,层层嵌套。 如:
    <#list>


  • ${数据}
    <#if 变量>
  • game over!


    </#if>

    </#list>
    注意事项:
    1)、FTL对大小写敏感。 所以使用的标记及interpolation要注意大小写。name与NAME就是不同的对象。<#list>是正确的标记,而<#List>则不是。
    2)、interpolation只能在文本部分使用,不能位于FTL标记内。如<#if ${var}>是错误的,正确的方法是:<#if var>,而且此处var必须为布尔值。
    3)、FTL标记不能位于另一个FTL标记内部,注释例外。注释可以位于标记及interpolation内部。

    kafka-manager同时支持从zk和从内部topic"__consumer_offsets"中读取offset信息

    kafka-manager同时支持从zk和从内部topic”__consumer_offsets”中读取offset信息

    kafka-manager

    kafka-manager 用于监控 Kafka 集群中 Topic 被消费的情况。包含 Lags 的产生,Offset 的变动,Partition 的分布,Owner ,Topic 被创建的时间和修改的时间等信息。

    在计算lags时需要需要获取 logsize 和消费者消费到的offset下标来(offsetZK.getOffset())来计算lags,
    lag = offsetZk.getOffset() == -1 ? 0 : logSize - offsetZk.getOffset()

    需要查询offset的方法

    • OffsetController

    kafka0.8升级0.10测试计划

    kafka0.8升级0.10测试计划

    升级测试 10.0.41.135~10.0.41.137 kafka集群测试机

    一、升级主流程

    1、拷贝0.8版本配置文件,修改添加0.10版本配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    #修改项
    log.dirs=/opt/kafka_2.10-0.8.2.1/kafka-logs,/opt/kafka_2.10-0.10.1.0/kafka-logs

    #添加项
    auto.create.topics.enable=false
    num.replica.fetchers=3
    unclean.leader.election.enable=false
    inter.broker.protocol.version=0.8.2.0
    log.message.format.version=0.8.2.0

    2、逐一停止0.8版本kafka,启动0.10版本kafka

    1
    2
    3
    4
    5
    6
    7
    #停止该服务器上所有0.8kafka进程
    netstat -anp | grep :9092 | grep LISTEN | grep -v 2.200 | awk '{print $7}' | awk -F'/' '{print $1}' | xargs kill –SIGTERM
    #启动0.10kafka
    /opt/kafka_2.10-0.10.1.0/start

    #可选命令
    /opt/kafka_2.10-0.10.1.0/bin/kafka-preferred-replica-election.sh --zookeeper 10.0.41.135:2181(可选)

    3、修改配置,并在此逐一启动kafka

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    #修改0.10配置
    inter.broker.protocol.version=0.10.1.0

    #停止该服务器上所有0.10kafka进程
    netstat -anp | grep :9092 | grep LISTEN | grep -v 2.200 | awk '{print $7}' | awk -F'/' '{print $1}' | xargs kill -s TERM
    #启动0.10kafka
    /opt/kafka_2.10-0.10.1.0/start

    #可选命令
    /opt/kafka_2.10-0.10.1.0/bin/kafka-preferred-replica-election.sh --zookeeper 10.0.41.135:2181(可选)

    二、测试

    topic准备

    • wangzhe:用于测试升级过程自动leader均衡和手动leader均衡
    • wangzhe_long_running :测试升级过程中topic生产消费是否出问题

    wangzhe

    测试0.8和0.10版本客户端对0.10版本kafka集群的生产消费许可状况

    分别测试未均衡前、自动均衡后和手动均衡后的leader、replicas和lsr

    自动均衡时间过久(大概要一天),只测试手动均衡和未均衡之前的状态

    wangzhe_long_running

    在测试开始前执行持续开启写入命令和持续开启消费命令,在测试过程中查看是否报错

    测试过程

    1、准备测试初试环境

    之前测试已经将0.10的kafka升级完成,将0.8和0.10配置均恢复为未升级前,将kafka集群停止,恢复到0.8kafka集群

    2、创建新topic

    1
    2
    3
    bin/kafka-topics.sh --create --zookeeper 10.0.41.135:2181 --replication-factor 3 --partitions 3 --topic wangzhe

    bin/kafka-topics.sh --create --zookeeper 10.0.41.135:2181 --replication-factor 3 --partitions 3 --topic wangzhe_long_running

    3、预置topic数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    bin/kafka-console-producer.sh --broker-list 10.0.41.135:9092 --topic wangzhe
    kafka_08_1
    kafka_08_2
    kafka_08_3
    kafka_08_4
    kafka_08_5
    kafka_08_6
    kafka_08_7
    kafka_08_8
    kafka_08_9

    4、拷贝0.8版本配置并添加修改

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    server.properties
    #修改项
    log.dirs=/opt/kafka_2.10-0.8.2.1/kafka-logs,/opt/kafka_2.10-0.10.1.0/kafka-logs

    #添加项
    auto.create.topics.enable=false
    num.replica.fetchers=3
    unclean.leader.election.enable=false
    inter.broker.protocol.version=0.8.2.0
    log.message.format.version=0.8.2.0

    5、开启持续写入/消费

    在10.0.41.132测试机上开启持续写入和持续消费,测试wangzhe_long_running

    1
    2
    3
    4
    5
    #开启持续写入
    bin/kafka-producer-perf-test.sh --messages 1000000000 --message-size 5 --batch-size 10 --topics wangzhe_long_running --threads 1 --broker-list slave135:9092,slave136:9092,slave137:9092

    #开启持续消费
    bin/kafka-console-consumer.sh --zookeeper 10.0.41.135:2181 --topic wangzhe_long_running

    6、逐一停止0.8kafka节点并对应的启动0.10节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    #停止0.8节点
    netstat -anp | grep :9092 | grep LISTEN | grep -v 2.200 | awk '{print $7}' | awk -F'/' '{print $1}' | xargs kill -SIGTERM

    #启动0.10节点
    /opt/kafka_2.10-0.10.1.0/start

    #查看topic状态
    bin/kafka-topics.sh --describe --zookeeper 10.0.41.135:2181 --topic wangzhe

    #第二遍测试手动均衡前后topic状态
    #手动均衡
    /opt/kafka_2.10-0.10.1.0/bin/kafka-preferred-replica-election.sh --zookeeper 10.0.41.135:2181

    bin/kafka-topics.sh --describe --zookeeper 10.0.41.135:2181 --topic wangzhe

    7、修改配置,并再次逐一重启

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    #修改0.10配置
    inter.broker.protocol.version=0.10.1.0

    #停止该服务器上所有0.10kafka进程
    netstat -anp | grep :9092 | grep LISTEN | grep -v 2.200 | awk '{print $7}' | awk -F'/' '{print $1}' | xargs kill -s TERM
    #启动0.10kafka
    /opt/kafka_2.10-0.10.1.0/start

    #第二遍测试手动均衡前后topic状态
    /opt/kafka_2.10-0.10.1.0/bin/kafka-preferred-replica-election.sh --zookeeper 10.0.41.135:2181

    bin/kafka-topics.sh --describe --zookeeper 10.0.41.135:2181 --topic wangzhe

    9、测试0.8和0.10客户端生产消费0.10版本kafka集群

    测试0.8和0.10客户端生产消费0.10版本kafka集群
    生产者和消费者为10.0.41.132测试机,位于kafka集群之外

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    #0.8的生产者
    /opt/kafka_2.10-0.8.2.1/bin/kafka-console-producer.sh --broker-list 10.0.41.135:9092 --topic wangzhe

    #0.8的消费者
    /opt/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper 10.0.41.135:2181 --topic wangzhe --from-beginning

    #0.10的消费者
    /opt/kafka_2.10-0.10.1.0/bin/kafka-console-consumer.sh --zookeeper 10.0.41.135:2181 --topic wangzhe --from-beginning

    #0.10的生产者
    /opt/kafka_2.10-0.10.1.0/bin/kafka-console-producer.sh --broker-list 10.0.41.135:9092 --topic wangzhe

    #0.8的消费者
    /opt/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper 10.0.41.135:2181 --topic wangzhe --from-beginning

    #0.10的消费者
    /opt/kafka_2.10-0.10.1.0/bin/kafka-console-consumer.sh --zookeeper 10.0.41.135:2181 --topic wangzhe --from-beginning

    至此,测试流程完成

    命令参考

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    #启动kafka
    bin/kafka-server-start.sh config/server.properties
    /opt/kafka_2.10-0.10.1.0/start

    #创建topic
    bin/kafka-topics.sh --create --zookeeper 10.0.41.135:2181 --replication-factor 3 --partitions 3 --topic wangzhe

    #删除topic
    bin/kafka-topics.sh --delete --zookeeper 10.0.41.135:2181 --topic wangzhe

    #查看所有topic
    bin/kafka-topics.sh --list --zookeeper 10.0.41.135:2181

    #查看topic状态
    bin/kafka-topics.sh --describe --zookeeper 10.0.41.135:2181 --topic wangzhe

    #启动生产者(启动成功进入命令行阻塞状态,可以输入数据,回车发送)
    bin/kafka-console-producer.sh --broker-list 10.0.41.135:9092 --topic wangzhe_long_running

    #启动消费者(启动后命令行处于阻塞状态,生产者发布的消息会在此显示)
    bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –wangzhe_long_running

    #终止节点所有kafka进程
    netstat -anp | grep :9092 | grep LISTEN | grep -v 2.200 | awk '{print $7}' | awk -F'/' '{print $1}' | xargs kill -SIGTERM

    #持续开启写入
    bin/kafka-producer-perf-test.sh --messages 1000000000 --message-size 5 --batch-size 10 --topics wangzhe_long_running --threads 1 --broker-list slave135:9092,slave136:9092,slave137:9092

    #持续开启消费
    bin/kafka-console-consumer.sh --zookeeper 10.0.41.135:2181 --topic wangzhe_long_running --from-beginning

    kafka入坑指南

    kafka入坑指南

    Kafka是为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用,而Storm,Spark,Flink等大数据流处理或批处理平台都有Kafka的相关插件支持。本着为开源做贡献的原则,在学习Kafka的同时也参与了Kafka官方文档的翻译,Kafka的官网文档写的比较详细,学习Kafka只看官方文档就可以了。

    kafka中文文档
    kafka-manager

    kafka技术分享PPT