美诚资源网

服务注册发现之服务注册中心设计原理与Golang实现

admin 124
内容提要

通过本文您将get如下知识:

微服务为什么引入服务注册发现

服务注册中心设计原理

Golang代码实现服务注册中心

为什么引入服务注册发现

从单体架构转向微服务架构过程中,当服务调用其他服务时,如何找到正确的服务地址是最基础问题。服务拆分的早期,将服务调用域名写死到代码或配置文件中,然后通过Host配置或DNS域名解析进行路由寻址,服务有多个实例,还会加入负载均衡(Nginx、F5)。

(服务域名配置模式)

但人工维护慢慢会出现瓶颈和问题:新增服务或服务扩容,所有依赖需要新增修改配置;某台服务器挂了还要手动摘流量;服务上下线变更时效慢;人工配置错误或漏配;RPC类型服务不能满足这时你会想如果能让服务自动化完成配置(注册)和查找(发现)就好了,于是乎服务注册发现就应运而生。

(服务注册发现模式)

可以看出,所有服务提供者在上下线时都会告知服务注册中心,服务消费者要查找服务直接从注册中心拉取。一切都变得更加美好,那么服务注册中心该如何实现呢?简单!优秀的开源项目已有一大把,大名鼎鼎的Zookeeper、Eureka,还有后期之秀Consul、Nacos、Etcd,当然有些算是分布式KV存储,要实现服务注册发现仍需些额外工作。如何技术选型,是AP模式更好还是CP模式更好?今天先抛开这些开源项目,我们亲自动手来实现一个服务注册中心,深入理解其设计原理,逐行代码分析与实践。PS:本文项目代码参考bilibilidiscover开源项目进行改造。

注册中心实现原理

首先进行功能需求分析,作为服务注册中心,要实现如下基本功能:

服务注册:接受来自服务提交的注册信息,并保存起来

服务下线:接受服务的主动下线请求,并将服务从注册信息表中删除

服务获取:调用方从注册中心拉取服务信息

服务续约:服务健康检查,服务通过心跳保持(主动续约)告知注册中心服务可用

服务剔除:注册中心将长时间不续约的服务实例从注册信息表中删除

服务中心首先要维护一个服务地址注册信息列表(简称注册表)。通俗理解注册表就像手机通讯录,记录了所有联系人(服务)的电话(服务地址),通过联系人姓名(服务名称)即可找到。

那么如何存储注册表呢?最普遍认知想到存数据库(Redis这种内存数据库),Zookeeper、Etcd本身作为分布式KV存储天然具有成为注册中心的优势,但这些都会引入新组件,要考虑其稳定性及性能。那么我们可以直接将注册信息存到内存中,这时候你会想如果服务挂了内存数据丢了怎么办?这个问题后面我们会想办法解决。

首先构建一个注册表Registry数据结构,定义如下:

typeRegistrystruct{
appsmap[string]*Application

}

apps记录应用服务Application的信息,使用map结构,key为应用服务的唯一标识,值为应用服务结构类型

lock读写锁,保障并发读写安全

应用服务Application结构如下:

typeApplicationstruct{
appidstring
instancesmap[string]*Instance
latestTimestampint64

}

appid记录应用服务唯一标识

lock读写锁,保障并发读写安全

latestTimestamp记录更新时间

instances记录服务实例Instance的信息,使用map结构,key为实例的hostname(唯一标识),值为实例结构类型

服务实例Instance的结构如下:

typeInstancestruct{
Envstring`json:"env"`
AppIdstring`json:"appid"`
Hostnamestring`json:"hostname"`
Addrsstring`json:"addrs"`
Versionstring`json:"version"`
Statusuint32`json:"status"`

RegTimestampint64`json:"reg_timestamp"`
UpTimestampint64`json:"up_timestamp"`
RenewTimestampint64`json:"renew_timestamp"`
DirtyTimestampint64`json:"dirty_timestamp"`
LatestTimestampint64`json:"latest_timestamp"`
}

