某些网站网速慢,百度法务部联系方式,网站开发项目简单描述,东莞轨道公司一、概述 Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统#xff0c;后成为Apache的一部分#xff0c;它使用Scala编写#xff0c;以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark等都支持与Kafka集成。 Kaf… 一、概述 Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统后成为Apache的一部分它使用Scala编写以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark等都支持与Kafka集成。 Kafka凭借着自身的优势越来越受到互联网企业的青睐唯品会也采用Kafka作为其内部核心消息引擎之一。Kafka作为一个商业级消息中间件消息可靠性的重要性可想而知。如何确保消息的精确传输?如何确保消息的准确存储?如何确保消息的正确消费?这些都是需要考虑的问题。本文首先从Kafka的架构着手先了解下Kafka的基本原理然后通过对kakfa的存储机制、复制原理、同步原理、可靠性和持久性保证等等一步步对其可靠性进行分析最后通过benchmark来增强对Kafka高可靠性的认知。 二、Kafka的使用场景 1日志收集一个公司可以用Kafka可以收集各种服务的log通过kafka以统一接口服务的方式开放给各种consumer例如Hadoop、Hbase、Solr等 2消息系统解耦和生产者和消费者、缓存消息等 3用户活动跟踪Kafka经常被用来记录web用户或者app用户的各种活动如浏览网页、搜索、点击等活动这些活动信息被各个服务器发布到kafka的topic中然后订阅者通过订阅这些topic来做实时的监控分析或者装载到Hadoop、数据仓库中做离线分析和挖掘 4运营指标Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据生产各种操作的集中反馈比如报警和报告 5流式处理比如spark streaming和storm 6事件源 三、Kafka基本架构 如上图所示一个典型的Kafka体系架构包括 若干Producer(可以是服务器日志业务数据页面前端产生的page view等等) 若干broker(Kafka支持水平扩展一般broker数量越多集群吞吐率越高) 若干Consumer (Group)以及一个Zookeeper集群。 Kafka通过Zookeeper管理集群配置选举leader以及在consumer group发生变化时进行rebalance。Producer使用push(推)模式将消息发布到brokerConsumer使用pull(拉)模式从broker订阅并消费消息。 1、Topic Partition 一个topic可以认为一个一类消息每个topic将被分成多个partition每个partition在存储层面是append log文件。任何发布到此partition的消息都会被追加到log文件的尾部每条消息在文件中的位置称为offset(偏移量)offset为一个long型的数字它唯一标记一条消息。每条消息都被append到partition中是顺序写磁盘因此效率非常高(经验证顺序写磁盘效率比随机写内存还要高这是Kafka高吞吐率的一个很重要的保证)。 每一条消息被发送到broker中会根据partition规则选择被存储到哪一个partition。partition机制可以通过指定producer的partition.class这一参数来指定该class必须实现kafka.producer.Partitioner接口。如果partition规则设置的合理所有消息可以均匀分布到不同的partition里这样就实现了水平扩展。(如果一个topic对应一个文件那这个文件所在的机器I/O将会成为这个topic的性能瓶颈而partition解决了这个问题)。在创建topic时可以在$KAFKA_HOME/config/server.properties中指定这个partition的数量(如下所示)当然可以在topic创建之后去修改partition的数量。 # The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
#默认partitions数量
num.partitions1 四、高可靠性存储分析概述 Kafka的高可靠性的保障来源于其健壮的副本(replication)策略。通过调节其副本相关参数可以使得Kafka在性能和可靠性之间运转的游刃有余。Kafka从0.8.x版本开始提供partition级别的复制,replication的数量可以在$KAFKA_HOME/config/server.properties中配置(default.replication.refactor)。 这里先从Kafka文件存储机制入手从最底层了解Kafka的存储细节进而对其的存储有个微观的认知。之后通过Kafka复制原理和同步方式来阐述宏观层面的概念。最后从ISRHWleader选举以及数据可靠性和持久性保证等等各个维度来丰富对Kafka相关知识点的认知。 五、Kafka文件存储机制 Kafka中消息是以topic进行分类的生产者通过topic向Kafka broker发送消息消费者通过topic读取数据。然而topic在物理层面又能以partition为分组一个topic可以分成若干个partition那么topic以及partition又是怎么存储的呢?partition还可以细分为segment一个partition物理上由多个segment组成那么这些segment又是什么呢?下面我们来一一揭晓。 为了便于说明问题假设这里只有一个Kafka集群且这个集群只有一个Kafka broker即只有一台物理机。在这个Kafka broker中配置( 中 以 此 来 设 置 消 息 文 件 存 储 目 录 与 此 同 时 创 建 一 个 的 数 量 为 KAFKA_HOME/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partitions 4 –topic topic_vms_test –replication-factor 4)。那么我们此时可以在/tmp/kafka-logs目录中可以看到生成了4个目录 drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-0
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-1
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-2
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-3 在Kafka文件存储中同一个topic下有多个不同的partition每个partiton为一个目录partition的名称规则为topic名称有序序号第一个序号从0开始计最大的序号为partition数量减1partition是实际物理上的概念而topic是逻辑上的概念。 上面提到partition还可以细分为segment这个segment又是什么?如果就以partition为最小存储单位我们可以想象当Kafka producer不断发送消息必然会引起partition文件的无限扩张这样对于消息文件的维护以及已经被消费的消息的清理带来严重的影响所以这里以segment为单位又将partition细分。每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等的segment(段)数据文件中(每个segment 文件中消息数量不一定相等)这种特性也方便old segment的删除即方便已被消费的消息的清理提高磁盘的利用率。每个partition只需要支持顺序读写就行segment的文件生命周期由服务端配置参数(log.segment.byteslog.roll.{ms,hours}等若干参数)决定。 #在强制刷新数据到磁盘允许接收消息的数量
#log.flush.interval.messages10000 # 在强制刷新之前消息可以在日志中停留的最长时间
#log.flush.interval.ms1000 #一个日志的最小存活时间可以被删除
log.retention.hours168 # 一个基于大小的日志保留策略。段将被从日志中删除只要剩下的部分段不低于log.retention.bytes。
#log.retention.bytes1073741824 # 每一个日志段大小的最大值。当到达这个大小时会生成一个新的片段。
log.segment.bytes1073741824 # 检查日志段的时间间隔看是否可以根据保留策略删除它们
log.retention.check.interval.ms300000 segment文件由两部分组成分别为“.index”文件和“.log”文件分别表示为segment索引文件和数据文件。这两个文件的命令规则为partition全局的第一个segment从0开始后续每个segment文件名为上一个segment文件最后一条消息的offset值数值大小为64位20位数字字符长度没有数字用0填充如下 00000000000000000000.index 00000000000000000000.log 00000000000000170410.index 00000000000000170410.log 00000000000000239430.index 00000000000000239430.log 以上面的segment文件为例展示出segment00000000000000170410的“.index”文件和“.log”文件的对应的关系如下图 如上图“.index”索引文件存储大量的元数据“.log”数据文件存储大量的消息索引文件中的元数据指向对应数据文件中message的物理偏移地址。其中以“.index”索引文件中的元数据[3, 348]为例在“.log”数据文件表示第3个消息即在全局partition中表示1704103170413个消息该消息的物理偏移地址为348。 那么如何从partition中通过offset查找message呢? 以上图为例读取offset170418的消息首先查找segment文件其中00000000000000000000.index为最开始的文件第二个文件为00000000000000170410.index(起始偏移为1704101170411)而第三个文件为00000000000000239430.index(起始偏移为2394301239431)所以这个offset170418就落到了第二个文件之中。其他后续文件可以依次类推以其实偏移量命名并排列这些文件然后根据二分查找法就可以快速定位到具体文件位置。其次根据00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置进行读取。 要是读取offset170418的消息从00000000000000170410.log文件中的1325的位置进行读取那么怎么知道何时读完本条消息否则就读到下一条消息的内容了? 这个就需要联系到消息的物理结构了消息都具有固定的物理结构包括offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段可以确定一条消息的大小即读取到哪里截止。 六、复制原理和同步方式 Kafka中topic的每个partition有一个预写式的日志文件虽然partition可以继续细分为若干个segment文件但是对于上层应用来说可以将partition看成最小的存储单元(一个有多个segment文件拼接的“巨型”文件)每个partition都由一些列有序的、不可变的消息组成这些消息被连续的追加到partition中。 上图中有两个新名词HW和LEO。这里先介绍下LEOLogEndOffset的缩写表示每个partition的log最后一条Message的位置。HW是HighWatermark的缩写是指consumer能够看到的此partition的位置这个涉及到多副本的概念这里先提及一下下节再详表。 言归正传为了提高消息的可靠性Kafka每个topic的partition有N个副本(replicas)其中N(大于等于1)是topic的复制因子(replica fator)的个数。Kafka通过多副本机制实现故障自动转移当Kafka集群中一个broker失效情况下仍然保证服务可用。在Kafka中发生复制时确保partition的日志能有序地写到其他节点上N个replicas中其中一个replica为leader其他都为follower, leader处理partition的所有读写请求与此同时follower会被动定期地去复制leader上的数据。 如下图所示Kafka集群中有4个broker, 某topic有3个partition,且复制因子即副本个数也为3 Kafka提供了数据复制算法保证如果leader发生故障或挂掉一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader或者说follower追赶leader数据。leader负责维护和跟踪ISR(In-Sync Replicas的缩写表示副本同步队列具体可参考下节)中所有follower滞后的状态。当producer发送一条消息到broker后leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的follower限制重要的是快速检测慢副本如果follower“落后”太多或者失效leader将会把它从ISR中删除。 七、零拷贝 Kafka中存在大量的网络数据持久化到磁盘Producer到Broker和磁盘文件通过网络发送Broker到Consumer的过程。这一过程的性能直接影响Kafka的整体吞吐量。 1、传统模式下的四次拷贝与四次上下文切换 以将磁盘文件通过网络发送为例。传统模式下一般使用如下伪代码所示的方法先将文件数据读入内存然后通过Socket将内存中的数据发送出去。 buffer File.read
Socket.send(buffer) 这一过程实际上发生了四次数据拷贝。首先通过系统调用将文件数据读入到内核态BufferDMA拷贝然后应用程序将内存态Buffer数据读入到用户态BufferCPU拷贝接着用户程序通过Socket发送数据时将用户态Buffer数据拷贝到内核态BufferCPU拷贝最后通过DMA拷贝将数据拷贝到NIC Buffer。同时还伴随着四次上下文切换如下图所示。 2、sendfile和transferTo实现零拷贝 Linux 2.4内核通过sendfile系统调用提供了零拷贝。数据通过DMA拷贝到内核态Buffer后直接通过DMA拷贝到NIC Buffer无需CPU拷贝。这也是零拷贝这一说法的来源。除了减少数据拷贝外因为整个读文件-网络发送由一个sendfile调用完成整个过程只有两次上下文切换因此大大提高了性能。零拷贝过程如下图所示。 从具体实现来看Kafka的数据传输通过TransportLayer来完成其子类PlaintextTransportLayer通过Java NIO的FileChannel的transferTo和transferFrom方法实现零拷贝如下所示。 Override public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException { return fileChannel.transferTo(position, count, socketChannel);
} 注 transferTo和transferFrom并不保证一定能使用零拷贝。实际上是否能使用零拷贝与操作系统相关如果操作系统提供sendfile这样的零拷贝系统调用则这两个方法会通过这样的系统调用充分利用零拷贝的优势否则并不能通过这两个方法本身实现零拷贝。 八、 ISR副本同步队列 上节我们涉及到ISR (In-Sync Replicas)这个是指副本同步队列。副本数对Kafka的吞吐率是有一定的影响但极大的增强了可用性。默认情况下Kafka的replica数量为1即每个partition都有一个唯一的leader为了确保消息的可靠性通常应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1比如3。 所有的副本(replicas)统称为Assigned Replicas即AR。ISR是AR中的一个子集由leader维护ISR列表follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度)任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表新加入的follower也会先存放在OSR中。ARISROSR Kafka 0.10.x版本后移除了replica.lag.max.messages参数只保留了replica.lag.time.max.ms作为ISR中副本管理的参数。为什么这样做呢?replica.lag.max.messages表示当前某个副本落后leaeder的消息数量超过了这个参数的值那么leader就会把follower从ISR中删除。假设设置replica.lag.max.messages4那么如果producer一次传送至broker的消息数量都小于4条时因为在leader接受到producer发送的消息之后而follower副本开始拉取这些消息之前follower落后leader的消息数不会超过4条消息故此没有follower移出ISR所以这时候replica.lag.max.message的设置似乎是合理的。但是producer发起瞬时高峰流量producer一次发送的消息超过4条时也就是超过replica.lag.max.messages此时follower都会被认为是与leader副本不同步了从而被踢出了ISR。但实际上这些follower都是存活状态的且没有性能问题。那么在之后追上leader,并被重新加入了ISR。于是就会出现它们不断地剔出ISR然后重新回归ISR这无疑增加了无谓的性能损耗。而且这个参数是broker全局的。设置太大了影响真正“落后”follower的移除;设置的太小了导致follower的频繁进出。无法给定一个合适的replica.lag.max.messages的值故此新版本的Kafka移除了这个参数。注ISR中包括leader和follower。 上面一节还涉及到一个概念即HW。HW俗称高水位HighWatermark的缩写取一个partition对应的ISR中最小的LEO作为HWconsumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息consumer不能立刻消费leader会等待该消息被所有ISR中的replicas同步后更新HW此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效该消息仍然可以从新选举的leader中获取。对于来自内部broKer的读取请求没有HW的限制。 下图详细的说明了当producer生产消息至broker后ISR以及HW和LEO的流转过程 由此可见Kafka的复制机制既不是完全的同步复制也不是单纯的异步复制。事实上同步复制要求所有能工作的follower都复制完这条消息才会被commit这种复制方式极大的影响了吞吐率。而异步复制方式下follower异步的从leader复制数据数据只要被leader写入log就被认为已经commit这种情况下如果follower都还没有复制完落后于leader时突然leader宕机则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。 Kafka的ISR的管理最终都会反馈到Zookeeper节点上。具体位置为/brokers/topics/[topic]/partitions/[partition]/state。 目前有两个地方会对这个Zookeeper的节点进行维护 Controller来维护Kafka集群中的其中一个Broker会被选举为Controller主要负责Partition管理和副本状态管理也会执行类似于重分配partition之类的管理任务。在符合某些特定条件下Controller下的LeaderSelector会选举新的leaderISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知所有的replicas。 leader来维护leader有单独的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变化则会将新的ISR的信息返回到Zookeeper的相关节点中。 九、数据可靠性和持久性保证 当producer向leader发送数据时可以通过request.required.acks参数来设置数据可靠性的级别 1(默认)这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了则会丢失数据。 0这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高但是数据可靠性确是最低的。 -1producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成可靠性最高。但是这样也不能保证数据不丢失比如当ISR中只有leader时(前面ISR那一节讲到ISR中的成员由于某些情况会增加也会减少最少就只剩一个leader)这样就变成了acks1的情况。 官网配置说明 如果要提高数据的可靠性在设置request.required.acks-1的同时也要min.insync.replicas这个参数(可以在broker或者topic层面进行设置)的配合这样才能发挥最大的功效。min.insync.replicas这个参数设定ISR中的最小副本数是多少默认值为1当且仅当request.required.acks参数设置为-1时此参数才生效。如果ISR中的副本数少于min.insync.replicas配置的数量时客户端会返回异常org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。 接下来对acks1和-1的两种情况进行详细分析 9.1、request.required.acks1 producer发送数据到leaderleader写本地日志成功返回客户端成功;此时ISR中的副本还没有来得及拉取该消息leader就宕机了那么此次发送的消息就会丢失。 9.2、request.required.acks-1 同步(Kafka默认为同步即producer.typesync)的发送模式replication.factor2且min.insync.replicas2的情况下不会丢失数据。 有两种典型情况。acks-1的情况下(如无特殊说明以下acks都表示为参数request.required.acks)数据发送到leader, ISR的follower全部完成数据同步后leader此时挂掉那么会选举出新的leader数据不会丢失。 acks-1的情况下数据发送到leader后 部分ISR的副本同步leader此时挂掉。比如follower1h和follower2都有可能变成新的leader, producer端会得到返回异常producer端会重新发送数据数据可能会重复 当然上图中如果在leader crash的时候follower2还没有同步到任何数据而且follower2被选举为新的leader的话这样消息就不会重复。 注Kafka只处理fail/recover问题,不处理Byzantine问题。 9.3、关于HW的进一步探讨 考虑上图(即acks-1,部分ISR副本同步)中的另一种情况如果在Leader挂掉的时候follower1同步了消息4,5follower2同步了消息4与此同时follower2被选举为leader那么此时follower1中的多出的消息5该做如何处理呢? 这里就需要HW的协同配合了。如前所述一个partition中的ISR列表中leader的HW是所有ISR列表里副本中最小的那个的LEO。类似于木桶原理水位取决于最低那块短板。 如上图某个topic的某partition有三个副本分别为A、B、C。A作为leader肯定是LEO最高B紧随其后C机器由于配置比较低网络比较差故而同步最慢。这个时候A机器宕机这时候如果B成为leader假如没有HW在A重新恢复之后会做同步(makeFollower)操作在宕机时log文件之后直接做追加操作而假如B的LEO已经达到了A的LEO会产生数据不一致的情况所以使用HW来避免这种情况。 A在做同步操作的时候先将log文件截断到之前自己的HW的位置即3之后再从B中拉取消息进行同步。 如果失败的follower恢复过来它首先将自己的log文件截断到上次checkpointed时刻的HW的位置之后再从leader中同步消息。leader挂掉会重新选举新的leader会发送“指令”让其余的follower截断至自身的HW的位置然后再拉取新的消息。 当ISR中的个副本的LEO不一致时如果此时leader挂掉选举新的leader时并不是按照LEO的高低进行选举而是按照ISR中的顺序选举。 9.4、Leader选举 一条消息只有被ISR中的所有follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader还没来得及被任何follower复制就宕机了而造成数据丢失。而对于producer而言它可以选择是否等待消息commit这可以通过request.required.acks来设置。这种机制确保了只要ISR中有一个或者以上的follower一条被commit的消息就不会丢失。 有一个很重要的问题是当leader宕机了怎样在follower中选举出新的leader因为follower可能落后很多或者直接crash了所以必须确保选择“最新”的follower作为新的leader。一个基本的原则就是如果leader不在了新的leader必须拥有原来的leader commit的所有消息。这就需要做一个折中如果leader在表名一个消息被commit前等待更多的follower确认那么在它挂掉之后就有更多的follower可以成为新的leader但这也会造成吞吐率的下降。 一种非常常用的选举leader的方式是“少数服从多数”Kafka并不是采用这种方式。这种模式下如果我们有2f1个副本那么在commit之前必须保证有f1个replica复制完消息同时为了保证能正确选举出新的leader失败的副本数不能超过f个。这种方式有个很大的优势系统的延迟取决于最快的几台机器也就是说比如副本数为3那么延迟就取决于最快的那个follower而不是最慢的那个。“少数服从多数”的方式也有一些劣势为了保证leader选举的正常进行它所能容忍的失败的follower数比较少如果要容忍1个follower挂掉那么至少要3个以上的副本如果要容忍2个follower挂掉必须要有5个以上的副本。也就是说在生产环境下为了保证较高的容错率必须要有大量的副本而大量的副本又会在大数据量下导致性能的急剧下降。这种算法更多用在Zookeeper这种共享集群配置的系统中而很少在需要大量数据的系统中使用的原因。HDFS的HA功能也是基于“少数服从多数”的方式但是其数据存储并不是采用这样的方式。 实际上leader选举的算法非常多比如Zookeeper的Zab、Raft以及Viewstamped Replication。而Kafka所使用的leader选举算法更像是微软的PacificA算法。 Kafka在Zookeeper中为每一个partition动态的维护了一个ISR这个ISR里的所有replication都跟上了leader只有ISR里的成员才能有被选为leader的可能(unclean.leader.election.enablefalse)。在这种模式下对于f1个副本一个Kafka topic能在保证不丢失已经commit消息的前提下容忍f个副本的失败在大多数使用场景下这种模式是十分有利的。事实上为了容忍f个副本的失败“少数服从多数”的方式和ISR在commit前需要等待的副本的数量是一样的但是ISR需要的总的副本的个数几乎是“少数服从多数”的方式的一半。 上文提到在ISR中至少有一个follower时Kafka可以确保已经commit的数据不丢失但如果某一个partition的所有replica都挂了就无法保证数据不丢失了。这种情况下有两种可行的方案 等待ISR中任意一个replica“活”过来并且选它作为leader 选择第一个“活”过来的replica(并不一定是在ISR中)作为leader 这就需要在可用性和一致性当中作出一个简单的抉择。如果一定要等待ISR中的replica“活”过来那不可用的时间就可能会相对较长。而且如果ISR中所有的replica都无法“活”过来了或者数据丢失了这个partition将永远不可用。选择第一个“活”过来的replica作为leader,而这个replica不是ISR中的replica,那即使它并不保障已经包含了所有已commit的消息它也会成为leader而作为consumer的数据源。默认情况下Kafka采用第二种策略即 unclean.leader.election.enabletrue也可以将此参数设置为false来启用第一种策略。 unclean.leader.election.enable这个参数对于leader的选举、系统的可用性以及数据的可靠性都有至关重要的影响。 下面我们来分析下几种典型的场景。 如果上图所示假设某个partition中的副本数为3replica-0, replica-1, replica-2分别存放在broker0, broker1和broker2中。AR(0,1,2)ISR(0,1)。 设置request.required.acks-1, min.insync.replicas2unclean.leader.election.enablefalse。这里讲broker0中的副本也称之为broker0起初broker0为leaderbroker1为follower。 当ISR中的replica-0出现crash的情况时broker1选举为新的leader[ISR(1)]因为受min.insync.replicas2影响write不能服务但是read能继续正常服务。此种情况恢复方案 尝试恢复(重启)replica-0如果能起来系统正常; 如果replica-0不能恢复需要将min.insync.replicas设置为1恢复write功能。 当ISR中的replica-0出现crash紧接着replica-1也出现了crash, 此时[ISR(1),leader-1],不能对外提供服务此种情况恢复方案 尝试恢复replica-0和replica-1如果都能起来则系统恢复正常; 如果replica-0起来而replica-1不能起来这时候仍然不能选出leader因为当设置unclean.leader.election.enablefalse时leader只能从ISR中选举当ISR中所有副本都失效之后需要ISR中最后失效的那个副本能恢复之后才能选举leader, 即replica-0先失效replica-1后失效需要replica-1恢复后才能选举leader。保守的方案建议把unclean.leader.election.enable设置为true,但是这样会有丢失数据的情况发生这样可以恢复read服务。同样需要将min.insync.replicas设置为1恢复write功能;replica-1恢复replica-0不能恢复这个情况上面遇到过read服务可用需要将min.insync.replicas设置为1恢复write功能; replica-0和replica-1都不能恢复这种情况可以参考情形2. 当ISR中的replica-0, replica-1同时宕机,此时[ISR(0,1)],不能对外提供服务此种情况恢复方案尝试恢复replica-0和replica-1当其中任意一个副本恢复正常时对外可以提供read服务。直到2个副本恢复正常write功能才能恢复或者将将min.insync.replicas设置为1。 9.5、Kafka的发送模式 Kafka的发送模式由producer端的配置参数producer.type来设置这个参数指定了在后台线程中消息的发送方式是同步的还是异步的默认是同步的方式即producer.typesync。如果设置成异步的模式即producer.typeasync可以是producer以batch的形式push数据这样会极大的提高broker的性能但是这样会增加丢失数据的风险。如果需要确保消息的可靠性必须要将producer.type设置为sync。 对于异步模式还有4个配套的参数如下 以batch的方式推送数据可以极大的提高处理效率kafka producer可以将消息在内存中累计到一定数量后作为一个batch发送请求。batch的数量大小可以通过producer的参数(batch.num.messages)控制。通过增加batch的大小可以减少网络请求和磁盘IO的次数当然具体参数设置需要在效率和时效性方面做一个权衡。在比较新的版本中还有batch.size这个参数。 十、高可靠性使用分析 10.1、消息传输保障 前面已经介绍了Kafka如何进行有效的存储以及了解了producer和consumer如何工作。接下来讨论的是Kafka如何确保消息在producer和consumer之间传输。有以下三种可能的传输保障(delivery guarantee): At most once: 消息可能会丢但绝不会重复传输 At least once消息绝不会丢但可能会重复传输 Exactly once每条消息肯定会被传输一次且仅传输一次 Kafka的消息传输保障机制非常直观。当producer向broker发送消息时一旦这条消息被commit由于副本机制(replication)的存在它就不会丢失。但是如果producer发送数据给broker后遇到的网络问题而造成通信中断那producer就无法判断该条消息是否已经提交(commit)。虽然Kafka无法确定网络故障期间发生了什么但是producer可以retry多次确保消息已经正确传输到broker中所以目前Kafka实现的是at least once。 consumer从broker中读取消息后可以选择commit该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset。该consumer下一次再读该partition时会从下一条开始读取。如未commit下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然也可以将consumer设置为autocommit即consumer一旦读取到数据立即自动commit。如果只讨论这一读取消息的过程那Kafka是确保了exactly once, 但是如果由于前面producer与broker之间的某种原因导致消息的重复那么这里就是at least once。 考虑这样一种情况当consumer读完消息之后先commit再处理消息在这种模式下如果consumer在commit后还没来得及处理消息就crash了下次重新开始工作后就无法读到刚刚已提交而未处理的消息这就对应于at most once了。 读完消息先处理再commit。这种模式下如果处理完了消息在commit之前consumer crash了下次重新开始工作时还会处理刚刚未commit的消息实际上该消息已经被处理过了这就对应于at least once。 要做到exactly once就需要引入消息去重机制。 10.2、消息去重 如上一节所述Kafka在producer端和consumer端都会出现消息的重复这就需要去重处理。 Kafka文档中提及GUID(Globally Unique Identifier)的概念通过客户端生成算法得到每个消息的unique id同时可映射至broker上存储的地址即通过GUID便可查询提取消息内容也便于发送方的幂等性保证需要在broker上提供此去重处理模块最新版本已经支持。 针对GUID, 如果从客户端的角度去重那么需要引入集中式缓存必然会增加依赖复杂度另外缓存的大小难以界定。 不只是Kafka, 类似RabbitMQ以及RocketMQ这类商业级中间件也只保障at least once, 且也无法从自身去进行消息去重。所以我们建议业务方根据自身的业务特点进行去重比如业务消息本身具备幂等性或者借助Redis等其他产品进行去重处理。 10.3、高可靠性配置 Kafka提供了很高的数据冗余弹性对于需要数据高可靠性的场景我们可以增加数据冗余备份数(replication.factor)调高最小写入副本数的个数(min.insync.replicas)等等但是这样会影响性能。反之性能提高而可靠性则降低用户需要自身业务特性在彼此之间做一些权衡性选择。 要保证数据写入到Kafka是安全的高可靠的需要如下的配置 topic的配置replication.factor3,即副本数至少是3个;2min.insync.replicasreplication.factor broker的配置leader的选举条件unclean.leader.election.enablefalse producer的配置request.required.acks-1(all)producer.typesync 十一、内部网络框架 Broker的内部处理流水线化分为多个阶段来进行(SEDA)以提高吞吐量和性能尽量避免Thead盲等待以下为过程说明。 Accept Thread负责与客户端建立连接链路然后把Socket轮转交给Process Thread Process Thread负责接收请求和响应数据Process Thread每次基于Selector事件循环首先从Response Queue读取响应数据向客户端回复响应然后接收到客户端请求后读取数据放入Request Queue。 Work Thread负责业务逻辑、IO磁盘处理等负责从Request Queue读取请求并把处理结果放入Response Queue中待Process Thread发送出去。 十二、rebalance机制 Kafka保证同一consumer group中只有一个consumer会消费某条消息实际上Kafka保证的是稳定状态下每一个consumer实例只会消费某一个或多个特定的数据而某个partition的数据只会被某一个特定的consumer实例所消费。这样设计的劣势是无法让同一个consumer group里的consumer均匀消费数据优势是每个consumer不用都跟大量的broker通信减少通信开销同时也降低了分配难度实现也更简单。另外因为同一个partition里的数据是有序的这种设计可以保证每个partition里的数据也是有序被消费。 如果某consumer group中consumer数量少于partition数量则至少有一个consumer会消费多个partition的数据如果consumer的数量与partition数量相同则正好一个consumer消费一个partition的数据而如果consumer的数量多于partition的数量时会有部分consumer无法消费该topic下任何一条消息。 Consumer Rebalance算法如下 1. 将目标 topic 下的所有 partirtion 排序存于PT 2. 对某 consumer group 下所有 consumer 排序存于 CG第 i 个consumer 记为 Ci 3. Nsize(PT)/size(CG)向上取整 4. 解除 Ci 对原来分配的 partition 的消费权i从0开始 5. 将第i*N到i1*N-1个 partition 分配给 Ci 目前consumer rebalance的控制策略是由每一个consumer通过Zookeeper完成的。具体的控制方式如下 在/consumers/[consumer-group]/下注册id
设置对/consumers/[consumer-group] 的watcher
设置对brokers/ids的watcher
zk下设置watcher的路径节点更改触发consumer rebalance 在这种策略下每一个consumer或者broker的增加或者减少都会触发consumer rebalance。因为每个consumer只负责调整自己所消费的partition为了保证整个consumer group的一致性所以当一个consumer触发了rebalance时该consumer group内的其它所有consumer也应该同时触发rebalance。 Herd effect 任何broker或者consumer的增减都会触发所有的consumer的rebalance Split Brain 每个consumer分别单独通过Zookeeper判断哪些partition down了那么不同consumer从Zookeeper“看”到的view就可能不一样这就会造成错误的reblance尝试。而且有可能所有的consumer都认为rebalance已经完成了但实际上可能并非如此。 十三、BenchMark Kafka在唯品会有着很深的历史渊源根据唯品会消息中间件团队(VMS团队)所掌握的资料显示在VMS团队运转的Kafka集群中所支撑的topic数已接近2000每天的请求量也已达千亿级。这里就以Kafka的高可靠性为基准点来探究几种不同场景下的行为表现以此来加深对Kafka的认知为大家在以后高效的使用Kafka时提供一份依据。 13.1、测试环境 Kafka broker用到了4台机器分别为broker[0/1/2/3]配置如下 CPU: 24core/2.6GHZ Memory: 62G Network: 4000Mb OS/kernel: CentOs release 6.6 (Final) Disk: 1089G Kafka版本0.10.1.0 broker端JVM参数设置 -Xmx8G -Xms8G -server -XX:UseParNewGC -XX:UseConcMarkSweepGC -XX:CMSClassUnloadingEnabled -XX:CMSScavengeBeforeRemark -XX:DisableExplicitGC -Djava.awt.headlesstrue -Xloggc:/apps/service/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:PrintGCDetails -XX:PrintGCDateStamps -XX:PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticatefalse -Dcom.sun.management.jmxremote.sslfalse -Dcom.sun.management.jmxremote.port9999 客户端机器配置 CPU: 24core/2.6GHZ Memory: 3G Network: 1000Mb OS/kernel: CentOs release 6.3 (Final) Disk: 240G 13.2、不同场景测试 场景1 测试不同的副本数、min.insync.replicas策略以及request.required.acks策略(以下简称acks策略)对于发送速度(TPS)的影响。 具体配置一个producer;发送方式为sync;消息体大小为1kB;partition数为12。副本数为1/2/4;min.insync.replicas分别为1/2/4;acks分别为-1(all)/1/0。 具体测试数据如下表(min.insync.replicas只在acks-1时有效) 测试结果分析 客户端的acks策略对发送的TPS有较大的影响TPSacks_0 acks_1 ack_-1; 副本数越高TPS越低;副本数一致时min.insync.replicas不影响TPS; acks0/1时TPS与min.insync.replicas参数以及副本数无关仅受acks策略的影响。 下面将partition的个数设置为1来进一步确认下不同的acks策略、不同的min.insync.replicas策略以及不同的副本数对于发送速度的影响详细请看情景2和情景3。 场景2 在partition个数固定为1测试不同的副本数和min.insync.replicas策略对发送速度的影响。 具体配置一个producer;发送方式为sync;消息体大小为1kB;producer端acks-1(all)。变换副本数2/3/4; min.insync.replicas设置为1/2/4。 测试结果如下 测试结果分析 副本数越高TPS越低(这点与场景1的测试结论吻合)但是当partition数为1时差距甚微。min.insync.replicas不影响TPS。 场景3 在partition个数固定为1测试不同的acks策略和副本数对发送速度的影响。 具体配置一个producer;发送方式为sync;消息体大小为1kB;min.insync.replicas1。topic副本数为1/2/4;acks 0/1/-1。 测试结果如下 测试结果分析(与情景1一致) 副本数越多TPS越低; 客户端的acks策略对发送的TPS有较大的影响TPSacks_0 acks_1 ack_-1。 场景4 测试不同partition数对发送速率的影响 具体配置一个producer;消息体大小为1KB;发送方式为sync;topic副本数为2;min.insync.replicas2;acks-1。partition数量设置为1/2/4/8/12。 测试结果分析 partition的不同会影响TPS随着partition的个数的增长TPS会有所增长但并不是一直成正比关系到达一定临界值时partition数量的增加反而会使TPS略微降低。 场景5 通过将集群中部分broker设置成不可服务状态测试对客户端以及消息落盘的影响。 具体配置一个producer;消息体大小1KB;发送方式为sync;topic副本数为4;min.insync.replicas设置为2;acks-1;retries0/100000000;partition数为12。 测试结果分析 kill两台broker后客户端可以继续发送。broker减少后partition的leader分布在剩余的两台broker上造成了TPS的减小; kill三台broker后客户端无法继续发送。Kafka的自动重试功能开始起作用当大于等于min.insync.replicas数量的broker恢复后可以继续发送; 当retries不为0时消息有重复落盘;客户端成功返回的消息都成功落盘异常时部分消息可以落盘。 场景6 测试单个producer的发送延迟以及端到端的延迟。 具体配置一个producer;消息体大小1KB;发送方式为sync;topic副本数为4;min.insync.replicas设置为2;acks-1;partition数为12。 测试数据及结果(单位为ms) 各场景测试总结 当acks-1时Kafka发送端的TPS受限于topic的副本数量(ISR中)副本越多TPS越低; acks0时TPS最高其次为1最差为-1即TPSacks_0 acks_1 ack_-1 min.insync.replicas参数不影响TPS; partition的不同会影响TPS随着partition的个数的增长TPS会有所增长但并不是一直成正比关系到达一定临界值时partition数量的增加反而会使TPS略微降低; Kafka在acks-1,min.insync.replicas1时具有高可靠性所有成功返回的消息都可以落盘。 选择下载器: 下载此0条链接 当前处于选择模式 点击/拖拽 鼠标左键 选择链接 点击/拖拽 鼠标右键 取消选择链接 按住 ALT键 点击/拖拽 鼠标左键 取消选择链接 本文由 mdnice 多平台发布