如何设计一个高性能网关_第1页
如何设计一个高性能网关_第2页
如何设计一个高性能网关_第3页
如何设计一个高性能网关_第4页
如何设计一个高性能网关_第5页
已阅读5页,还剩31页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

如何设计一个高性能网关一、背景最近在github上看了soul网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关。经过两周时间的开发,我的网关ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差没有管理后台😤。二、设计2.1技术选型网关是所有请求的入口,所以要求有很高的吞吐量,为了实现这点可以使用请求异步化来解决。目前一般有以下两种方案:Tomcat/Jetty+NIO+Servlet3Servlet3已经支持异步,这种方案使用比较多,京东,有赞和Zuul,都用的是这种方案。Netty+NIONetty为高并发而生,目前唯品会的网关使用这个策略,在唯品会的技术文章中在相同的情况下Netty是每秒30w+的吞吐量,Tomcat是13w+,可以看出是有一定的差距的,但是Netty需要自己处理HTTP协议,这一块比较麻烦。后面发现Soul网关是基于SpringWebFlux(底层Netty)的,不用太关心HTTP协议的处理,于是决定也用SpringWebFlux。网关的第二个特点是具备可扩展性,比如NetflixZuul有preFilters,postFilters等在不同的阶段方便处理不同的业务,基于责任链模式将请求进行链式处理即可实现。在微服务架构下,服务都会进行多实例部署来保证高可用,请求到达网关时,网关需要根据URL找到所有可用的实例,这时就需要服务注册和发现功能,即注册中心。现在流行的注册中心有Apache的Zookeeper和阿里的Nacos两种(consul有点小众),因为之前写RPC框架时已经用过了Zookeeper,所以这次就选择了Nacos。2.2需求清单首先要明确目标,即开发一个具备哪些特性的网关,总结下后如下:自定义路由规则可基于version的路由规则设置,路由对象包括DEFAUL,HEADER和QUERY三种,匹配方式包括=、regex、like三种。跨语言HTTP协议天生跨语言高性能Netty本身就是一款高性能的通信框架,同时server将一些路由规则等数据缓存到JVM内存避免请求admin服务。高可用支持集群模式防止单节点故障,无状态。灰度发布灰度发布(又名金丝雀发布)是指在黑与白之间,能够平滑过渡的一种发布方式。在其上可以进行A/Btesting,即让一部分用户继续用产品特性A,一部分用户开始用产品特性B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面来。通过特性一可以实现。接口鉴权基于责任链模式,用户开发自己的鉴权插件即可。负载均衡支持多种负载均衡算法,如随机,轮询,加权轮询等。利用SPI机制可以根据配置进行动态加载。2.3架构设计在参考了一些优秀的网关Zuul,SpringCloudGateway,Soul后,将项目划分为以下几个模块。

它们之间的关系如图:网关设计注意:

这张图与实际实现有点出入,Nacospush到本地缓存的那个环节没有实现,目前只有ship-sever定时轮询pull的过程。ship-admin从Nacos获取注册服务信息的过程,也改成了ServiceA启动时主动发生HTTP请求通知ship-admin。2.4表结构设计三、编码3.1ship-client-spring-boot-starter首先创建一个spring-boot-starter命名为ship-client-spring-boot-starter,不知道如何自定义starter的可以看我以前写的《开发自己的starter》。搜索后端架构师公众号回复“架构整洁”,送你一份惊喜礼包。其核心类

AutoRegisterListener

就是在项目启动时做了两件事:1.将服务信息注册到Nacos注册中心2.通知ship-admin服务上线了并注册下线hook。在公众号后端顶级架构师后台回复“架构整洁”,获取一份惊喜礼包。代码如下:*

Created

by

2YSP

on

2020/12/21

*/

public

class

AutoRegisterListener

implements

ApplicationListener<ContextRefreshedEvent>