Env服务环境标识,如online、dev、test

AppId应用服务的唯一标识

Hostname服务实例的唯一标识

Addrs服务实例的地址,可以是http或rpc地址,多个地址可以维护数组

Version服务实例版本

Status服务实例状态,用于控制上下线

xxTimestamp依次记录服务实例注册时间戳,上线时间戳,最近续约时间戳,脏时间戳(后面解释),最后更新时间戳

注册表及相关依赖的结构体构建完成了,梳理一下所有概念和关系。注册表Registry中存放多个应用服务Application,每个应用服务又会有多个服务实例Instance,服务实例中存储服务的具体地址和其他信息。

功能目标:接受来自服务提交的注册信息,并保存到注册表中。先初始化注册表NewRegistry,根据提交信息构建实例NewInstance,然后进行注册写入。

funcNewRegistry*Registry{
registry:=Registry{
apps:make(map[string]*Application),
}
returnregistry
}
funcNewInstance(req*RequestRegister)*Instance{
now:=
instance:=Instance{
Env:,
AppId:,
Hostname:,
Addrs:,
Version:,
Status:,
RegTimestamp:now,
UpTimestamp:now,
RenewTimestamp:now,
DirtyTimestamp:now,
LatestTimestamp:now,
}
returninstance
}
r:=NewRegistry
instance:=NewInstance(req)
(instance,)

注册时,先从apps中查找是否已注册过,根据唯一标识key=appid+env确定。如果没有注册过,先新建应用app,然后将instance加入到app中,最后app放入注册表中。这里分别使用了读锁和写锁,保障数据安全同时,尽量减少锁时间和锁抢占影响。

