Kafka Server-Side Source Code Analysis: Controller Election
Published: 2019-05-10


1. Controller Election

At any given moment, only one Broker in a Kafka cluster serves as the Controller. This Broker is elected by all the Brokers in the cluster, and over time the Controller role may move to a different Broker.

Election principle:

During an election, every Broker attempts to create the /controller node in ZooKeeper, but only one can succeed; the Broker that succeeds becomes the Controller. At the same time, every Broker registers a watcher on the /controller node, so that when the Broker hosting the current Controller goes down, the remaining Brokers are notified and a new election round is triggered.
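The create-or-lose race above can be sketched with a minimal simulation (this is illustrative Python, not Kafka's code; FakeZooKeeper and try_elect are made-up names standing in for ZooKeeper's ephemeral-node semantics):

```python
class FakeZooKeeper:
    """Stand-in for ZooKeeper: create() succeeds only if the path is absent."""
    def __init__(self):
        self.nodes = {}

    def create(self, path, data):
        if path in self.nodes:
            raise FileExistsError(path)  # NodeExists in real ZooKeeper
        self.nodes[path] = data

    def get(self, path):
        return self.nodes.get(path)

def try_elect(zk, broker_id):
    """Each broker runs this; returns True if it became the Controller."""
    try:
        zk.create("/controller", broker_id)
        return True
    except FileExistsError:
        return False  # lost the race; would now watch /controller instead

zk = FakeZooKeeper()
results = {b: try_elect(zk, b) for b in [0, 1, 2]}
# exactly one broker wins, and /controller records the winner
assert sum(results.values()) == 1
assert results[zk.get("/controller")] is True
```

In real Kafka the node is ephemeral, so it disappears automatically when the winner's ZooKeeper session ends, which is what lets the watchers kick off the next election.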

Election triggers:

  1. Cluster startup
  2. The /controller node disappears
  3. The /controller node changes

The Controller's main code lives in KafkaController.scala. Let's first look at the important fields of the KafkaController class.

class KafkaController(val config: KafkaConfig,              // Kafka configuration
                      zkClient: KafkaZkClient,              // ZooKeeper client wrapping all ZooKeeper operations
                      time: Time,                           // time utility
                      metrics: Metrics,                     // utility implementing the metrics service (e.g. creating monitoring metrics)
                      initialBrokerInfo: BrokerInfo,        // Broker node information
                      initialBrokerEpoch: Long,             // Broker epoch, used to fence requests from an old Controller
                      tokenManager: DelegationTokenManager, // utility implementing delegation token management
                      threadNamePrefix: Option[String] = None // name prefix for the Controller event-processing thread
                     ) extends Logging with KafkaMetricsGroup {
  // cluster metadata class, holding all cluster metadata
  val controllerContext = new ControllerContext
  // thread scheduler, currently only responsible for periodic preferred-leader re-election
  private[controller] val kafkaScheduler = new KafkaScheduler(1)
  // Controller event manager, responsible for the event-processing thread
  private[controller] val eventManager = new ControllerEventManager(config.brokerId,
    controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics(), () => maybeResign())
  // topic deletion manager
  val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient)
  private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
  // replica state machine, responsible for replica state transitions
  val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext,
    topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
  // partition state machine, responsible for partition state transitions
  val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext,
    zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
  partitionStateMachine.setTopicDeletionManager(topicDeletionManager)
  // ZooKeeper listener for the Controller node
  private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
  // ZooKeeper listener for Broker count changes
  private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
  // collection of ZooKeeper listeners for Broker information changes
  private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
  // ZooKeeper listener for topic count changes
  private val topicChangeHandler = new TopicChangeHandler(this, eventManager)
  // ZooKeeper listener for topic deletion
  private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager)
  // collection of ZooKeeper listeners for topic partition changes
  private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
  // ZooKeeper listener for partition reassignment
  private val partitionReassignmentHandler = new PartitionReassignmentHandler(this, eventManager)
  // ZooKeeper listener for preferred-leader election
  private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(this, eventManager)
  // ZooKeeper listener for ISR change notifications
  private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(this, eventManager)
  // ZooKeeper listener for log dir change notifications
  private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(this, eventManager)
  // broker id of the current active Controller
  @volatile private var activeControllerId = -1
  // total number of offline partitions
  @volatile private var offlinePartitionCount = 0
  // total number of partitions eligible for preferred-leader election
  @volatile private var preferredReplicaImbalanceCount = 0
  // total number of topics
  @volatile private var globalTopicCount = 0
  // total number of partitions
  @volatile private var globalPartitionCount = 0
  ...
}

KafkaController is started from KafkaServer's startup method.

// KafkaServer.scala
kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch,
  tokenManager, threadNamePrefix)
kafkaController.startup()

def startup() = {
  // registerStateChangeHandler triggers re-election after session expiration
  zkClient.registerStateChangeHandler(new StateChangeHandler {
    override val name: String = StateChangeHandlers.ControllerHandler

    override def afterInitializingSession(): Unit = {
      eventManager.put(RegisterBrokerAndReelect)
    }

    override def beforeInitializingSession(): Unit = {
      val expireEvent = new Expire
      eventManager.clearAndPut(expireEvent)
      // Block until the event has started processing. Session expiration triggers re-election,
      // and the Controller cannot work normally until this Expire event has been handled.
      expireEvent.waitUntilProcessingStarted()
    }
  })
  // put the Startup event into the eventManager
  eventManager.put(Startup)
  // start the eventManager background thread, which begins the election
  eventManager.start()
}

On startup, KafkaController puts a Startup event into the eventManager; the handling logic for that event lives in Startup's process method.

case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    // register the ControllerChangeHandler ZooKeeper listener
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    // elect the controller
    elect()
  }
}
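The single-threaded queue pattern behind ControllerEventManager can be sketched as follows (a Python simulation with made-up names; the real Kafka class also tracks per-event metrics and state):

```python
import queue
import threading

class EventManager:
    """Events go into a queue and are processed one at a time by a single
    background thread, so handlers like Startup.process() never run concurrently."""
    def __init__(self):
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self.processed = []

    def put(self, event):
        self._queue.put(event)

    def start(self):
        self._thread.start()

    def _loop(self):
        while True:
            event = self._queue.get()
            if event is None:        # shutdown sentinel
                return
            event()                  # the event's "process()" logic
            self.processed.append(event.__name__)

    def shutdown(self):
        self._queue.put(None)
        self._thread.join()

def startup_event():
    pass  # real Kafka would register the /controller watcher and call elect()

mgr = EventManager()
mgr.put(startup_event)   # analogous to eventManager.put(Startup)
mgr.start()
mgr.shutdown()
assert mgr.processed == ["startup_event"]
```

Serializing all events through one thread is what lets the Controller mutate its in-memory context without locks.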

The process method calls the elect method. The main flow of elect is as follows:


private def elect(): Unit = {
  // get the broker id of the current Controller; if no Controller exists, explicitly mark it as -1
  activeControllerId = zkClient.getControllerId.getOrElse(-1)
  // if a Controller has already been elected, simply return
  if (activeControllerId != -1) {
    debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
    return
  }

  try {
    // create the ephemeral node so that this broker participates in the election
    val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
    controllerContext.epoch = epoch
    controllerContext.epochZkVersion = epochZkVersion
    activeControllerId = config.brokerId

    info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
      s"and epoch zk version is now ${controllerContext.epochZkVersion}")

    // run the follow-up logic for becoming the Controller
    onControllerFailover()
  } catch {
    case e: ControllerMovedException =>
      maybeResign()
      if (activeControllerId != -1)
        debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
      else
        warn("A controller has been elected but just resigned, this will result in another round of election", e)
    case t: Throwable =>
      error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
        s"Trigger controller movement immediately", t)
      triggerControllerMove()
  }
}

The elect method attempts to create the /controller ephemeral node in ZooKeeper, so only the broker that wins the election executes onControllerFailover; every other broker falls into the exception-handling branch.
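Note that elect also bumps the controller epoch via registerControllerAndIncrementControllerEpoch. The epoch is what fences a "zombie" Controller: every election increments it, and brokers discard requests stamped with an older epoch. A minimal simulation of that fencing (illustrative Python, not Kafka's code):

```python
class Broker:
    """Tracks the highest controller epoch seen and rejects stale requests."""
    def __init__(self):
        self.highest_epoch_seen = 0

    def handle_controller_request(self, epoch):
        if epoch < self.highest_epoch_seen:
            return "rejected (stale controller)"
        self.highest_epoch_seen = epoch
        return "accepted"

broker = Broker()
assert broker.handle_controller_request(epoch=1) == "accepted"
# a new election happens; the new Controller sends with epoch 2
assert broker.handle_controller_request(epoch=2) == "accepted"
# the old Controller (still on epoch 1) comes back and sends a request: fenced off
assert broker.handle_controller_request(epoch=1) == "rejected (stale controller)"
```

This is why initialBrokerEpoch in the constructor is described as "used to fence requests from an old Controller".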


private def onControllerFailover() {
  info("Registering handlers")
  // register the various listeners
  val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler,
    logDirEventNotificationHandler, isrChangeNotificationHandler)
  childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
  val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
  nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)

  info("Deleting log dir event notifications")
  zkClient.deleteLogDirEventNotifications(controllerContext.epochZkVersion)
  info("Deleting isr change notifications")
  zkClient.deleteIsrChangeNotifications(controllerContext.epochZkVersion)
  info("Initializing controller context")
  initializeControllerContext()
  info("Fetching topic deletions in progress")
  val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
  info("Initializing topic deletion manager")
  // initialize the topic deletion manager
  topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)

  info("Sending update metadata request")
  // send the request to update cluster metadata
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
  // start the replica state machine
  replicaStateMachine.startup()
  // start the partition state machine
  partitionStateMachine.startup()

  info(s"Ready to serve as the new controller with epoch $epoch")
  maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
  topicDeletionManager.tryTopicDeletion()
  val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
  onPreferredReplicaElection(pendingPreferredReplicaElections, ZkTriggered)
  info("Starting the controller scheduler")
  kafkaScheduler.startup()
  if (config.autoLeaderRebalanceEnable) {
    scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
  }
  if (config.tokenAuthEnabled) {
    info("starting the token expiry check scheduler")
    tokenCleanScheduler.startup()
    tokenCleanScheduler.schedule(name = "delete-expired-tokens",
      fun = () => tokenManager.expireTokens,
      period = config.delegationTokenExpiryCheckIntervalMs,
      unit = TimeUnit.MILLISECONDS)
  }
}

A change to the /controller node triggers re-election; the method involved is maybeResign.

private def maybeResign(): Unit = {
  // check whether this broker was the Controller before the change
  val wasActiveBeforeChange = isActive
  // register the ControllerChangeHandler listener
  zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
  activeControllerId = zkClient.getControllerId.getOrElse(-1)
  if (wasActiveBeforeChange && !isActive) {
    // run the resignation logic
    onControllerResignation()
  }
}

onControllerResignation is responsible for the resignation logic.

private def onControllerResignation() {
  debug("Resigning")
  // de-register the ZooKeeper listeners
  zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
  zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
  zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
  zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
  unregisterBrokerModificationsHandler(brokerModificationsHandlers.keySet)

  // reset the topic deletion manager
  topicDeletionManager.reset()

  // shut down the Kafka thread scheduler, i.e. cancel the periodic preferred-leader re-election
  kafkaScheduler.shutdown()

  // reset all statistics fields to 0
  offlinePartitionCount = 0
  preferredReplicaImbalanceCount = 0
  globalTopicCount = 0
  globalPartitionCount = 0

  // stop the token expiry check scheduler
  if (tokenCleanScheduler.isStarted)
    tokenCleanScheduler.shutdown()

  // de-register the partition ISR listeners for in-progress partition reassignment tasks
  unregisterPartitionReassignmentIsrChangeHandlers()
  // shut down the partition state machine
  partitionStateMachine.shutdown()
  // de-register the topic change listener
  zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
  // de-register the partition change listeners
  unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
  // de-register the topic deletion listener
  zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
  // shut down the replica state machine
  replicaStateMachine.shutdown()
  // de-register the broker change listener
  zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
  // clear the controller metadata
  controllerContext.resetContext()

  info("Resigned")
}
2. The Controller's Role

The Controller is a very important Kafka component. Its main functions are cluster management and topic management. Below we look at how one part of cluster management, broker information management, is implemented in the source code.

Broker information changes are handled mainly by BrokerModificationsHandler: when a broker's information changes, it puts a BrokerModifications event into the eventManager.

class BrokerModificationsHandler(controller: KafkaController,
                                 eventManager: ControllerEventManager,
                                 brokerId: Int) extends ZNodeChangeHandler {
  override val path: String = BrokerIdZNode.path(brokerId)

  override def handleDataChange(): Unit = {
    eventManager.put(controller.BrokerModifications(brokerId))
  }
}

The handling logic for broker information changes is in the process method of BrokerModifications.

case class BrokerModifications(brokerId: Int) extends ControllerEvent {
  override def state: ControllerState = ControllerState.BrokerChange

  override def process(): Unit = {
    if (!isActive) return
    // fetch the target Broker's details from ZooKeeper
    val newMetadata = zkClient.getBroker(brokerId)
    // fetch the target Broker's details from the metadata cache
    val oldMetadata = controllerContext.liveBrokers.find(_.id == brokerId)
    // if the two differ, the Broker's data has changed
    if (newMetadata.nonEmpty && oldMetadata.nonEmpty && newMetadata.map(_.endPoints) != oldMetadata.map(_.endPoints)) {
      info(s"Updated broker: ${newMetadata.get}")
      // update the metadata
      controllerContext.updateBrokerMetadata(oldMetadata, newMetadata)
      // sync the new metadata to the other Brokers
      onBrokerUpdate(brokerId)
    }
  }
}

private def onBrokerUpdate(updatedBrokerId: Int) {
  info(s"Broker info update callback for $updatedBrokerId")
  // send an UpdateMetadataRequest to every Broker in the cluster
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
}