{

private

final

static

Logger

LOGGER

=

LoggerFactory.getLogger(AutoRegisterListener.class);

private

volatile

AtomicBoolean

registered

=

new

AtomicBoolean(false);

private

final

ClientConfigProperties

properties;

@NacosInjected

private

NamingService

namingService;

@Autowired

private

RequestMappingHandlerMapping

handlerMapping;

private

final

ExecutorService

pool;

/**

*

url

list

to

ignore

*/

private

static

List

ignoreUrlList

=

new

LinkedList<>();

static

{

ignoreUrlList.add("/error");

}

public

AutoRegisterListener(ClientConfigProperties

properties)

{

if

(!check(properties))

{

LOGGER.error("client

config

port,contextPath,appName

adminUrl

and

version

can't

be

empty!");

throw

new

ShipException("client

config

port,contextPath,appName

adminUrl

and

version

can't

be

empty!");

}

perties

=

properties;

pool

=

new

ThreadPoolExecutor(1,

4,

0,

TimeUnit.SECONDS,

new

LinkedBlockingQueue<>());

}

/**

*

check

the

ClientConfigProperties

*

*

@param

properties

*

@return

*/

private

boolean

check(ClientConfigProperties

properties)

{

if

(properties.getPort()

==

null|

properties.getContextPath()

==

null

|

properties.getVersion()

==

null|

properties.getAppName()

==

null

|

properties.getAdminUrl()

==

null)

{

return

false;

}

return

true;

}

@Override

public

void

onApplicationEvent(ContextRefreshedEvent

event)

{

if

(!pareAndSet(false,

true))

{

return;

}

doRegister();

registerShutDownHook();

}

/**

*

send

unregister

request

to

admin

when

jvm

shutdown

*/

private

void

registerShutDownHook()

{

final

String

url

=

"http://"

+

properties.getAdminUrl()

+

AdminConstants.UNREGISTER_PATH;

final

UnregisterAppDTO

unregisterAppDTO

=

new

UnregisterAppDTO();

unregisterAppDTO.setAppName(properties.getAppName());

unregisterAppDTO.setVersion(properties.getVersion());

unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());

unregisterAppDTO.setPort(properties.getPort());

Runtime.getRuntime().addShutdownHook(new

Thread(()

->

{

OkhttpTool.doPost(url,

unregisterAppDTO);

LOGGER.info("[{}:{}]

unregister

from

ship-admin

success!",

unregisterAppDTO.getAppName(),

unregisterAppDTO.getVersion());

}));

}

/**

*

register

all

interface

info

to

register

center

*/

private

void

doRegister()

{

Instance

instance

=

new

Instance();

instance.setIp(IpUtil.getLocalIpAddress());

instance.setPort(properties.getPort());

instance.setEphemeral(true);

Map

metadataMap

=

new

HashMap<>();

metadataMap.put("version",

properties.getVersion());

metadataMap.put("appName",

properties.getAppName());

instance.setMetadata(metadataMap);

try

{

namingService.registerInstance(properties.getAppName(),

NacosConstants.APP_GROUP_NAME,

instance);

}

catch

(NacosException

e)

{

LOGGER.error("register

to

nacos

fail",

e);

throw

new

ShipException(e.getErrCode(),

e.getErrMsg());

}

LOGGER.info("register

interface

info

to

nacos

success!");

//

send

register

request

to

ship-admin

String

url

=

"http://"

+

properties.getAdminUrl()

+

AdminConstants.REGISTER_PATH;

RegisterAppDTO

registerAppDTO

=

buildRegisterAppDTO(instance);

OkhttpTool.doPost(url,

registerAppDTO);

LOGGER.info("register

to

ship-admin

success!");

}

private

RegisterAppDTO

buildRegisterAppDTO(Instance

instance)

{

RegisterAppDTO

registerAppDTO

=

new

RegisterAppDTO();

registerAppDTO.setAppName(properties.getAppName());

registerAppDTO.setContextPath(properties.getContextPath());

registerAppDTO.setIp(instance.getIp());

registerAppDTO.setPort(instance.getPort());

registerAppDTO.setVersion(properties.getVersion());

return

registerAppDTO;

}

}3.2ship-servership-sever项目主要包括了两个部分内容,1.请求动态路由的主流程2.本地缓存数据和ship-admin及nacos同步,这部分在后面3.3再讲。ship-server实现动态路由的原理是利用WebFilter拦截请求,然后将请求教给pluginchain去链式处理。PluginFilter根据URL解析出appName,然后将启用的plugin组装成pluginchain。public

class

PluginFilter

implements

WebFilter

{

private

ServerConfigProperties

properties;

public

PluginFilter(ServerConfigProperties

properties)

{

perties

=

properties;

}

@Override

public

Mono

filter(ServerWebExchange

exchange,

WebFilterChain

chain)

{

String

appName

=

parseAppName(exchange);

if

(CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName)))

{

throw

new

ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);

}

PluginChain

pluginChain

=

new

PluginChain(properties,

appName);

pluginChain.addPlugin(new

DynamicRoutePlugin(properties));

pluginChain.addPlugin(new

AuthPlugin(properties));

return

pluginChain.execute(exchange,

pluginChain);

}

private

String

parseAppName(ServerWebExchange

exchange)

{

RequestPath

path

=

exchange.getRequest().getPath();

String

appName

=

path.value().split("/")[1];

return

appName;

}

}```

PluginChain继承了AbstractShipPlugin并持有所有要执行的插件。

```java