func(r*Registry)Register(instance*Instance,latestTimestampint64)(*Application,*){
key:=getKey(,)

app,ok:=[key]

if!ok{//newapp
app=NewApplication()
}
//addinstance
_,isNew:=(instance,latestTimestamp)
ifisNew{//todo}
//addintoregistryapps

[key]=app

returnapp,nil
}

新建应用服务app,初始化instances

funcNewApplication(appidstring)*Application{
returnApplication{
appid:appid,
instances:make(map[string]*Instance),
}
}

将服务主机实例instance加入应用app中,注意判断是否已存在,存在根据脏时间戳DirtyTimestamp比对,是否进行替换,添加实例信息,更新最新时间latestTimestamp,并返回实例。

func(app*Application)AddInstance(in*Instance,latestTimestampint64)(*Instance,bool){


appIns,ok:=[]
ifok{//exist
=
//dirtytimestamp
{
("registerexistdirtytimestamp")
in=appIns
}
}
//addorupdateinstances
[]=in
(latestTimestamp)
returnIns:=new(Instance)
*returnIns=*in
returnreturnIns,!ok
}

返回!ok(isNew)表明,本次服务注册时,实例为新增还是替换,用来维护服务健康信息(后面会再次提到)。

服务注册完成了,编写测试用例看下效果。

varreq={AppId:"",Hostname:"myhost",Addrs:string{""},Status:1}
funcTestRegister(t*){
r:=
instance:=(req)
app,_:=(instance,)

}

功能目标:查找已注册的服务获取信息,可以指定条件查找,也可以全量查找。这里以指定过滤条件appid、env和status为例。

r:=
fetchData,err:=(,,,0)

根据appid和env组合成key,然后从注册表的apps中获取应用app,然后通过app获取服务实例GetInstance

func(r*Registry)Fetch(env,appidstring,statusuint32,latestTimeint64)(*FetchData,*){
app,ok:=(appid,env)
if!ok{
returnnil,
}
(status,latestTime)
}
func(r*Registry)getApplication(appid,envstring)(*Application,bool){
key:=getKey(appid,env)

app,ok:=[key]

returnapp,ok
}

根据app获取所有应用实例,并用status过滤,这里对返回结果instances中的Addr进行了拷贝返回一个新的切片。

func(app*Application)GetInstance(statusuint32,latestTimeint64)(*FetchData,*){


iflatestTime={
returnnil,
}
fetchData:=FetchData{
Instances:make([]*Instance,0),
LatestTimestamp:,
}
varexistsbool
for_,instance:={
{
exists=true
newInstance:=copyInstance(instance)

}
}
if!exists{
returnnil,
}
returnfetchData,nil
}
//deepcopy
funccopyInstance(src*Instance)*Instance{
dst:=new(Instance)
*dst=*src
//copyaddrs
=make(string,len())
fori,addr:={
[i]=addr
}
returndst
}

编写测试用例,先注册再获取,看到可以正常获取到信息。

功能目标:接受服务的下线请求,并将服务从注册信息列表中删除。通过传入env,appid,hostname三要素信息进行对应服务实例的取消。

r:=
(,,,0)

根据appid和env找到对象的app,然后删除app中对应的hostname。如果hostname后为空,那么将app从注册表中清除。

func(r*Registry)Cancel(env,appid,hostnamestring,latestTimestampint64)(*Instance,*){
("actioncancel")
//findapp
app,ok:=(appid,env)
if!ok{
returnnil,
}
instance,ok,insLen:=(hostname,latestTimestamp)
if!ok{
returnnil,
}
//ifinstancesisempty,deleteappfromapps
ifinsLen==0{

delete(,getKey(appid,env))

}
returninstance,nil
}
func(app*Application)Cancel(hostnamestring,latestTimestampint64)(*Instance,bool,int){
newInstance:=new(Instance)


appIn,ok:=[hostname]
if!ok{
returnnil,ok,0
}
//deletehostname
delete(,hostname)
=latestTimestamp
(latestTimestamp)
*newInstance=*appIn
returnnewInstance,true,len()
}

编写测试用例先注册,再取消,然后获取信息,发现404notfound。

功能目标:实现服务的健康检查机制,服务注册后,如果没有取消,那么就应该在注册表中,可以随时查到,如果某个服务实例挂了,能否自动的从注册表中删除,保障注册表中的服务实例都是正常的。

通常有两种方式做法:注册中心(服务端)主动探活,通过请求指定接口得到正常响应来确认;服务实例(客户端)主动上报,调用续约接口进行续约,续约设有时效TTL(timetolive)。两种方式各有优缺点,大家可以思考一下,不同的注册中心也采用了不同的方式,这里选型第二种方案。

r:=
(,,)

根据appid和env找到对象的app,再根据hostname找到对应主机实例,更新其RenewTimestamp为当前时间。

func(r*Registry)Renew(env,appid,hostnamestring)(*Instance,*){
app,ok:=(appid,env)
if!ok{
returnnil,
}
in,ok:=(hostname)
if!ok{
returnnil,
}
returnin,nil
}
func(app*Application)Renew(hostnamestring)(*Instance,bool){


appIn,ok:=[hostname]
if!ok{
returnnil,ok
}
=
returncopyInstance(appIn),true
}

功能目标:既然有服务定期续约,那么对应的如果服务没有续约呢?服务如果下线可以使用Cancel进行取消,但如果服务因为网络故障或挂了导致不能提供服务,那么可以通过检查它是否按时续约来判断,把TTL达到阈值的服务实例剔除(Cancel),实现服务的被动下线。

首先在新建注册表时开启一个定时任务,新启一个goroutine来实现。

funcNewRegistry*Registry{
++
}

配置定时检查的时间间隔,默认60秒,通过Tick定时器开启evict。

func(r*Registry)evictTask{
ticker:=()
for{
select{
case-ticker:

}
}
}

遍历注册表的所有apps,然后再遍历其中的instances,如果当前时间减去实例上一次续约时间达到阈值(默认90秒),那么将其加入过期队列中。这里并没有直接将过期队列所有实例都取消,考虑GC以及本地时间漂移的因素,设定了一个剔除的上限evictionLimit,随机剔除一些过期实例。

func(r*Registry)evict{
now:=
varexpiredInstances*Instance
apps:=
varregistryLenint
for_,app:=rangeapps{
registryLen+=
allInstances:=
for_,instance:=rangeallInstances{
(){
expiredInstances=app(expiredInstances,instance)
}
}
}
evictionLimit:=registryLen-int(float64(registryLen)*)
expiredLen:=len(expiredInstances)
ifexpiredLenevictionLimit{
expiredLen=evictionLimit
}

ifexpiredLen==0{
return
}
fori:=0;iexpiredLen;i++{
j:=i+(len(expiredInstances)-i)
expiredInstances[i],expiredInstances[j]=expiredInstances[j],expiredInstances[i]
expiredInstance:=expiredInstances[i]
(,,,now)
}
}

剔除上限数量,是通过当前注册表大小(注册表所有instances实例数)减去触发自我保护机制的阈值(当前注册表大小*保护自我机制比例值),保护机制稍后会具体解释。

剔除过期时,采用了Knuth-Shuffle算法,也叫公平洗牌算法来实现随机剔除。当然如果expiredLen=evictionLimit,随机剔除的意义不大,如果前者大于后者,随机剔除能最大程度保障,剔除的实例均匀分散到所有应用实例中,降低某服务被全部清空的风险。公平洗牌算法实现也比较简单,循环遍历过期列表,将当前数与特定随机数交换,和我们打牌时两两交换洗牌过程类似,它实现了O(n)的时间复杂度,由Knuth发明。

功能目标:既然服务会定期剔除超时未续约的服务,那么假设一种情况,网络一段时间发生了异常,所有服务都没成功续约,这时注册中心是否将所有服务全部剔除?当然不行!所以,我们需要一个自我保护的机制防止此类事情的发生。

怎么设计自我保护机制呢?按短时间内失败的比例达到某特定阈值就开启保护,保护模式下不进行服务剔除。所以我们需要一个统计模块,续约成功+1。默认情况下,服务剔除每60秒执行一次,服务续约每30秒执行一次,那么一个服务实例在检查时应该有2次续约。

typeGuardstruct{
renewCountint64
lastRenewCountint64
needRenewCountint64
thresholdint64

}

renewCount记录所有服务续约次数,每执行一次renew加1

lastRenewCount记录上一次检查周期(默认60秒)服务续约统计次数

needRenewCount记录一个周期总计需要的续约数,按一次续约30秒,一周期60秒,一个实例就需要2次,所以服务注册时+2,服务取消时-2

threshold通过needRenewCount和阈值比例(0.85)确定触发自我保护的值

func(gd*Guard)incrNeed{


+=int64(/)
=int64(float64()*)
}
func(gd*Guard)decrNeed{


=int64(/)
=int64(float64()*)
}
func(gd*Guard)setNeed(countint64){


=count*int64(/)
=int64(float64()*)
}
func(gd*Guard)incrCount{
(,1)
}

在注册表中增加Guard模块并初始化,在服务注册成功,服务取消,服务续约时操作统计。

typeRegistrystruct{
++gd*Guard
}
funcNewRegistry*Registry{
r:=Registry{
++gd:new(Guard),
}
}
func(r*Registry)Register(){
ifisNew{
++
}
}
func(r*Registry)Cancel(){
++
}
func(r*Registry)Renew(){
++
}

在服务剔除前进行上一周期计数统计,并判断是否达到自我保护开启状态。

func(gd*Guard)storeLastCount{
(,(,0))
}
func(gd*Guard)selfProtectStatusbool{
()()
}

如果开启自我保护,那么续约时间超过阈值(默认90秒)忽略不会剔除。但如果续约时间超过最大阈值(默认3600秒),那么不管是否开启保护都要剔除。因为自我保护只是保护短时间由于网络原因未续约的服务,长时间未续约大概率已经有问题了。

func(r*Registry)evictTask{
case-ticker:
++

}
}
func(r*Registry)evict{
delta:=
++if!protectStatusdeltaint64()||
deltaint64(){
expiredInstances=app(expiredInstances,instance)
}
}

