美诚资源网

Nacos 配置中心原理解析

admin 52

前言

当配置中心的内容发生变更时,客户端是如何获取到最新内容的?

监听数据变更的Long-Polling长轮询是如何实现的?

在客户端集群模式中,如何做到只更改某一台客户端的配置内容?

当Nacos挂掉后,客户端还可以获取数据吗?

简介

动态配置服务是Nacos其中的关键特性之一,动态配置服务可以让您以中心化、外部化和动态化的方式管理所有环境的应用配置和服务配置。动态配置消除了配置变更时重新部署应用和服务的需要,让配置管理变得更加高效和敏捷。配置中心化管理让实现无状态服务变得更简单,让服务按需弹性扩展变得更容易。

我们可以通过Nacos控制台来发布配置,也可以使用Nacos提供的REST接口来发布配置。

发布配置获取配置

Nacos配置中心分为服务端和客户端,服务端提供REST接口查询、更改配置,客户端SDK通过封装了服务端的REST接口来获取配置

客户端实现原理

使用SDK获取配置demo

StringserverAddr="{serverAddr}";StringdataId="{dataId}";Stringgroup="{group}";Propertiesproperties=newProperties();("serverAddr",serverAddr);//创建ConfigServiceConfigServiceconfigService=(properties);//获取dataId的配置Stringcontent=(dataId,group,5000);(content);//动态监听配置,当dataId数据发生变更时会调用receiveConfigInfo方法(dataId,group,newListener(){@OverridepublicvoidreceiveConfigInfo(StringconfigInfo){("recieve1:"+configInfo);}@OverridepublicExecutorgetExecutor(){returnnull;}});

demo中首先获取了dataId的配置内容,并且为该内容添加了监听器,当dataId内容发生变化时会回调receiveConfigInfo方法获取最新的内容

获取配置原理解析

获取配置的主要方法是NacosConfigService类的getConfigInner方法,该方法优先从本地文件中获取配置,如果没有本地文件,则通过HTTPREST接口从服务端获取配置并将配置保存到本地快照文件中,如果从服务端获取配置失败,则会从快照文件中获取配置。

@OverridepublicStringgetConfig(StringdataId,Stringgroup,longtimeoutMs)throwsNacosException{returngetConfigInner(namespace,dataId,group,timeoutMs);}

ConfigService的getConfig方法调用的是getConfigInner方法

