博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka Architecture
阅读量:5968 次
发布时间:2019-06-19

本文共 2367 字,大约阅读时间需要 7 分钟。

hot3.png

Kafka 主函数在kafka.Kafka.scala 如下:

      val serverConfig = new KafkaConfig(props)      KafkaMetricsReporter.startReporters(serverConfig.props)      val kafkaServerStartble = new KafkaServerStartable(serverConfig)            kafkaServerStartble.startup

读取命令行中第一个参数作为配置文件

bin/zookeeper-server-start.sh config/zookeeper.properties

kafka.utils.Utils从中读取配置项 生成props 作为KafkaConfig参数

KafkaConfig构造函数还是已VerifiableProperties作为参数

KafkaMetricsReporter调用startReporters(props

class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {  private var server: KafkaServer = null  def startup() {      server.startup()  }}

KafkaServerStartable看上去就是一个KafkaServer ……  直接看kafka.server.KafkaServer吧

    /* start scheduler */    kafkaScheduler.startup()    /* setup zookeeper */    zkClient = initZk()    /* start log manager */    logManager = createLogManager(zkClient)    logManager.startup()    socketServer = new SocketServer(config.brokerId,      config.hostName,      config.port,      config.numNetworkThreads,      config.queuedMaxRequests,      config.socketSendBufferBytes,      config.socketReceiveBufferBytes,      config.socketRequestMaxBytes)    socketServer.startup()    replicaManager = new ReplicaManager(config,       time,       zkClient,       kafkaScheduler,       logManager,       isShuttingDown)    kafkaController = new KafkaController(config, zkClient)    /* start processing requests */    apis = new KafkaApis(socketServer.requestChannel,       replicaManager,       zkClient,       config.brokerId,       config, kafkaController)          requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,       socketServer.requestChannel,       apis,       config.numIoThreads)    Mx4jLoader.maybeLoad()    replicaManager.startup()    kafkaController.startup()    topicConfigManager = new TopicConfigManager(zkClient, logManager)    topicConfigManager.startup()    /* tell everyone we are alive */    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId,       config.advertisedHostName,       config.advertisedPort,       config.zkSessionTimeoutMs,       zkClient)          kafkaHealthcheck.startup()    registerStats()    startupComplete.set(true);

读kafka源码到了这里,突然间有一种豁然开朗的感觉,

在这里先后启动了

KafkaScheduler、LogManager、SocketServer、ReplicaManager、KakfaController、KafkaApi ....

好戏~ 就此上演 待我~ 细说从头

转载于:https://my.oschina.net/darionyaphet/blog/285010

你可能感兴趣的文章
关于在VS2005中编写DLL遇到 C4251 警告的解决办法
查看>>
提高信息安全意识对网络勒索病毒说不
查看>>
使用Jquery 加载页面时调用JS
查看>>
css+div+jquery弹出层
查看>>
求职相关(链接,不定期更新)
查看>>
maya pyside 多个窗口实例 报错 解决
查看>>
我的友情链接
查看>>
通知中心
查看>>
我的友情链接
查看>>
MVC中的三个模块
查看>>
Line: 220 - com/opensymphony/xwork2/spring/SpringObjectFactory.java:220:-1
查看>>
oracle 常用命令大汇总
查看>>
2012年春运火车票电话和网上订票技巧、攻略
查看>>
根据request获取请求路径
查看>>
mysql 并行复制
查看>>
傲不可长,欲不可纵,乐不可极,志不可满——提高个人修养
查看>>
linux系统增加swap容量的方法
查看>>
后台调用gps
查看>>
HTML5标签的语义认知和理解(1)
查看>>
MySQL日志功能详解(2)
查看>>