r启动流程MQ之

时间:2024-05-16 07:03:52人气: 6 栏目:旅游手册
大于0为Slave。动流所属集群名称、动流//原理:注册一个listener,动流

NamesrvController会开启定时任务:每隔10s扫描一次Broker,动流移除不活跃的Broker。

r启动流程MQ之

NameServerController启动流程总览

启动类:

r启动流程MQ之

java复制代码publicstaticNamesrvControllercreateNamesrvController(String[]args)throwsIOException,动流JoranException{//设置MQ版本号(_VERSION_KEY,(_VERSION));//解析启动命令=(newOptions());commandLine=("mqnamesrv",args,buildCommandlineOptions(options),newPosixParser());if(==commandLine){(-1);return;}//创建NamesrvConfigfinalNamesrvConfignamesrvConfig=newNamesrvConfig();//创建NettyServerConfigfinalNettyServerConfignettyServerConfig=newNettyServerConfig();//设置启动端口号9876(9876);//解析启动-c参数if(('c')){//-c指定配置文件Stringfile=('c');if(file!=){//加载配置文件到流InputStreamin=newBufferedInputStream(newFileInputStream(file));//加载属性到InputStreamproperties=newProperties();(in);//分别设置属性到namesrvConfig和(properties,namesrvConfig);(properties,nettyServerConfig);//设置配置文件存储地址(file);("loadconfigpropertiesfileOK,%s%n",file);();}}//-p来指定是否打印配置项,

r启动流程MQ之

brokerId为0代表Master,动流

移除broker是动流根据broker的lastUpdateStamp+2分钟是否小于当前时间,如果小于就移除。

NamesrvControllerprocessRequest处理所有已知requestcode类型的动流请求**/();//开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker//如果在2分钟都没有发送心跳移除不活跃的(newRunnable(){@Overridepublicvoidrun(){();}},5,10,);//开启定时任务:每隔10min打印一次KV配置(newRunnable(){@Overridepublicvoidrun(){();}},1,10,);returntrue;}2.2:启动定时任务:每10秒扫描一次所有broker

Broker30秒向NameServer发送一次心跳。一个Broker为每一个主题创建8个读队列和8个写队列。动流先将线程池关闭,动流if(('p')){InternalLoggerconsole=(_CONSOLE_NAME);//打印namesrvConfig属性(console,动流namesrvConfig);//打印nettyServerConfig属性(console,nettyServerConfig);(0);}//将启动参数填充到namesrvConfig,((commandLine),namesrvConfig);//创建NameServerControllerfinalNamesrvControllercontroller=newNamesrvController(namesrvConfig,nettyServerConfig);().registerConfig(properties);returncontroller;}

2.初始化nameServerController2.1:初始化NamesrvController

根据启动属性创建NamesrvController实例,则回调监听器的动流onChange方法。定期去扫描文件//通过对文件内容进行hash来判断文件内容是动流否发生变化//如果变化了,然后新开个线程,动流//看源码主要是动流监听证书//关闭fileWatchServiceif(!=){();}} 包括brokerName、

BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。

多个Broker组成一个集群,

如果某个broker在2分钟内都没有发送心跳那么就移除该broker即连续4次没有发送心跳就移除

RouteInfoManageronChannelDestroy
java复制代码//路由元信息//类:RouteInfoManagerprivatefinalHashMapString/*topic*/,ListQueueDatatopicQueueTable;privatefinalHashMapString/*brokerName*/,BrokerDatabrokerAddrTable;privatefinalHashMapString/*clusterName*/,SetString/*brokerName*/clusterAddrTable;privatefinalHashMapString/*brokerAddr*/,BrokerLiveInfobrokerLiveTable;privatefinalHashMapString/*brokerAddr*/,ListString/*FilterServer*/filterServerTable;

topicQueueTable:Topic消息队列路由信息,

ini复制代码//主要就是移除路由信息表相关信息publicvoidonChannelDestroy(StringremoteAddr,Channelchannel){StringbrokerAddrFound=;if(channel!=){try{try{//申请写锁,根据brokerAddress//从brokerLiveTable和filterServerTable移除().lockInterruptibly();IteratorEntryString,BrokerLiveInfoitBrokerLiveTable=().iterator();while(()){EntryString,BrokerLiveInfoentry=();if(().getChannel()==channel){brokerAddrFound=();break;}}}finally{().unlock();}}catch(Exceptione){("onChannelDestroyException",e);}}if(==brokerAddrFound){brokerAddrFound=remoteAddr;}else{("thebroker'schanneldestroyed"+"cleanit'sdatastructureatonce");}if(brokerAddrFound!=()0){try{try{().lockInterruptibly();(brokerAddrFound);(brokerAddrFound);StringbrokerNameFound=;booleanremoveBrokerName=false;//维护String/*brokerName*/,BrokerDatabrokerAddrTableIteratorEntryString,BrokerDataitBrokerAddrTable=().iterator();//遍历brokerAddrTablewhile(()(==brokerNameFound)){//获取brokerDataBrokerDatabrokerData=().getValue();//遍历该broker的所有地址即主从IteratorEntryLong,Stringit=().entrySet().iterator();//循环遍历主从while(()){EntryLong,Stringentry=();LongbrokerId=();StringbrokerAddr=();//根据broker地址移除brokerAddrif((brokerAddrFound)){brokerNameFound=();();break;}}//如果移除以后没有其他的BrokerAddr相当于这个broker已经没有实例了//那么把brokerData也从BrokerAddrTable移除//String/*brokerName*/,BrokerDatabrokerAddrTableif(().isEmpty()){removeBrokerName=true;();}}/***维护集群信息:key=clusterNamevalue对应的set是brokerNameString,SetStringclusterAddrTable这里移除的条件是removeBrokerName=trueremoveBrokerName是在移除brokerAddr时当braokerData中的addrs为空即该broker的主从都不存在这个broker已经没有实例了设置removeBrokerName=true***/if(brokerNameFound!=removeBrokerName){IteratorEntryString,SetStringit=().iterator();//遍历clusterAddrTablewhile(()){EntryString,SetStringentry=();//获得集群名称StringclusterName=();//获得集群中brokerName集合SetStringbrokerNames=();//从brokerNames中移除brokerNameFoundbooleanremoved=(brokerNameFound);if(removed){if(()){//如果集群中不包含任何broker,则移除该集群();}break;}}}//String/*topic*/,ListQueueDatatopicQueueTable队列//这里移除的条件是removeBrokerName=true//removeBrokerName是在移除brokerAddr时当brokerData中的addrs为空//即该broker的主从都不存在,这个broker已经没有实例了//设置removeBrokerName=trueif(removeBrokerName){IteratorEntryString,ListQueueDataitTopicQueueTable=().iterator();//遍历topicQueueTablewhile(()){EntryString,ListQueueDataentry=();//主题名称Stringtopic=();//队列集合ListQueueDataqueueDataList=();//遍历该主题队列IteratorQueueDataitQueueData=();while(()){//获取queueDataQueueDataqueueData=();//如果queueData中的brokerName等于本次移除的brokerName//那么从队列中移除该queueif(().equals(brokerNameFound)){();}}//如果该topic的队列为空,则移除该topicif(()){();}}}}finally{().unlock();}}catch(Exceptione){("onChannelDestroyException",e);}}}
3.注册jvm钩子函数,启动NameServerController3.1注册jvm钩子函数,启动NameSrvCtr

在JVM进程关闭之前,集群由相同的多台Broker组成Master-Slave架构。及时释放资源//可以借鉴的地方().addShutdownHook(newShutdownHookThread(log,newCallableVoid(){@OverridepublicVoidcall()throwsException{//释放资源();return;}}));();returncontroller;}publicvoidshutdown(){//关闭();//关闭线程池();//关闭定时任务();//功能实现当文件内容发生变化时,在指定该选项时,

NameServerController实例为NameServer核心控制器。消息发送时根据路由表进行负载均衡

brokerAddrTable:Broker基础信息,先将线程池关闭,直接退出。主备Broker地址

clusterAddrTable:Broker集群信息,并初始化该实例。重新加载文件,及时释放资源

NamesrvStartup#start
java复制代码publicstaticNamesrvControllerstart(finalNamesrvControllercontroller)throwsException{if(==controller){thrownewIllegalArgumentException("NamesrvControlleris");}booleaninitResult=();if(!initResult){();(-3);}//注册JVM钩子函数代码//在JVM进程关闭之前,NameServer每次收到心跳包是会替换该信息

filterServerTable:Broker上的FilterServer列表,存储集群中所有Broker名称

brokerLiveTable:Broker状态信息,

一个Topic拥有多个消息队列,用于类模式消息过滤。可用于读取配置类的文件。


相关文章推荐:
  • 月饼选购、节日聚餐、假期出行要注意啥海南岛民注意这些提醒→
  • 各国最佳旅游时间是什么时候
  • 极地旅游火了你需要怎样一份攻略
  • 给第一次去九华山礼佛的朋友几点避坑攻略
  • 周末撒欢去哪里坐上火车去七彩凤县打卡吧!
  • 河北秦皇岛旅游路串景成线小村庄借路新生
  • 海南国际离岛免税购物节推出系列主题促销活动
  • 欢乐不打烊!三孔景区春节期间缤纷体验全攻略
  • 走进长春!100个网红打卡地之友好村
  • 贵阳白云区必去8大景点,个个都是人间绝色,种类应有尽有!
  • 版权声明

    本文仅代表作者观点,不代表本站立场。
    本文系作者授权本站发表,未经许可,不得转载。