*

@Author:

Ship

*

@Description:

*

@Date:

Created

in

2020/12/25

*/

public

class

PluginChain

extends

AbstractShipPlugin

{

/**

*

the

pos

point

to

current

plugin

*/

private

int

pos;

/**

*

the

plugins

of

chain

*/

private

List

plugins;

private

final

String

appName;

public

PluginChain(ServerConfigProperties

properties,

String

appName)

{

super(properties);

this.appName

=

appName;

}

/**

*

add

enabled

plugin

to

chain

*

*

@param

shipPlugin

*/

public

void

addPlugin(ShipPlugin

shipPlugin)

{

if

(plugins

==

null)

{

plugins

=

new

ArrayList<>();

}

if

(!PluginCache.isEnabled(appName,

shipP()))

{

return;

}

plugins.add(shipPlugin);

//

order

by

the

plugin's

order

plugins.sort(Cparing(ShipPlugin::order));

}

@Override

public

Integer

order()

{

return

null;

}

@Override

public

String

name()

{

return

null;

}

@Override

public

Mono

execute(ServerWebExchange

exchange,

PluginChain

pluginChain)

{

if

(pos

==

plugins.size())

{

return

exchange.getResponse().setComplete();

}

return

pluginChain.plugins.get(pos++).execute(exchange,

pluginChain);

}

public

String

getAppName()

{

return

appName;

}

}AbstractShipPlugin实现了ShipPlugin接口,并持有ServerConfigProperties配置对象。public

abstract

class

AbstractShipPlugin

implements

ShipPlugin

{

protected

ServerConfigProperties

properties;

public

AbstractShipPlugin(ServerConfigProperties

properties)

{

perties

=

properties;

}

}```

ShipPlugin接口定义了所有插件必须实现的三个方法order(),name()和execute()。

```java

public

interface

ShipPlugin

{

/**

*

lower

values

have

higher

priority

*

*

@return

*/

Integer

order();

/**

*

return

current

plugin

name

*

*

@return

*/

String

name();

Mono

execute(ServerWebExchange

exchange,PluginChain

pluginChain);

}```

DynamicRoutePlugin继承了抽象类AbstractShipPlugin,包含了动态路由的主要业务逻辑。

```java

*

@Author:

Ship

*

@Description:

*

@Date:

Created

in

2020/12/25

*/

public

class

DynamicRoutePlugin

extends

AbstractShipPlugin