privateStringgetConfigInner(Stringtenant,StringdataId,Stringgroup,longtimeoutMs)throwsNacosException{group=null2defaultGroup(group);(dataId,group);ConfigResponsecr=newConfigResponse();(dataId);(tenant);(group);//1优先使用本地配置Stringcontent=((),dataId,group,tenant);if(content!=null){("[{}][get-config]getfailoverok,dataId={},group={},tenant={},config={}",(),dataId,group,tenant,(content));(content);(null,cr);content=();returncontent;}//2如果没有本地配置,则获取服务器中的配置try{String[]ct=(dataId,group,tenant,timeoutMs);(ct[0]);(null,cr);content=();returncontent;}catch(NacosExceptionioe){if(_RIGHT==()){throwioe;}("[{}][get-config]getfromservererror,dataId={},group={},tenant={},msg={}",(),dataId,group,tenant,());}//3如果获取服务器配置失败,则获取本地的快照数据("[{}][get-config]getsnapshotok,dataId={},group={},tenant={},config={}",(),dataId,group,tenant,(content));content=((),dataId,group,tenant);(content);(null,cr);content=();returncontent;}

getConfigInner方法优先从本地文件获取配置,本地文件默认是不存在的,因此如果想用本地配置覆盖远程配置只需要在本地新建配置文件即可,Nacos会优先使用本地文件,本地文件配置的路径为:

/{}/nacos/config/fixed-{serverName}/data/config-data/{group}/{dataId}

如果没有本地配置,则调用REST接口从服务器中获取配置,调用接口为:

/v1/cs/configs?dataId={dataId}group={group}

如果从服务器获取配置失败,则从本地快照数据中获取,每次从服务器获取数据时都会更新本地快照数据,快照文件的路径为:

/{}/nacos/config/fixed-{serverName}/snapshot/{group}/{dataId}

getConfig方法只是获取一次配置文件内容,当配置发生变更后还需要通过上面添加的监听器来获得最新的配置

监听配置原理解析

当通过addListener注册了监听器后,NacosConfigService类会使用ClientWorker类的checkConfigInfo方法创建LongPollingRunnable长轮询线程去监听服务端的配置,默认3000个数据为一组创建一个LongPollingRunnable线程,长轮询连接默认超时时间为30秒,在30秒内如果监听的数据有任何变化会立即返回最新的数据,如果30秒内数据没有任何变化,则会结束当前的监听并开启下一轮监听。

在NacosConfigService类初始化时创建了ClientWorker对象,ClientWorker负责获取Nacos的配置以及创建长轮询连接监听Client中所有使用过的配置。

=newClientWorker(,,properties)

NacosConfigService中初始化ClientWorker对象,在ClientWorker构造方法中开启了以10毫秒的间隔去创建一个默认超时事件为30秒的长轮询连接去监听本地数据的变化,当数据发生变化时则更新本地数据,否则继续监听。

(newRunnable(){publicvoidrun(){try{();}catch(Throwablevar2){("["+()+"][sub-check]rotatecheckerror",var2);}}},1L,10L,);

创建监听线程监听配置变化

publicvoidcheckConfigInfo(){//=().size();//把本地数据进行分组,默认每3000个数据为一组,每组会开启一个长轮询监听线程intlongingTaskCount=(int)(listenerSize/());if(longingTaskCountcurrentLongingTaskCount){for(inti=(int)currentLongingTaskCount;ilongingTaskCount;i++){//开启长轮询线程(newLongPollingRunnable(i));}currentLongingTaskCount=longingTaskCount;}}

LongPollingRunnable是长轮询线程

//使用长轮询获取变更的数据列表ListStringchangedGroupKeys=checkUpdateDataIds(cacheDatas,inInitializingCacheList);for(StringgroupKey:changedGroupKeys){//…………//通过变更的数据key调用REST接口获取最新的数据内容String[]ct=getServerConfig(dataId,group,tenant,3000L);CacheDatacache=().get((dataId,group,tenant));(ct[0]);//…………}//开启下一轮30秒长轮询监听(this);

LongPollingRunnable线程首先通过checkUpdateDataIds中的长轮询连接监听数据,如果数据有变更则更新本地数据,否则开启下一轮的监听checkUpdateDataIds方法调用的是checkUpdateConfigStr开启长轮询监听

ListStringcheckUpdateConfigStr(StringprobeUpdateString,booleanisInitializingCacheList)throwsException{MapString,Stringparams=newHashMapString,String(2);//监听配置的(_MODIFY_REQUEST,probeUpdateString);MapString,Stringheaders=newHashMapString,String(2);//长轮询超时时间默认30秒("Long-Pulling-Timeout",""+timeout);//………………try{//调用Nacos服务端的/listener接口开始监听配置变化longreadTimeoutMs=timeout+(long)(timeout1);HttpRestResultStringresult=(_CONTROLLER_PATH+"/listener",headers,params,(),readTimeoutMs);//返回监听的内容,如果配置发生了变化那么result就是最新的数据,如果配置没有发生变化那么result=nullif(()){setHealthServer(true);returnparseUpdateDataIdResponse(());}else{setHealthServer(false);("[{}][check-update]getchangeddataIderror,code:{}",(),());}}catch(Exceptione){setHealthServer(false);("["+()+"][check-update]getchangeddataIdexception",e);throwe;}();}

checkUpdateConfigStr方法发起长轮询连接来监听Nacos的配置是否有变化,如果在30秒内配置发生了变化则会立即返回新的数据,如果在30秒内没有任何数据变化,则会返回NULL,同时会开启下一轮30秒的监听。

服务端实现原理

ConfigController类提供了发布获取配置的REST接口,我们分别看下发布配置和获取配置的实现原理。

发布配置原理解析

发布配置时会先把数据持久化到存储引擎上,一般是mysql或者是Nacos内置的derby数据库,完成数据持久化之后会将数据变更包装成ConfigDataChangeEvent事件,通过向外广播数据变更事件,所有订阅了ConfigDataChangeEvent事件的消费方会收到数据变更事件。

ConfigDataChangeEvent事件订阅者在收到事件消息后,会先通过HTTPREST接口通知Nacos集群中的所有机器,集群在接收到通知后会先更新本地的内存数据,然后将数据变更事件包装成LocalDataChangeEvent事件通过向外广播本地数据变更事件,所有订阅了LocalDataChangeEvent事件的消费方会收到数据变更事件。

LocalDataChangeEvent事件订阅者在收到事件消息后,会创建一个线程来遍历所有的客户端长轮询连接监听的数据是否包含此次事件中的变更数据,如果变更的数据有客户端正在监听,则直接通过长连接把数据返回给客户端。

发布配置的REST接口为ConfigController中的publishConfig方法

@PostMapping@Secured(action=,parser=)publicBooleanpublishConfig(HttpServletRequestrequest,HttpServletResponseresponse,@RequestParam(value="dataId")StringdataId,@RequestParam(value="group")Stringgroup,@RequestParam(value="tenant",required=false,defaultValue=)Stringtenant,@RequestParam(value="content")Stringcontent,@RequestParam(value="tag",required=false)Stringtag,@RequestParam(value="appName",required=false)StringappName,@RequestParam(value="src_user",required=false)StringsrcUser,@RequestParam(value="config_tags",required=false)StringconfigTags,@RequestParam(value="desc",required=false)Stringdesc,@RequestParam(value="use",required=false)Stringuse,@RequestParam(value="effect",required=false)Stringeffect,@RequestParam(value="type",required=false)Stringtype,@RequestParam(value="schema",required=false)Stringschema)throwsNacosException{/**省略*/finalStringsrcIp=(request);finalStringrequestIpApp=(request);/**省略*/finalTimestamptime=();StringbetaIps=("betaIps");ConfigInfoconfigInfo=newConfigInfo(dataId,group,tenant,appName,content);(type);//更新持久化数据(configInfo,betaIps,srcIp,srcUser,time,true);//广播数据变更事件(newConfigDataChangeEvent(true,dataId,group,tenant,()));returntrue;}

publishConfig方法先将数据进行持久化,然后将数据变更包装成ConfigDataChangeEvent事件通过向外广播,ConfigDataChangeEvent事件的消息订阅类是AsyncNotifyService,它在构造方法中调用注册了Subscriber事件处理类,因此onEvent调用的是方法

publicAsyncNotifyService(ServerMemberManagermemberManager){=memberManager;//订阅事件类型(,);//订阅事件处理类(newSubscriber(){@OverridepublicvoidonEvent(Eventevent){//只处理事件为ConfigDataChangeEvent的类if(eventinstanceofConfigDataChangeEvent){ConfigDataChangeEventevt=(ConfigDataChangeEvent)event;longdumpTs=;StringdataId=;Stringgroup=;Stringtenant=;Stringtag=;//Nacos集群列表,因为我是单机运行模式,所以ipList是本机节点CollectionMemberipList=();//将ConfigDataChangeEvent事件转换为NotifySingleTask任务并将任务放入到队列中QueueNotifySingleTaskqueue=newLinkedListNotifySingleTask();//每个集群都创建一个NotifySingleTask任务for(Membermember:ipList){(newNotifySingleTask(dataId,group,tenant,tag,dumpTs,(),));}//将队列数据保证成AsyncTask对象并使用线程池执行AsyncTask的run方法(newAsyncTask(nacosAsyncRestTemplate,queue));}}});}

AsyncNotifyService的onEvent方法在接收到事件数据后将数据包装成AsyncTask任务,并使用线程池处理AsyncTask,如果Nacos集群存在多个N个节点,则相应创建N个AsyncTask任务

classAsyncTaskimplementsRunnable{@Overridepublicvoidrun(){executeAsyncInvoke();}privatevoidexecuteAsyncInvoke(){while(!()){NotifySingleTasktask=();StringtargetIp=();if((targetIp)){//检查集群中当前服务是否健康,如果服务是下线状态则延时执行任务booleanunHealthNeedDelay=(targetIp);if(unHealthNeedDelay){((),(),(),null,(),(),_EVENT_UNHEALTH,0,);//后台延时执行任务asyncTaskExecute(task);}else{Headerheader=();(_HEADER_LAST_MODIFIED,(()));(_HEADER_OP_HANDLE_IP,());if(){("isBeta","true");}//调用集群的dataChangeREST接口//=;group=(,header,,,newAsyncNotifyCallBack(task));}}}}}

AsyncTask会根据当前集群节点的健康状态来延时或者直接调用集群节点的/nacos/v1/cs/communication/dataChangeREST接口来更新每个集群中的内存数据

@GetMapping("/dataChange")publicBooleannotifyConfigInfo(HttpServletRequestrequest,@RequestParam("dataId")StringdataId,@RequestParam("group")Stringgroup,@RequestParam(value="tenant",required=false,defaultValue=)Stringtenant,@RequestParam(value="tag",required=false)Stringtag){dataId=();group=();StringlastModified=(_HEADER_LAST_MODIFIED);longlastModifiedTs=(lastModified)?-1:(lastModified);StringhandleIp=(_HEADER_OP_HANDLE_IP);StringisBetaStr=("isBeta");//调用dump方法更新集群节点的if((isBetaStr)(isBetaStr)){(dataId,group,tenant,lastModifiedTs,handleIp,true);}else{(dataId,group,tenant,tag,lastModifiedTs,handleIp);}returntrue;}

dataChangeREST接口中调用了方法来更新节点的内存数据

publicvoiddump(StringdataId,Stringgroup,Stringtenant,longlastModified,StringhandleIp,booleanisBeta){StringgroupKey=(dataId,group,tenant);//将数据包装成DumpTask任务并加入到dumpTaskMgr任务列表中(groupKey,newDumpTask(groupKey,lastModified,handleIp,isBeta));}

dump方法将数据包装成DumpTask任务并加入到dumpTaskMgr任务列表中,由父类NacosDelayTaskExecuteEngine的processingExecutor线程池按照100毫秒的间隔执行processTasks方法

protectedvoidprocessTasks(){CollectionObjectkeys=getAllTaskKeys();for(ObjecttaskKey:keys){//删除taskAbstractDelayTasktask=removeTask(taskKey);if(null==task){continue;}//获取task对象应的ProcessorNacosTaskProcessorprocessor=getProcessor(taskKey);if(null==processor){getEngineLog().error("processornotfoundfortask,sodiscarded."+task);continue;}try{//如果执行失败加入重试任务if(!(task)){retryFailedTask(taskKey,task);}}catch(Throwablee){getEngineLog().error("Nacostaskexecuteerror:"+(),e);retryFailedTask(taskKey,task);}}}

processTasks每100毫秒会执行一次,获取Task对应的Processor处理类并执行其process方法,如果执行失败则重新加入到任务队列中,其中默认的Processor为DumpProcessor,因此调用的是方法

publicclassDumpProcessorimplementsNacosTaskProcessor{publicDumpProcessor(DumpServicedumpService){=dumpService;}@Overridepublicbooleanprocess(NacosTasktask){finalPersistServicepersistService=();DumpTaskdumpTask=(DumpTask)task;String[]pair=(());StringdataId=pair[0];Stringgroup=pair[1];Stringtenant=pair[2];longlastModified=();StringhandleIp=();booleanisBeta=();Stringtag=();=().namespaceId(tenant).dataId(dataId).group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);/**省略*///根据dataId、group、tenant从存储上查询最新数据ConfigInfocf=(dataId,group,tenant);((cf));//最新的Content数据((cf)?null:());((cf)?null:());(());}}finalDumpServicedumpService;}

在DumpProcessor的process方法会从存储上(mysql、derby)中查询最新的数据,然后调用dump方法更新内存中的md5值

publicstaticbooleandump(StringdataId,Stringgroup,Stringtenant,Stringcontent,longlastModifiedTs,Stringtype){StringgroupKey=(dataId,group,tenant);//获取key对应的缓存对象CacheItemci=makeSure(groupKey);(type);//写锁finalintlockResult=tryWriteLock(groupKey);assert(lockResult!=0);//加锁失败直接返回if(lockResult0){DUMP_("[dump-error]writelockfailed.{}",groupKey);returnfalse;}try{//更新后配置内容的md5值finalStringmd5=(content,);//新md5和内存中的旧md5值如果一致则不执行saveToDisk的判断if(((groupKey))){DUMP_("[dump-ignore]={},md5={},lastModifiedOld={},"+"lastModifiedNew={}",groupKey,md5,(groupKey),lastModifiedTs);}//如果存储在本地文件,则保存到磁盘上elseif(!()){(dataId,group,tenant,content);}//更新md5值updateMd5(groupKey,md5,lastModifiedTs);returntrue;}catch(IOExceptionioe){DUMP_("[dump-exception]savediskerror."+groupKey+","+(),ioe);if(()!=null){StringerrMsg=();if(NO_SPACE_(errMsg)||NO_SPACE_(errMsg)||(DISK_QUATA_CN)||(DISK_QUATA_EN)){//_("磁盘满自杀退出",ioe);(0);}}returnfalse;}finally{releaseWriteLock(groupKey);}}

dump方法将更新配置的md5值

publicstaticvoidupdateMd5(StringgroupKey,Stringmd5,longlastModifiedTs){CacheItemcache=makeSure(groupKey);if(==null||!(md5)){=md5;=lastModifiedTs;(newLocalDataChangeEvent(groupKey));}}

updateMd5方法将更新的数据包装成LocalDataChangeEvent事件并向事件订阅者广播,LocalDataChangeEvent事件的订阅者是LongPollingService,因此会调用方法

publicLongPollingService(){allSubs=newConcurrentLinkedQueueClientLongPolling();(newStatTask(),0L,10L,);//注册订阅事件类型(,);//注册订阅事件处理类(newSubscriber(){@OverridepublicvoidonEvent(Eventevent){if(isFixedPolling()){//Ignore.}else{if(eventinstanceofLocalDataChangeEvent){LocalDataChangeEventevt=(LocalDataChangeEvent)event;//将事件数据包装成DataChangeTask任务并使用线程池执行(newDataChangeTask(,,));}}}});}

LongPollingService的构造方法中注册了LocalDataChangeEvent事件,并将事件包装成DataChangeTask交给LongPolling线程池处理

/***allSubs存储了与client端的所有长链接列表*/finalQueueClientLongPollingallSubs;classDataChangeTaskimplementsRunnable{@Overridepublicvoidrun(){try{(groupKey);//循环所有client长连接for(IteratorClientLongPollingiter=();();){ClientLongPollingclientSub=();//判断是监听列表中是否存在当前groupKeyif((groupKey)){if(isBeta!(betaIps,)){continue;}if((tag)!()){continue;}getRetainIps().put(,());//删除监听关系();_("{}|{}|{}|{}|{}|{}|{}",(()-changeTime),"in-advance",((HttpServletRequest)()),"polling",(),,groupKey);//向被监听的client长连接发送结果((groupKey));}}}catch(Throwablet){_("datachangeerror:{}",(t));}}}voidsResponse(ListStringchangedGroups){//(null!=asyncTimeoutFuture){(false);}//生成响应内容generateResponse(changedGroups);}voidgenerateResponse(ListStringchangedGroups){//如果没有变更数据结束Tomcat的异步请求if(null==changedGroups){//();return;}HttpServletResponseresponse=(HttpServletResponse)();try{finalStringrespString=(changedGroups);//禁用缓存("Pragma","no-cache");("Expires",0);("Cache-Control","no-cache,no-store");(_OK);//返回变更的().println(respString);//结束Tomcat的异步请求();}catch(Exceptionex){PULL_((),ex);();}}

DataChangeTask每次执行时会在所有的QueueClientLongPollingallSubs长连接列表中查找有监听当前数据变更的Client,并将变更数据推送给Client,同时结束Client长连接轮询连接。

以上是整个发布配置的流程,代码比较长,需要仔细阅读,Long-Polling长轮询也是Nacos比较核心的特性。

获取配置原理解析

Nacos获取配置有2种方法,客户端可以通过HTTPGET获取一次配置内容,另外一种是客户端通过HTTPGET/listener长轮询的方式监听某个配置,当服务端配置发生变化时会将最新的配置推送给客户端。

1短连接获取一次数据

根据dataId、group等参数获取Nacos中的内容,在Nacos中每个配置项都是一个CacheItem对象,每个CacheItem对象中都包含一把读写锁,当客户端来读取数据时,先根据dataId等参数获取CacheItem,如果CacheItem不存在,则返回404,如果CacheItem存在会对其加read锁,如果加锁失败,则会重试,超过最大重试次数10次后仍然失败的,则返回409,如果加锁成功,则从数据库中读取数据返回给客户端,最后释放锁。

HTTPGET获取一次配置的REST接口为getConfig方法

@GetMapping@Secured(action=,parser=)publicvoidgetConfig(HttpServletRequestrequest,HttpServletResponseresponse,@RequestParam("dataId")StringdataId,@RequestParam("group")Stringgroup,@RequestParam(value="tenant",required=false,defaultValue=)Stringtenant,@RequestParam(value="tag",required=false)Stringtag)throwsIOException,ServletException,NacosException{//检查多租户参数(tenant);tenant=(tenant);//验证参数(dataId,group,"datumId","content");(tag);finalStringclientIp=(request);//调用doGetConfig获取配置(request,response,dataId,group,tenant,tag,clientIp);}

getConfig方法是获取配置的REST接口,调用了doGetConfig方法

publicStringdoGetConfig(HttpServletRequestrequest,HttpServletResponseresponse,StringdataId,Stringgroup,Stringtenant,Stringtag,StringclientIp)throwsIOException,ServletException{//将参数拼接成组合成groupKey字符串finalStringgroupKey=(dataId,group,tenant);StringautoTag=("Vipserver-Tag");StringrequestIpApp=(request);//对groupKey加读锁,Nacos会把所有的配置数据dump到内存中做缓存,每个缓存数据对象中都会包含一把读写锁//lockResult=0内存中不存在groupKey对应的数据//lockResult=1加锁成功//lockResult=-1加锁失败intlockResult=tryConfigReadLock(groupKey);finalStringrequestIp=(request);booleanisBeta=false;//加锁成功if(lockResult0){/***代码太长,省略非核心逻辑*/Stringmd5=;longlastModified=0L;//从缓存中获取CacheItem对象CacheItemcacheItem=(groupKey);//配置内容的md5md5=();//配置内容最后修改时间lastModified=();//如果是单机模式,直接从持久化的数据源读取数据,如mysql、derby,否则从文件系统读取数据if(()){configInfoBase=(dataId,group,tenant);}else{file=(dataId,group,tenant);}//容错处理//如果持久化数据源和文件都不存在数据返回数据不存在if(configInfoBase==nullfileNotExist(file)){//FIXMECacheItem//,(dataId,group,tenant,requestIpApp,-1,_EVENT_NOTFOUND,-1,requestIp);(_NOT_FOUND);().println("configdatanotexist");_NOT_FOUND+"";}/***代码太长,省略非核心逻辑*///禁用缓存.("Pragma","no-cache");("Expires",0);("Cache-Control","no-cache,no-store");if(()){("Last-Modified",lastModified);}else{fis=newFileInputStream(file);("Last-Modified",());}//返回数据if(()){out=();(());();();}else{().transferTo(0L,().size(),(()));}}//加锁数据返回数据不存在elseif(lockResult==0){//,(dataId,group,tenant,requestIpApp,-1,_EVENT_NOTFOUND,-1,requestIp);(_NOT_FOUND);().println("configdatanotexist");_NOT_FOUND+"";}//在尝试了10次加锁后失败,返回资源冲突else{PULL_("[client-get]clientIp={},{},getdataduringdump",clientIp,groupKey);(_CONFLICT);().println("requestedfileisbeingmodified,pleasetrylater.");_CONFLICT+"";}_OK+"";}

doGetConfig方法对CacheItem加读锁成功后从持久化数据层或者文件中读取配置内容,doGetConfig获取配置内容的逻辑比较简单

2长连接监听数据变更

根据客户端传入的probeModify监听的数据列表,先判断客户端是否支持长轮询,如果客户端支持长轮询,则开启长轮询连接,如果客户端不支持,则检测被监听数据probeModify中的数据在服务端是否存在变更,如果有直接返回最新的数据。

服务端长轮询是使用的一个延时线程ClientLongPolling实现的,用以阻塞客户端的连接,并且将线程ClientLongPolling加入到QueueClientLongPollingallSubs保存了起来,线程默认延时时间为客户端传入的Long-Pulling-Timeout减去0.5秒,因此延时时间一般是29.5秒,在29.5秒后延时线程会直接返回NULL,由客户端发起下一次长轮询请求,直接返回NULL的原因是因为如果在这29.5秒中如果被监听的probeModify数据发生了变化,会在发布配置时创建的DataChangeTask线程中会从QueueClientLongPollingallSubs延时线程列表中找到响应的ClientLongPolling线程,将线程取消,同时将最新的数据通过ClientLongPolling中保存的AsyncContext对象将数据推送给客户端,因为如果ClientLongPolling线程在29.5秒后执行了,说明在这期间没有数据变更,因此直接返回NULL。

HTTPLong-Polling长轮询监听接口为listener方法

@PostMapping("/listener")@Secured(action=,parser=)publicvoidlistener(HttpServletRequestrequest,HttpServletResponseresponse)throwsServletException,IOException{//设置异步参数("_SUPPORTED",true);//获取监听的dataIdStringprobeModify=("Listening-Configs");if((probeModify)){thrownewIllegalArgumentException("invalidprobeModify");}probeModify=(probeModify,);MapString,StringclientMd5Map;try{clientMd5Map=(probeModify);}catch(Throwablee){thrownewIllegalArgumentException("invalidprobeModify");}//开始长轮询监听(request,response,clientMd5Map,());}

_SUPPORTED是的新特性,支持异步处理请求

publicStringdoPollingConfig(HttpServletRequestrequest,HttpServletResponseresponse,MapString,StringclientMd5Map,intprobeRequestSize)throwsIOException{//如果客户端支持长轮询,将开启长轮询监听数据变更if((request)){(request,response,clientMd5Map,probeRequestSize);_OK+"";}//如果客户端不支持长轮询,则直接查找probeModify中修改的数据并返回结果ListStringchangedGroups=(request,response,clientMd5Map);StringoldResult=(changedGroups);StringnewResult=(changedGroups);Stringversion=(_VERSION_HEADER);if(version==null){version="2.0.0";}intversionNum=(version);//2.0.4之前的版本将新老MD5值放入header中if(versionNumSTART_LONG_POLLING_VERSION_NUM){(_MODIFY_RESPONSE,oldResult);(_MODIFY_RESPONSE_NEW,newResult);}else{("content",newResult);}("newcontent:"+newResult);//禁用缓存.("Pragma","no-cache");("Expires",0);("Cache-Control","no-cache,no-store");(_OK);_OK+"";}

doPollingConfig先判断客户端是否支持长轮询如果支持则使用长轮询监听,否则直接返回有变更数据的md5值

publicvoidaddLongPollingClient(HttpServletRequestreq,HttpServletResponsersp,MapString,StringclientMd5Map,intprobeRequestSize){Stringstr=(_POLLING_HEADER);StringnoHangUpFlag=(_POLLING_NO_HANG_UP_HEADER);StringappName=(_APPNAME_HEADER);Stringtag=("Vipserver-Tag");intdelayTime=(_DELAY_TIME,500);//获取长轮询超时时间,为避免客户端超时这里的超时时间减去了500毫秒//timeout一般为30-0.5=29.5秒longtimeout=(10000,(str)-delayTime);if(isFixedPolling()){timeout=(10000,getFixedPollingInterval());}else{longstart=();//查找有变化的数据ListStringchangedGroups=(req,rsp,clientMd5Map);//数据发生了变化if(()0){//直接返回变化的数据generateResponse(req,rsp,changedGroups);_("{}|{}|{}|{}|{}|{}|{}",()-start,"instant",(req),"polling",(),probeRequestSize,());return;}//如果监听的数据没有变化并且header中有Long-Pulling-Timeout-No-Hangup标示则直接结束elseif(noHangUpFlag!=(TRUE_STR)){_("{}|{}|{}|{}|{}|{}|{}",()-start,"nohangup",(req),"polling",(),probeRequestSize,());return;}}/***注意!!!*当数据没有变化时,代码会执行到这里,开始使用长轮询阻塞请求*/Stringip=(req);//开启异步支持finalAsyncContextasyncContext=();//Servlet的异步超时时间不正确,超时时间由自己来控制(0L);//创建ClientLongPolling线程并交给ConfigExecutor执行(newClientLongPolling(asyncContext,clientMd5Map,ip,probeRequestSize,timeout,appName,tag));}//通过Response返回变化的数据voidgenerateResponse(HttpServletRequestrequest,HttpServletResponseresponse,ListStringchangedGroups){if(null==changedGroups){return;}try{finalStringrespString=(changedGroups);//禁用缓存.("Pragma","no-cache");("Expires",0);("Cache-Control","no-cache,no-store");(_OK);().println(respString);}catch(Exceptionex){PULL_((),ex);}}

addLongPollingClient首先会检测一次数据是否有变化,如果有则通过generateResponse方法直接返回响应结果,否则创建ClientLongPolling线程开启长轮询连接,长轮询连接使用一个延时线程实现,延时时间从客户端的header中获取,默认为30s,实际上是29.5秒防止客户端超时。

/***allSubs存储了与client端的所有长链接列表*/finalQueueClientLongPollingallSubs;classClientLongPollingimplementsRunnable{@Overridepublicvoidrun(){//run方法创建了一个延时线程,延时时间为长轮询的超时时间30-0.5=29.5秒//也就是说在29.5秒之后会执行下面的run方法asyncTimeoutFuture=(newRunnable(){@Overridepublicvoidrun(){try{/***29.5秒之后开始执行*/getRetainIps().put(,());//删除订阅关系();//不会走这个分支if(isFixedPolling()){_("{}|{}|{}|{}|{}|{}",(()-createTime),"fix",((HttpServletRequest)()),"polling",(),probeRequestSize);ListStringchangedGroups=((HttpServletRequest)(),(HttpServletResponse)(),clientMd5Map);if(()0){sResponse(changedGroups);}else{sResponse(null);}}//因为过了29.5秒到了长轮询的超时时间,说明在29.5秒内没有数据发生过变化//因此发送空数据给client,由client开启下一次长轮询else{_("{}|{}|{}|{}|{}|{}",(()-createTime),"timeout",((HttpServletRequest)()),"polling",(),probeRequestSize);//发送空的数据给clientsResponse(null);}}catch(Throwablet){_("longpollingerror:"+(),());}}},timeoutTime,);//注意!!!//在创建完延时线程后,就将当前对象加入allSubs队列中了,allSubs存储了与client端的所有长链接列表(this);}}

ClientLongPolling会创建一个29.5秒的延时线程,并将当前长轮询对象加入到allSubs队列中,在29.5之内如果监听的数据发生了变化会由发布配置的DataChangeTask线程将变更数据发送给Client同时取消asyncTimeoutFuture这个延时线程,如果在29.5内监听的数据没有发送变化则发送空数据给Client,由Client开启下一次长轮询。

总结

Nacos客户端SDK在获取配置时会优先从本地文件中读取配置,也就是说如果不想从Nacos服务端中获取数据,可以在本地新建文件,这样就可以单独更改集群中某个机器的配置

Nacos客户端SDK在从Nacos服务器获取配置失败时,会从快照数据中读取配置,而快照数据是存储的上一次从服务器拉取的数据,当Nacos服务器挂掉后,从本地快照依然可以获取数据

Nacos服务端支持短连接获取配置和长轮询监听配置方式,长轮询监听是基于的异步特性实现的,由客户端发起长轮询请求,服务端使用延时线程阻塞请求,阻塞超时时间为30秒,在30秒内监听数据发生变化服务端将最新数据推送给客户端,在30秒外服务端返回NULL给客户端,由客户端发起下一次长轮询请求,继续监听。