思考下,服务续约比例未达到85%就会触发自我保护,还记不记得在服务剔除那块有一个剔除数量上限不能超过15%,这里就match了,否则还没来得及进入自我保护程序就把服务都剔除了。

最后增加一个定时器,如果超过一定时间(15分钟),重新计算下当前实例数,重置保护阈值,降低脏数据风险。

func(r*Registry)evictTask{
resetTicker:=()
for{
select{
case-resetTicker:
varcountint64
for_,app:={
count+=int64()
}
(count)
}
}
}

目前注册中心基本功能已实现,需要对外提供服务了,我们采用gin来实现一个web服务,接受http请求进行服务的注册、查找、续约、下线操作,这样保障注册中心可以方便的接受来自任何语言客户端请求。

funcmain{
//initconfig
c:=("c","","configfilepath")

config,err:=(*c)
iferr!=nil{
("loadconfigerror:",err)
return
}
//globaldiscovery
=(config)
//initrouterandstartserver
router:=
srv:={
Addr:,
Handler:router,
}
gofunc{
iferr:=;err!=nilerr!={
("listen:%s\n",err)
}
}
}

增加一个discovery结构,并开启一个全局变量,该结构中维护注册表Registry,然后就可以根据注册表实现各种操作了。

typeDiscoverystruct{
config*
protectedbool
Registry*Registry
}
funcNewDiscovery(config*)*Discovery{
dis:=Discovery{
protected:false,
config:config,
Registry:NewRegistry,//initregistry
}
returndis
}
//initdiscovery
varDiscovery*