{

private

final

static

Logger

LOGGER

=

LoggerFactory.getLogger(DynamicRoutePlugin.class);

private

static

WebClient

webClient;

private

static

final

Gson

gson

=

new

GsonBuilder().create();

static

{

HttpClient

httpClient

=

HttpClient.create()

.tcpConfiguration(client

->

client.doOnConnected(conn

->

conn.addHandlerLast(new

ReadTimeoutHandler(3))

.addHandlerLast(new

WriteTimeoutHandler(3)))

.option(ChannelOption.TCP_NODELAY,

true)

);

webClient

=

WebClient.builder().clientConnector(new

ReactorClientHttpConnector(httpClient))

.build();

}

public

DynamicRoutePlugin(ServerConfigProperties

properties)

{

super(properties);

}

@Override

public

Integer

order()

{

return

ShipPluginEnum.DYNAMIC_ROUTE.getOrder();

}

@Override

public

String

name()

{

return

ShipPluginEnum.DYNAMIC_ROUTE.getName();

}

@Override

public

Mono

execute(ServerWebExchange

exchange,

PluginChain

pluginChain)

{

String

appName

=

pluginChain.getAppName();

ServiceInstance

serviceInstance

=

chooseInstance(appName,

exchange.getRequest());

//

LOGGER.info("selected

instance

is

[{}]",

gson.toJson(serviceInstance));

//

request

service

String

url

=

buildUrl(exchange,

serviceInstance);

return

forward(exchange,

url);

}

/**

*

forward

request

to

backend

service

*

*

@param

exchange

*

@param

url

*

@return

*/

private

Mono

forward(ServerWebExchange

exchange,

String

url)

{

ServerHttpRequest

request

=

exchange.getRequest();

ServerHttpResponse

response

=

exchange.getResponse();

HttpMethod

method

=

request.getMethod();

WebClient.RequestBodySpec

requestBodySpec

=

webClient.method(method).uri(url).headers((headers)

->

{

headers.addAll(request.getHeaders());

});

WebClient.RequestHeadersSpec

reqHeadersSpec;

if

(requireHttpBody(method))

{

reqHeadersSpec

=

requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));

}

else

{

reqHeadersSpec

=

requestBodySpec;

}

//

nio->callback->nio

return

reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))

.onErrorResume(ex

->

{

return

Mono.defer(()

->

{

String

errorResultJson

=

"";

if

(ex

instanceof

TimeoutException)

{

errorResultJson

=

"{\"code\":5001,\"message\":\"network

timeout\"}";

}

else

{

errorResultJson

=

"{\"code\":5000,\"message\":\"system

error\"}";

}

return

ShipResponseUtil.doResponse(exchange,

errorResultJson);

}).then(Mono.empty());

}).flatMap(backendResponse

->

{

response.setStatusCode(backendResponse.statusCode());

response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());

return

response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));

});

}

/**

*

weather

the

http

method

need

http

body

*

*

@param

method

*

@return

*/

private

boolean

requireHttpBody(HttpMethod

method)

{

if

(method.equals(HttpMethod.POST)|

method.equals(HttpMethod.PUT)|

method.equals(HttpMethod.PATCH))

{

return

true;

}

return

false;

}

private

String

buildUrl(ServerWebExchange

exchange,

ServiceInstance

serviceInstance)

{

ServerHttpRequest

request

=

exchange.getRequest();

String

query

=

request.getURI().getQuery();

String

path

=

request.getPath().value().replaceFirst("/"

+

serviceInstance.getAppName(),

"");

String

url

=

"http://"

+

serviceInstance.getIp()

+

":"

+

serviceInstance.getPort()

+

path;

if

(!StringUtils.isEmpty(query))

{

url

=

url

+

"?"

+

query;

}

return

url;

}

/**

*

choose

an

ServiceInstance

according

to

route

rule

config

and

load

balancing

algorithm

*

*

@param

appName

*

@param

request

*

@return

*/

private

ServiceInstance

chooseInstance(String

appName,

ServerHttpRequest

request)

{

List

serviceInstances

=

ServiceCache.getAllInstances(appName);

if

(CollectionUtils.isEmpty(serviceInstances))

{

LOGGER.error("service

instance

of

{}

not

find",

appName);

throw

new

ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);

}

String

version

=

matchAppVersion(appName,

request);

if

