r启动流程MQ之
NamesrvController会开启定时任务:每隔10s扫描一次Broker,动流移除不活跃的Broker。
NameServerController启动流程总览启动类:
java复制代码publicstaticNamesrvControllercreateNamesrvController(String[]args)throwsIOException,动流JoranException{//设置MQ版本号(_VERSION_KEY,(_VERSION));//解析启动命令=(newOptions());commandLine=("mqnamesrv",args,buildCommandlineOptions(options),newPosixParser());if(==commandLine){(-1);return;}//创建NamesrvConfigfinalNamesrvConfignamesrvConfig=newNamesrvConfig();//创建NettyServerConfigfinalNettyServerConfignettyServerConfig=newNettyServerConfig();//设置启动端口号9876(9876);//解析启动-c参数if(('c')){//-c指定配置文件Stringfile=('c');if(file!=){//加载配置文件到流InputStreamin=newBufferedInputStream(newFileInputStream(file));//加载属性到InputStreamproperties=newProperties();(in);//分别设置属性到namesrvConfig和(properties,namesrvConfig);(properties,nettyServerConfig);//设置配置文件存储地址(file);("loadconfigpropertiesfileOK,%s%n",file);();}}//-p来指定是否打印配置项,2.初始化nameServerController2.1:初始化NamesrvControllerbrokerId为0代表Master,动流
移除broker是动流根据broker的lastUpdateStamp+2分钟是否小于当前时间,如果小于就移除。
NamesrvControllerprocessRequest处理所有已知requestcode类型的动流请求**/();//开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker//如果在2分钟都没有发送心跳移除不活跃的(newRunnable(){@Overridepublicvoidrun(){();}},5,10,);//开启定时任务:每隔10min打印一次KV配置(newRunnable(){@Overridepublicvoidrun(){();}},1,10,);returntrue;}2.2:启动定时任务:每10秒扫描一次所有brokerBroker30秒向NameServer发送一次心跳。一个Broker为每一个主题创建8个读队列和8个写队列。动流先将线程池关闭,动流if(('p')){InternalLoggerconsole=(_CONSOLE_NAME);//打印namesrvConfig属性(console,动流namesrvConfig);//打印nettyServerConfig属性(console,nettyServerConfig);(0);}//将启动参数填充到namesrvConfig,((commandLine),namesrvConfig);//创建NameServerControllerfinalNamesrvControllercontroller=newNamesrvController(namesrvConfig,nettyServerConfig);().registerConfig(properties);returncontroller;}
根据启动属性创建NamesrvController实例,则回调监听器的动流onChange方法。定期去扫描文件//通过对文件内容进行hash来判断文件内容是动流否发生变化//如果变化了,然后新开个线程,动流//看源码主要是动流监听证书//关闭fileWatchServiceif(!=){();}} 包括brokerName、
BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。
多个Broker组成一个集群,
如果某个broker在2分钟内都没有发送心跳那么就移除该broker即连续4次没有发送心跳就移除
RouteInfoManageronChannelDestroyjava复制代码//路由元信息//类:RouteInfoManagerprivatefinalHashMapString/*topic*/,ListQueueDatatopicQueueTable;privatefinalHashMapString/*brokerName*/,BrokerDatabrokerAddrTable;privatefinalHashMapString/*clusterName*/,SetString/*brokerName*/clusterAddrTable;privatefinalHashMapString/*brokerAddr*/,BrokerLiveInfobrokerLiveTable;privatefinalHashMapString/*brokerAddr*/,ListString/*FilterServer*/filterServerTable;
topicQueueTable:Topic消息队列路由信息,
ini复制代码//主要就是移除路由信息表相关信息publicvoidonChannelDestroy(StringremoteAddr,Channelchannel){StringbrokerAddrFound=;if(channel!=){try{try{//申请写锁,根据brokerAddress//从brokerLiveTable和filterServerTable移除().lockInterruptibly();IteratorEntryString,BrokerLiveInfoitBrokerLiveTable=().iterator();while(()){EntryString,BrokerLiveInfoentry=();if(().getChannel()==channel){brokerAddrFound=();break;}}}finally{().unlock();}}catch(Exceptione){("onChannelDestroyException",e);}}if(==brokerAddrFound){brokerAddrFound=remoteAddr;}else{("thebroker'schanneldestroyed"+"cleanit'sdatastructureatonce");}if(brokerAddrFound!=()0){try{try{().lockInterruptibly();(brokerAddrFound);(brokerAddrFound);StringbrokerNameFound=;booleanremoveBrokerName=false;//维护String/*brokerName*/,BrokerDatabrokerAddrTableIteratorEntryString,BrokerDataitBrokerAddrTable=().iterator();//遍历brokerAddrTablewhile(()(==brokerNameFound)){//获取brokerDataBrokerDatabrokerData=().getValue();//遍历该broker的所有地址即主从IteratorEntryLong,Stringit=().entrySet().iterator();//循环遍历主从while(()){EntryLong,Stringentry=();LongbrokerId=();StringbrokerAddr=();//根据broker地址移除brokerAddrif((brokerAddrFound)){brokerNameFound=();();break;}}//如果移除以后没有其他的BrokerAddr相当于这个broker已经没有实例了//那么把brokerData也从BrokerAddrTable移除//String/*brokerName*/,BrokerDatabrokerAddrTableif(().isEmpty()){removeBrokerName=true;();}}/***维护集群信息:key=clusterNamevalue对应的set是brokerNameString,SetStringclusterAddrTable这里移除的条件是removeBrokerName=trueremoveBrokerName是在移除brokerAddr时当braokerData中的addrs为空即该broker的主从都不存在这个broker已经没有实例了设置removeBrokerName=true***/if(brokerNameFound!=removeBrokerName){IteratorEntryString,SetStringit=().iterator();//遍历clusterAddrTablewhile(()){EntryString,SetStringentry=();//获得集群名称StringclusterName=();//获得集群中brokerName集合SetStringbrokerNames=();//从brokerNames中移除brokerNameFoundbooleanremoved=(brokerNameFound);if(removed){if(()){//如果集群中不包含任何broker,则移除该集群();}break;}}}//String/*topic*/,ListQueueDatatopicQueueTable队列//这里移除的条件是removeBrokerName=true//removeBrokerName是在移除brokerAddr时当brokerData中的addrs为空//即该broker的主从都不存在,这个broker已经没有实例了//设置removeBrokerName=trueif(removeBrokerName){IteratorEntryString,ListQueueDataitTopicQueueTable=().iterator();//遍历topicQueueTablewhile(()){EntryString,ListQueueDataentry=();//主题名称Stringtopic=();//队列集合ListQueueDataqueueDataList=();//遍历该主题队列IteratorQueueDataitQueueData=();while(()){//获取queueDataQueueDataqueueData=();//如果queueData中的brokerName等于本次移除的brokerName//那么从队列中移除该queueif(().equals(brokerNameFound)){();}}//如果该topic的队列为空,则移除该topicif(()){();}}}}finally{().unlock();}}catch(Exceptione){("onChannelDestroyException",e);}}}3.注册jvm钩子函数,启动NameServerController3.1注册jvm钩子函数,启动NameSrvCtr
在JVM进程关闭之前,集群由相同的多台Broker组成Master-Slave架构。及时释放资源//可以借鉴的地方().addShutdownHook(newShutdownHookThread(log,newCallableVoid(){@OverridepublicVoidcall()throwsException{//释放资源();return;}}));();returncontroller;}publicvoidshutdown(){//关闭();//关闭线程池();//关闭定时任务();//功能实现当文件内容发生变化时,在指定该选项时,
NameServerController实例为NameServer核心控制器。消息发送时根据路由表进行负载均衡
brokerAddrTable:Broker基础信息,先将线程池关闭,直接退出。主备Broker地址
clusterAddrTable:Broker集群信息,并初始化该实例。重新加载文件,及时释放资源
NamesrvStartup#startjava复制代码publicstaticNamesrvControllerstart(finalNamesrvControllercontroller)throwsException{if(==controller){thrownewIllegalArgumentException("NamesrvControlleris");}booleaninitResult=();if(!initResult){();(-3);}//注册JVM钩子函数代码//在JVM进程关闭之前,NameServer每次收到心跳包是会替换该信息filterServerTable:Broker上的FilterServer列表,存储集群中所有Broker名称
brokerLiveTable:Broker状态信息,
一个Topic拥有多个消息队列,用于类模式消息过滤。可用于读取配置类的文件。
相关文章推荐:
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权本站发表,未经许可,不得转载。