绑定url路由和Handler,以注册为例,接受请求入参,调用进行注册,成功返回。

("api/register",)
funcRegisterHandler(c*){

ife:=(req);e!=nil{
err:=
(,{
"code":,
"message":,
})
return
}
//bindinstance
instance:=(req)
==0||{
err:=
(,{
"code":,
"message":,
})
return
}
//dirtytime
{
=
}
(instance,)
(,{
"code":200,
"message":"",
"data":"",
})
}

接着要实现平滑重启,在main启动时增加接收信号后关闭服务。

funcmain{
//
//gracefulrestart
quit:=make()
(quit,,,,)
-quit
("shutdowndiscoveryserver")
ctx,cancel:=(,5*)
defercancel
iferr:=(ctx);err!=nil{
("servershutdownerror:",err)
}
select{
:
("timeoutof5seconds")
}
("serverexiting")
}

实现效果如图所示:

使用gomodule管理依赖的三方包(gin和yaml)

api存放http服务路由以及对应处理函数

cmd存放编译入口main文件

configs存放全局配置和全局常量

global存放全局结构变量

model存放注册表结构模型及主要逻辑

总结与问题

注册中心功能实现

至此,一个单机版的注册中心就可以工作了,但生产环境单点肯定是不能容忍的,因此有必要实现一个注册中心集群。那么是否部署多个注册中心实例就可以了,当然.不行!这只能保障有多个注册中心节点,而每个节点中维护自己的注册表,那么就需要进行注册表数据同步。多节点数据同步又会涉及著名的一致性问题,这时Paxos、Raft、ZAB、Gossip等算法名词涌现,而我们将使用P2P(PeertoPeer)对等网络协议来实现。关于集群设计与实现我们将在后续文章中展开。

相关代码:

参考阅读

如何通过事务消息保障抢购业务的分布式一致性

下一代云原生消息流平台ApachePulsar消息保留和过期策略设计

构建通用WebSocket推送网关的设计与实践

JDK16对ZGC的增强

不要以DRY之名,发明低代码DSL去残害你的同事

技术原创及架构实践文章,欢迎通过公众号菜单「联系我们」进行投稿。

高可用架构

改变互联网的构建方式