(StringUtils.isEmpty(version))

{

throw

new

ShipException("match

app

version

error");

}

//

filter

serviceInstances

by

version

List

instances

=

serviceInstances.stream().filter(i

->

i.getVersion().equals(version)).collect(Collectors.toList());

//Select

an

instance

based

on

the

load

balancing

algorithm

LoadBalance

loadBalance

=

LoadBalanceFactory.getInstance(properties.getLoadBalance(),

appName,

version);

ServiceInstance

serviceInstance

=

loadBalance.chooseOne(instances);

return

serviceInstance;

}

private

String

matchAppVersion(String

appName,

ServerHttpRequest

request)

{

List

rules

=

RouteRuleCache.getRules(appName);

rules.sort(Cparing(AppRuleDTO::getPriority).reversed());

for

(AppRuleDTO

rule

:

rules)

{

if

(match(rule,

request))

{

return

rule.getVersion();

}

}

return

null;

}

private

boolean

match(AppRuleDTO

rule,

ServerHttpRequest

request)

{

String

matchObject

=

rule.getMatchObject();

String

matchKey

=

rule.getMatchKey();

String

matchRule

=

rule.getMatchRule();

Byte

matchMethod

=

rule.getMatchMethod();

if

(MatchObjectEnum.DEFAULT.getCode().equals(matchObject))

{

return

true;

}

else

if

(MatchObjectEnum.QUERY.getCode().equals(matchObject))

{

String

param

=

request.getQueryParams().getFirst(matchKey);

if

(!StringUtils.isEmpty(param))

{

return

StringTools.match(param,

matchMethod,

matchRule);

}

}

else

if

(MatchObjectEnum.HEADER.getCode().equals(matchObject))

{

HttpHeaders

headers

=

request.getHeaders();

String

headerValue

=

headers.getFirst(matchKey);

if

(!StringUtils.isEmpty(headerValue))

{

return

StringTools.match(headerValue,

matchMethod,

matchRule);

}

}

return

false;

}

}3.3数据同步app数据同步后台服务(如订单服务)启动时,只将服务名,版本,ip地址和端口号注册到了Nacos,并没有实例的权重和启用的插件信息怎么办?一般在线的实例权重和插件列表都是在管理界面配置,然后动态生效的,所以需要ship-admin定时更新实例的权重和插件信息到注册中心。对应代码ship-admin的NacosSyncListener*

@Author:

Ship

*

@Description:

*

@Date:

Created

in

2020/12/30

*/

@Configuration

public

class

NacosSyncListener

implements

ApplicationListener<ContextRefreshedEvent>

{

private

static

final

Logger

LOGGER

=

LoggerFactory.getLogger(NacosSyncListener.class);

private

static

ScheduledThreadPoolExecutor

scheduledPool

=

new

ScheduledThreadPoolExecutor(1,

new

ShipThreadFactory("nacos-sync",

true).create());

@NacosInjected

private

NamingService

namingService;

@Value("${nacos.discovery.server-addr}")

private

String

baseUrl;

@Resource

private

AppService

appService;

@Override

public

void

onApplicationEvent(ContextRefreshedEvent

event)

{

if

(event.getApplicationContext().getParent()

!=

null)

{

return;

}

String

url

=

"http://"

+

baseUrl

+

NacosConstants.INSTANCE_UPDATE_PATH;

scheduledPool.scheduleWithFixedDelay(new

NacosSyncTask(namingService,

url,

appService),

0,

30L,

TimeUnit.SECONDS);

}

class

NacosSyncTask

implements

Runnable

{

private

NamingService

namingService;

private

String

url;

private

AppService

appService;

private

Gson

gson

=

new

GsonBuilder().create();

public

NacosSyncTask(NamingService

namingService,

String

url,

AppService

appService)

{

this.namingService

=

namingService;

this.url

=

url;

this.appService

=

appService;

}

/**

*

Regular

update

weight,enabled

plugins

to

nacos

instance

*/

@Override

public

void

run()

{

try

{

//

get

all

app

names

ListView

services

=

namingService.getServicesOfServer(1,

Integer.MAX_VALUE,

NacosConstants.APP_GROUP_NAME);

if

(CollectionUtils.isEmpty(services.getData()))

{

return;

}

List

appNames

=

services.getData();

List

appInfos

=

appService.getAppInfos(appNames);

for

(AppInfoDTO

appInfo

:

appInfos)

{

if

(CollectionUtils.isEmpty(appInfo.getInstances()))

{

continue;

}

for

(ServiceInstance

instance

:

appInfo.getInstances())

{

Map

queryMap

=

buildQueryMap(appInfo,

instance);

String

resp

=

OkhttpTool.doPut(url,

queryMap,

"");

LOGGER.debug("response

:{}",

resp);

}

}

}

catch

(Exception

e)

{

LOGGER.error("nacos

sync

task

error",

e);

}

}

private

Map

buildQueryMap(AppInfoDTO

appInfo,

ServiceInstance

instance)

{

Map

map

=

new

HashMap<>();

map.put("serviceName",

appInfo.getAppName());

map.put("groupName",

NacosConstants.APP_GROUP_NAME);

map.put("ip",

instance.getIp());

map.put("port",

instance.getPort());

map.put("weight",

instance.getWeight().doubleValue());

NacosMetadata

metadata

=

new

NacosMetadata();

metadata.setAppName(appInfo.getAppName());

metadata.setVersion(instance.getVersion());

metadata.setPlugins(String.join(",",

appInfo.getEnabledPlugins()));

map.put("metadata",

StringTools.urlEncode(gson.toJson(metadata)));

map.put("ephemeral",

true);

return

map;

}

}

}ship-server再定时从Nacos拉取app数据更新到本地Map缓存。*

@Author:

Ship

*

@Description:

sync

data

to

local

cache

*

@Date:

Created

in

2020/12/25

*/

@Configuration

public

class

DataSyncTaskListener

implements

ApplicationListener<ContextRefreshedEvent>

{

private

static

ScheduledThreadPoolExecutor

scheduledPool

=

new

ScheduledThreadPoolExecutor(1,

new

ShipThreadFactory("service-sync",

true).create());

@NacosInjected

private

NamingService

namingService;

@Autowired

private

ServerConfigProperties

properties;

@Override

public

void

onApplicationEvent(ContextRefreshedEvent

event)

{

if

(event.getApplicationContext().getParent()

!=

null)

{

return;

}

scheduledPool.scheduleWithFixedDelay(new

DataSyncTask(namingService)

,

0L,

properties.getCacheRefreshInterval(),

TimeUnit.SECONDS);

WebsocketSyncCacheServer

websocketSyncCacheServer

=

new

WebsocketSyncCacheServer(properties.getWebSocketPort());

websocketSyncCacheServer.start();

}

class

DataSyncTask

implements

Runnable

{

private

NamingService

namingService;

public

DataSyncTask(NamingService

namingService)

{

this.namingService

=

namingService;

}

@Override

public

void

run()

{

try

{

//

get

all

app

names

ListView

services

=

namingService.getServicesOfServer(1,

Integer.MAX_VALUE,

NacosConstants.APP_GROUP_NAME);

if

(CollectionUtils.isEmpty(services.getData()))

{

return;

}

List

appNames

=

services.getData();

//

get

all

instances

for

(String

appName

:

appNames)

{

List

instanceList

=

namingService.getAllInstances(appName,

NacosConstants.APP_GROUP_NAME);

if

(CollectionUtils.isEmpty(instanceList))

{

continue;

}

ServiceCache.add(appName,

buildServiceInstances(instanceList));

List

pluginNames

=

getEnabledPlugins(instanceList);

PluginCache.add(appName,

pluginNames);

}

ServiceCache.removeExpired(appNames);

PluginCache.removeExpired(appNames);

}

catch

(NacosException

e)

{

e.printStackTrace();

}

}

private

List

getEnabledPlugins(List

instanceList)

{

Instance

instance

=

instanceList.get(0);

Map

metadata

=

instance.getMetadata();

//

plugins:

DynamicRoute,Auth

String

plugins

=

metadata.getOrDefault("plugins",

ShipPluginEnum.DYNAMIC_ROUTE.getName());

return

Arrays.stream(plugins.split(",")).collect(Collectors.toList());

}

private

List

buildServiceInstances(List

instanceList)

{

List

list

=

new

LinkedList<>();

instanceList.forEach(instance

->

{

Map

metadata

=

instance.getMetadata();

ServiceInstance

serviceInstance

=

new

ServiceInstance();

serviceInstance.setAppName(metadata.get("appName"));

serviceInstance.setIp(instance.getIp());

serviceInstance.setPort(instance.getPort());

serviceInstance.setVersion(metadata.get("version"));

serviceInstance.setWeight((int)

instance.getWeight());

list.add(serviceInstance);

});

return

list;

}

}

}路由规则数据同步同时,如果用户在管理后台更新了路由规则,ship-admin需要推送规则数据到ship-server,这里参考了soul网关的做法利用websocket在第一次建立连接后进行全量同步,此后路由规则发生变更就只作增量同步。在公众号顶级顶级架构师后台回复“架构”,获取一份惊喜礼包。服务端WebsocketSyncCacheServer:*

@Author:

Ship

*

@Description:

*

@Date:

Created

in

2020/12/28

*/

public

class

WebsocketSyncCacheServer

extends

WebSocketServer

{

private

final

static

Logger

LOGGER

=

LoggerFactory.getLogger(WebsocketSyncCacheServer.class);

private

Gson

gson

=

new

GsonBuilder().create();

private

MessageHandler

messageHandler;

public

WebsocketSyncCacheServer(Integer

port)

{

super(new

InetSocketAddress(port));

this.messageHandler

=

new

MessageHandler();

}

@Override

public

void

onOpen(WebSocket

webSocket,

ClientHandshake

clientHandshake)

{

LOGGER.info("server

is

open");

}

@Override

public

void

onClose(WebSocket

webSocket,

int

i,

String

s,

boolean

b)

{

LOGGER.info("websocket

server

close...");

}

@Override

public

void

onMessage(WebSocket

webSocket,

String

message)

{

LOGGER.info("websocket

server

receive

message:\n[{}]",

message);

this.messageHandler.handler(message);

}

@Override

public

void

onError(WebSocket

webSocket,

Exception

e)

{

}

@Override

public

void

onStart()

{

LOGGER.info("websocket

server

start...");

}

class

MessageHandler

{

public

void

handler(String

message)

{

RouteRuleOperationDTO

operationDTO

=

gson.fromJson(message,

RouteRuleOperationDTO.class);

if

(CollectionUtils.isEmpty(operationDTO.getRuleList()))

{

return;

}

Map>

map

=

operationDTO.getRuleList()

.stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));

if

(OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())

|

OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType()))

{

RouteRuleCache.add(map);

}

else

if

(OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType()))

{

RouteRuleCache.remove(map);

}

}

}

}客户端WebsocketSyncCacheClient:*

@Author:

Ship

*

@Description:

*

@Date:

Created

in

2020/12/28

*/

@Component

public

class

WebsocketSyncCacheClient

{

private

final

static

Logger

LOGGER

=

LoggerFactory.getLogger(WebsocketSyncCacheClient.class);

private

WebSocketClient

client;

private

RuleService

ruleService;

private

Gson

gson

=

new

GsonBuilder().create();

public

WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}")

String

serverWebSocketUrl,

RuleService

ruleService)

{

if

(StringUtils.isEmpty(serverWebSocketUrl))

{

throw

new

ShipException(ShipExceptionEnum.CONFIG_ERROR);

}

this.ruleService

=

ruleService;

ScheduledThreadPoolExecutor

executor

=

new

ScheduledThreadPoolExecutor(1,

new

ShipThreadFactory("websocket-connect",

true).create());

try

{

client

=

new

WebSocketClient(new

URI(

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论