版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
如何设计一个高性能网关一、背景最近在github上看了soul网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关。经过两周时间的开发,我的网关ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差没有管理后台😤。二、设计2.1技术选型网关是所有请求的入口,所以要求有很高的吞吐量,为了实现这点可以使用请求异步化来解决。目前一般有以下两种方案:Tomcat/Jetty+NIO+Servlet3Servlet3已经支持异步,这种方案使用比较多,京东,有赞和Zuul,都用的是这种方案。Netty+NIONetty为高并发而生,目前唯品会的网关使用这个策略,在唯品会的技术文章中在相同的情况下Netty是每秒30w+的吞吐量,Tomcat是13w+,可以看出是有一定的差距的,但是Netty需要自己处理HTTP协议,这一块比较麻烦。后面发现Soul网关是基于SpringWebFlux(底层Netty)的,不用太关心HTTP协议的处理,于是决定也用SpringWebFlux。网关的第二个特点是具备可扩展性,比如NetflixZuul有preFilters,postFilters等在不同的阶段方便处理不同的业务,基于责任链模式将请求进行链式处理即可实现。在微服务架构下,服务都会进行多实例部署来保证高可用,请求到达网关时,网关需要根据URL找到所有可用的实例,这时就需要服务注册和发现功能,即注册中心。现在流行的注册中心有Apache的Zookeeper和阿里的Nacos两种(consul有点小众),因为之前写RPC框架时已经用过了Zookeeper,所以这次就选择了Nacos。2.2需求清单首先要明确目标,即开发一个具备哪些特性的网关,总结下后如下:自定义路由规则可基于version的路由规则设置,路由对象包括DEFAUL,HEADER和QUERY三种,匹配方式包括=、regex、like三种。跨语言HTTP协议天生跨语言高性能Netty本身就是一款高性能的通信框架,同时server将一些路由规则等数据缓存到JVM内存避免请求admin服务。高可用支持集群模式防止单节点故障,无状态。灰度发布灰度发布(又名金丝雀发布)是指在黑与白之间,能够平滑过渡的一种发布方式。在其上可以进行A/Btesting,即让一部分用户继续用产品特性A,一部分用户开始用产品特性B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面来。通过特性一可以实现。接口鉴权基于责任链模式,用户开发自己的鉴权插件即可。负载均衡支持多种负载均衡算法,如随机,轮询,加权轮询等。利用SPI机制可以根据配置进行动态加载。2.3架构设计在参考了一些优秀的网关Zuul,SpringCloudGateway,Soul后,将项目划分为以下几个模块。
它们之间的关系如图:网关设计注意:
这张图与实际实现有点出入,Nacospush到本地缓存的那个环节没有实现,目前只有ship-sever定时轮询pull的过程。ship-admin从Nacos获取注册服务信息的过程,也改成了ServiceA启动时主动发生HTTP请求通知ship-admin。2.4表结构设计三、编码3.1ship-client-spring-boot-starter首先创建一个spring-boot-starter命名为ship-client-spring-boot-starter,不知道如何自定义starter的可以看我以前写的《开发自己的starter》。搜索后端架构师公众号回复“架构整洁”,送你一份惊喜礼包。其核心类
AutoRegisterListener
就是在项目启动时做了两件事:1.将服务信息注册到Nacos注册中心2.通知ship-admin服务上线了并注册下线hook。在公众号后端顶级架构师后台回复“架构整洁”,获取一份惊喜礼包。代码如下:*
Created
by
2YSP
on
2020/12/21
*/
public
class
AutoRegisterListener
implements
ApplicationListener<ContextRefreshedEvent>
{
private
final
static
Logger
LOGGER
=
LoggerFactory.getLogger(AutoRegisterListener.class);
private
volatile
AtomicBoolean
registered
=
new
AtomicBoolean(false);
private
final
ClientConfigProperties
properties;
@NacosInjected
private
NamingService
namingService;
@Autowired
private
RequestMappingHandlerMapping
handlerMapping;
private
final
ExecutorService
pool;
/**
*
url
list
to
ignore
*/
private
static
List
ignoreUrlList
=
new
LinkedList<>();
static
{
ignoreUrlList.add("/error");
}
public
AutoRegisterListener(ClientConfigProperties
properties)
{
if
(!check(properties))
{
LOGGER.error("client
config
port,contextPath,appName
adminUrl
and
version
can't
be
empty!");
throw
new
ShipException("client
config
port,contextPath,appName
adminUrl
and
version
can't
be
empty!");
}
perties
=
properties;
pool
=
new
ThreadPoolExecutor(1,
4,
0,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<>());
}
/**
*
check
the
ClientConfigProperties
*
*
@param
properties
*
@return
*/
private
boolean
check(ClientConfigProperties
properties)
{
if
(properties.getPort()
==
null|
properties.getContextPath()
==
null
|
properties.getVersion()
==
null|
properties.getAppName()
==
null
|
properties.getAdminUrl()
==
null)
{
return
false;
}
return
true;
}
@Override
public
void
onApplicationEvent(ContextRefreshedEvent
event)
{
if
(!pareAndSet(false,
true))
{
return;
}
doRegister();
registerShutDownHook();
}
/**
*
send
unregister
request
to
admin
when
jvm
shutdown
*/
private
void
registerShutDownHook()
{
final
String
url
=
"http://"
+
properties.getAdminUrl()
+
AdminConstants.UNREGISTER_PATH;
final
UnregisterAppDTO
unregisterAppDTO
=
new
UnregisterAppDTO();
unregisterAppDTO.setAppName(properties.getAppName());
unregisterAppDTO.setVersion(properties.getVersion());
unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());
unregisterAppDTO.setPort(properties.getPort());
Runtime.getRuntime().addShutdownHook(new
Thread(()
->
{
OkhttpTool.doPost(url,
unregisterAppDTO);
LOGGER.info("[{}:{}]
unregister
from
ship-admin
success!",
unregisterAppDTO.getAppName(),
unregisterAppDTO.getVersion());
}));
}
/**
*
register
all
interface
info
to
register
center
*/
private
void
doRegister()
{
Instance
instance
=
new
Instance();
instance.setIp(IpUtil.getLocalIpAddress());
instance.setPort(properties.getPort());
instance.setEphemeral(true);
Map
metadataMap
=
new
HashMap<>();
metadataMap.put("version",
properties.getVersion());
metadataMap.put("appName",
properties.getAppName());
instance.setMetadata(metadataMap);
try
{
namingService.registerInstance(properties.getAppName(),
NacosConstants.APP_GROUP_NAME,
instance);
}
catch
(NacosException
e)
{
LOGGER.error("register
to
nacos
fail",
e);
throw
new
ShipException(e.getErrCode(),
e.getErrMsg());
}
LOGGER.info("register
interface
info
to
nacos
success!");
//
send
register
request
to
ship-admin
String
url
=
"http://"
+
properties.getAdminUrl()
+
AdminConstants.REGISTER_PATH;
RegisterAppDTO
registerAppDTO
=
buildRegisterAppDTO(instance);
OkhttpTool.doPost(url,
registerAppDTO);
LOGGER.info("register
to
ship-admin
success!");
}
private
RegisterAppDTO
buildRegisterAppDTO(Instance
instance)
{
RegisterAppDTO
registerAppDTO
=
new
RegisterAppDTO();
registerAppDTO.setAppName(properties.getAppName());
registerAppDTO.setContextPath(properties.getContextPath());
registerAppDTO.setIp(instance.getIp());
registerAppDTO.setPort(instance.getPort());
registerAppDTO.setVersion(properties.getVersion());
return
registerAppDTO;
}
}3.2ship-servership-sever项目主要包括了两个部分内容,1.请求动态路由的主流程2.本地缓存数据和ship-admin及nacos同步,这部分在后面3.3再讲。ship-server实现动态路由的原理是利用WebFilter拦截请求,然后将请求教给pluginchain去链式处理。PluginFilter根据URL解析出appName,然后将启用的plugin组装成pluginchain。public
class
PluginFilter
implements
WebFilter
{
private
ServerConfigProperties
properties;
public
PluginFilter(ServerConfigProperties
properties)
{
perties
=
properties;
}
@Override
public
Mono
filter(ServerWebExchange
exchange,
WebFilterChain
chain)
{
String
appName
=
parseAppName(exchange);
if
(CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName)))
{
throw
new
ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
}
PluginChain
pluginChain
=
new
PluginChain(properties,
appName);
pluginChain.addPlugin(new
DynamicRoutePlugin(properties));
pluginChain.addPlugin(new
AuthPlugin(properties));
return
pluginChain.execute(exchange,
pluginChain);
}
private
String
parseAppName(ServerWebExchange
exchange)
{
RequestPath
path
=
exchange.getRequest().getPath();
String
appName
=
path.value().split("/")[1];
return
appName;
}
}```
PluginChain继承了AbstractShipPlugin并持有所有要执行的插件。
```java
*
@Author:
Ship
*
@Description:
*
@Date:
Created
in
2020/12/25
*/
public
class
PluginChain
extends
AbstractShipPlugin
{
/**
*
the
pos
point
to
current
plugin
*/
private
int
pos;
/**
*
the
plugins
of
chain
*/
private
List
plugins;
private
final
String
appName;
public
PluginChain(ServerConfigProperties
properties,
String
appName)
{
super(properties);
this.appName
=
appName;
}
/**
*
add
enabled
plugin
to
chain
*
*
@param
shipPlugin
*/
public
void
addPlugin(ShipPlugin
shipPlugin)
{
if
(plugins
==
null)
{
plugins
=
new
ArrayList<>();
}
if
(!PluginCache.isEnabled(appName,
shipP()))
{
return;
}
plugins.add(shipPlugin);
//
order
by
the
plugin's
order
plugins.sort(Cparing(ShipPlugin::order));
}
@Override
public
Integer
order()
{
return
null;
}
@Override
public
String
name()
{
return
null;
}
@Override
public
Mono
execute(ServerWebExchange
exchange,
PluginChain
pluginChain)
{
if
(pos
==
plugins.size())
{
return
exchange.getResponse().setComplete();
}
return
pluginChain.plugins.get(pos++).execute(exchange,
pluginChain);
}
public
String
getAppName()
{
return
appName;
}
}AbstractShipPlugin实现了ShipPlugin接口,并持有ServerConfigProperties配置对象。public
abstract
class
AbstractShipPlugin
implements
ShipPlugin
{
protected
ServerConfigProperties
properties;
public
AbstractShipPlugin(ServerConfigProperties
properties)
{
perties
=
properties;
}
}```
ShipPlugin接口定义了所有插件必须实现的三个方法order(),name()和execute()。
```java
public
interface
ShipPlugin
{
/**
*
lower
values
have
higher
priority
*
*
@return
*/
Integer
order();
/**
*
return
current
plugin
name
*
*
@return
*/
String
name();
Mono
execute(ServerWebExchange
exchange,PluginChain
pluginChain);
}```
DynamicRoutePlugin继承了抽象类AbstractShipPlugin,包含了动态路由的主要业务逻辑。
```java
*
@Author:
Ship
*
@Description:
*
@Date:
Created
in
2020/12/25
*/
public
class
DynamicRoutePlugin
extends
AbstractShipPlugin
{
private
final
static
Logger
LOGGER
=
LoggerFactory.getLogger(DynamicRoutePlugin.class);
private
static
WebClient
webClient;
private
static
final
Gson
gson
=
new
GsonBuilder().create();
static
{
HttpClient
httpClient
=
HttpClient.create()
.tcpConfiguration(client
->
client.doOnConnected(conn
->
conn.addHandlerLast(new
ReadTimeoutHandler(3))
.addHandlerLast(new
WriteTimeoutHandler(3)))
.option(ChannelOption.TCP_NODELAY,
true)
);
webClient
=
WebClient.builder().clientConnector(new
ReactorClientHttpConnector(httpClient))
.build();
}
public
DynamicRoutePlugin(ServerConfigProperties
properties)
{
super(properties);
}
@Override
public
Integer
order()
{
return
ShipPluginEnum.DYNAMIC_ROUTE.getOrder();
}
@Override
public
String
name()
{
return
ShipPluginEnum.DYNAMIC_ROUTE.getName();
}
@Override
public
Mono
execute(ServerWebExchange
exchange,
PluginChain
pluginChain)
{
String
appName
=
pluginChain.getAppName();
ServiceInstance
serviceInstance
=
chooseInstance(appName,
exchange.getRequest());
//
LOGGER.info("selected
instance
is
[{}]",
gson.toJson(serviceInstance));
//
request
service
String
url
=
buildUrl(exchange,
serviceInstance);
return
forward(exchange,
url);
}
/**
*
forward
request
to
backend
service
*
*
@param
exchange
*
@param
url
*
@return
*/
private
Mono
forward(ServerWebExchange
exchange,
String
url)
{
ServerHttpRequest
request
=
exchange.getRequest();
ServerHttpResponse
response
=
exchange.getResponse();
HttpMethod
method
=
request.getMethod();
WebClient.RequestBodySpec
requestBodySpec
=
webClient.method(method).uri(url).headers((headers)
->
{
headers.addAll(request.getHeaders());
});
WebClient.RequestHeadersSpec
reqHeadersSpec;
if
(requireHttpBody(method))
{
reqHeadersSpec
=
requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
}
else
{
reqHeadersSpec
=
requestBodySpec;
}
//
nio->callback->nio
return
reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))
.onErrorResume(ex
->
{
return
Mono.defer(()
->
{
String
errorResultJson
=
"";
if
(ex
instanceof
TimeoutException)
{
errorResultJson
=
"{\"code\":5001,\"message\":\"network
timeout\"}";
}
else
{
errorResultJson
=
"{\"code\":5000,\"message\":\"system
error\"}";
}
return
ShipResponseUtil.doResponse(exchange,
errorResultJson);
}).then(Mono.empty());
}).flatMap(backendResponse
->
{
response.setStatusCode(backendResponse.statusCode());
response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());
return
response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));
});
}
/**
*
weather
the
http
method
need
http
body
*
*
@param
method
*
@return
*/
private
boolean
requireHttpBody(HttpMethod
method)
{
if
(method.equals(HttpMethod.POST)|
method.equals(HttpMethod.PUT)|
method.equals(HttpMethod.PATCH))
{
return
true;
}
return
false;
}
private
String
buildUrl(ServerWebExchange
exchange,
ServiceInstance
serviceInstance)
{
ServerHttpRequest
request
=
exchange.getRequest();
String
query
=
request.getURI().getQuery();
String
path
=
request.getPath().value().replaceFirst("/"
+
serviceInstance.getAppName(),
"");
String
url
=
"http://"
+
serviceInstance.getIp()
+
":"
+
serviceInstance.getPort()
+
path;
if
(!StringUtils.isEmpty(query))
{
url
=
url
+
"?"
+
query;
}
return
url;
}
/**
*
choose
an
ServiceInstance
according
to
route
rule
config
and
load
balancing
algorithm
*
*
@param
appName
*
@param
request
*
@return
*/
private
ServiceInstance
chooseInstance(String
appName,
ServerHttpRequest
request)
{
List
serviceInstances
=
ServiceCache.getAllInstances(appName);
if
(CollectionUtils.isEmpty(serviceInstances))
{
LOGGER.error("service
instance
of
{}
not
find",
appName);
throw
new
ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
}
String
version
=
matchAppVersion(appName,
request);
if
(StringUtils.isEmpty(version))
{
throw
new
ShipException("match
app
version
error");
}
//
filter
serviceInstances
by
version
List
instances
=
serviceInstances.stream().filter(i
->
i.getVersion().equals(version)).collect(Collectors.toList());
//Select
an
instance
based
on
the
load
balancing
algorithm
LoadBalance
loadBalance
=
LoadBalanceFactory.getInstance(properties.getLoadBalance(),
appName,
version);
ServiceInstance
serviceInstance
=
loadBalance.chooseOne(instances);
return
serviceInstance;
}
private
String
matchAppVersion(String
appName,
ServerHttpRequest
request)
{
List
rules
=
RouteRuleCache.getRules(appName);
rules.sort(Cparing(AppRuleDTO::getPriority).reversed());
for
(AppRuleDTO
rule
:
rules)
{
if
(match(rule,
request))
{
return
rule.getVersion();
}
}
return
null;
}
private
boolean
match(AppRuleDTO
rule,
ServerHttpRequest
request)
{
String
matchObject
=
rule.getMatchObject();
String
matchKey
=
rule.getMatchKey();
String
matchRule
=
rule.getMatchRule();
Byte
matchMethod
=
rule.getMatchMethod();
if
(MatchObjectEnum.DEFAULT.getCode().equals(matchObject))
{
return
true;
}
else
if
(MatchObjectEnum.QUERY.getCode().equals(matchObject))
{
String
param
=
request.getQueryParams().getFirst(matchKey);
if
(!StringUtils.isEmpty(param))
{
return
StringTools.match(param,
matchMethod,
matchRule);
}
}
else
if
(MatchObjectEnum.HEADER.getCode().equals(matchObject))
{
HttpHeaders
headers
=
request.getHeaders();
String
headerValue
=
headers.getFirst(matchKey);
if
(!StringUtils.isEmpty(headerValue))
{
return
StringTools.match(headerValue,
matchMethod,
matchRule);
}
}
return
false;
}
}3.3数据同步app数据同步后台服务(如订单服务)启动时,只将服务名,版本,ip地址和端口号注册到了Nacos,并没有实例的权重和启用的插件信息怎么办?一般在线的实例权重和插件列表都是在管理界面配置,然后动态生效的,所以需要ship-admin定时更新实例的权重和插件信息到注册中心。对应代码ship-admin的NacosSyncListener*
@Author:
Ship
*
@Description:
*
@Date:
Created
in
2020/12/30
*/
@Configuration
public
class
NacosSyncListener
implements
ApplicationListener<ContextRefreshedEvent>
{
private
static
final
Logger
LOGGER
=
LoggerFactory.getLogger(NacosSyncListener.class);
private
static
ScheduledThreadPoolExecutor
scheduledPool
=
new
ScheduledThreadPoolExecutor(1,
new
ShipThreadFactory("nacos-sync",
true).create());
@NacosInjected
private
NamingService
namingService;
@Value("${nacos.discovery.server-addr}")
private
String
baseUrl;
@Resource
private
AppService
appService;
@Override
public
void
onApplicationEvent(ContextRefreshedEvent
event)
{
if
(event.getApplicationContext().getParent()
!=
null)
{
return;
}
String
url
=
"http://"
+
baseUrl
+
NacosConstants.INSTANCE_UPDATE_PATH;
scheduledPool.scheduleWithFixedDelay(new
NacosSyncTask(namingService,
url,
appService),
0,
30L,
TimeUnit.SECONDS);
}
class
NacosSyncTask
implements
Runnable
{
private
NamingService
namingService;
private
String
url;
private
AppService
appService;
private
Gson
gson
=
new
GsonBuilder().create();
public
NacosSyncTask(NamingService
namingService,
String
url,
AppService
appService)
{
this.namingService
=
namingService;
this.url
=
url;
this.appService
=
appService;
}
/**
*
Regular
update
weight,enabled
plugins
to
nacos
instance
*/
@Override
public
void
run()
{
try
{
//
get
all
app
names
ListView
services
=
namingService.getServicesOfServer(1,
Integer.MAX_VALUE,
NacosConstants.APP_GROUP_NAME);
if
(CollectionUtils.isEmpty(services.getData()))
{
return;
}
List
appNames
=
services.getData();
List
appInfos
=
appService.getAppInfos(appNames);
for
(AppInfoDTO
appInfo
:
appInfos)
{
if
(CollectionUtils.isEmpty(appInfo.getInstances()))
{
continue;
}
for
(ServiceInstance
instance
:
appInfo.getInstances())
{
Map
queryMap
=
buildQueryMap(appInfo,
instance);
String
resp
=
OkhttpTool.doPut(url,
queryMap,
"");
LOGGER.debug("response
:{}",
resp);
}
}
}
catch
(Exception
e)
{
LOGGER.error("nacos
sync
task
error",
e);
}
}
private
Map
buildQueryMap(AppInfoDTO
appInfo,
ServiceInstance
instance)
{
Map
map
=
new
HashMap<>();
map.put("serviceName",
appInfo.getAppName());
map.put("groupName",
NacosConstants.APP_GROUP_NAME);
map.put("ip",
instance.getIp());
map.put("port",
instance.getPort());
map.put("weight",
instance.getWeight().doubleValue());
NacosMetadata
metadata
=
new
NacosMetadata();
metadata.setAppName(appInfo.getAppName());
metadata.setVersion(instance.getVersion());
metadata.setPlugins(String.join(",",
appInfo.getEnabledPlugins()));
map.put("metadata",
StringTools.urlEncode(gson.toJson(metadata)));
map.put("ephemeral",
true);
return
map;
}
}
}ship-server再定时从Nacos拉取app数据更新到本地Map缓存。*
@Author:
Ship
*
@Description:
sync
data
to
local
cache
*
@Date:
Created
in
2020/12/25
*/
@Configuration
public
class
DataSyncTaskListener
implements
ApplicationListener<ContextRefreshedEvent>
{
private
static
ScheduledThreadPoolExecutor
scheduledPool
=
new
ScheduledThreadPoolExecutor(1,
new
ShipThreadFactory("service-sync",
true).create());
@NacosInjected
private
NamingService
namingService;
@Autowired
private
ServerConfigProperties
properties;
@Override
public
void
onApplicationEvent(ContextRefreshedEvent
event)
{
if
(event.getApplicationContext().getParent()
!=
null)
{
return;
}
scheduledPool.scheduleWithFixedDelay(new
DataSyncTask(namingService)
,
0L,
properties.getCacheRefreshInterval(),
TimeUnit.SECONDS);
WebsocketSyncCacheServer
websocketSyncCacheServer
=
new
WebsocketSyncCacheServer(properties.getWebSocketPort());
websocketSyncCacheServer.start();
}
class
DataSyncTask
implements
Runnable
{
private
NamingService
namingService;
public
DataSyncTask(NamingService
namingService)
{
this.namingService
=
namingService;
}
@Override
public
void
run()
{
try
{
//
get
all
app
names
ListView
services
=
namingService.getServicesOfServer(1,
Integer.MAX_VALUE,
NacosConstants.APP_GROUP_NAME);
if
(CollectionUtils.isEmpty(services.getData()))
{
return;
}
List
appNames
=
services.getData();
//
get
all
instances
for
(String
appName
:
appNames)
{
List
instanceList
=
namingService.getAllInstances(appName,
NacosConstants.APP_GROUP_NAME);
if
(CollectionUtils.isEmpty(instanceList))
{
continue;
}
ServiceCache.add(appName,
buildServiceInstances(instanceList));
List
pluginNames
=
getEnabledPlugins(instanceList);
PluginCache.add(appName,
pluginNames);
}
ServiceCache.removeExpired(appNames);
PluginCache.removeExpired(appNames);
}
catch
(NacosException
e)
{
e.printStackTrace();
}
}
private
List
getEnabledPlugins(List
instanceList)
{
Instance
instance
=
instanceList.get(0);
Map
metadata
=
instance.getMetadata();
//
plugins:
DynamicRoute,Auth
String
plugins
=
metadata.getOrDefault("plugins",
ShipPluginEnum.DYNAMIC_ROUTE.getName());
return
Arrays.stream(plugins.split(",")).collect(Collectors.toList());
}
private
List
buildServiceInstances(List
instanceList)
{
List
list
=
new
LinkedList<>();
instanceList.forEach(instance
->
{
Map
metadata
=
instance.getMetadata();
ServiceInstance
serviceInstance
=
new
ServiceInstance();
serviceInstance.setAppName(metadata.get("appName"));
serviceInstance.setIp(instance.getIp());
serviceInstance.setPort(instance.getPort());
serviceInstance.setVersion(metadata.get("version"));
serviceInstance.setWeight((int)
instance.getWeight());
list.add(serviceInstance);
});
return
list;
}
}
}路由规则数据同步同时,如果用户在管理后台更新了路由规则,ship-admin需要推送规则数据到ship-server,这里参考了soul网关的做法利用websocket在第一次建立连接后进行全量同步,此后路由规则发生变更就只作增量同步。在公众号顶级顶级架构师后台回复“架构”,获取一份惊喜礼包。服务端WebsocketSyncCacheServer:*
@Author:
Ship
*
@Description:
*
@Date:
Created
in
2020/12/28
*/
public
class
WebsocketSyncCacheServer
extends
WebSocketServer
{
private
final
static
Logger
LOGGER
=
LoggerFactory.getLogger(WebsocketSyncCacheServer.class);
private
Gson
gson
=
new
GsonBuilder().create();
private
MessageHandler
messageHandler;
public
WebsocketSyncCacheServer(Integer
port)
{
super(new
InetSocketAddress(port));
this.messageHandler
=
new
MessageHandler();
}
@Override
public
void
onOpen(WebSocket
webSocket,
ClientHandshake
clientHandshake)
{
LOGGER.info("server
is
open");
}
@Override
public
void
onClose(WebSocket
webSocket,
int
i,
String
s,
boolean
b)
{
LOGGER.info("websocket
server
close...");
}
@Override
public
void
onMessage(WebSocket
webSocket,
String
message)
{
LOGGER.info("websocket
server
receive
message:\n[{}]",
message);
this.messageHandler.handler(message);
}
@Override
public
void
onError(WebSocket
webSocket,
Exception
e)
{
}
@Override
public
void
onStart()
{
LOGGER.info("websocket
server
start...");
}
class
MessageHandler
{
public
void
handler(String
message)
{
RouteRuleOperationDTO
operationDTO
=
gson.fromJson(message,
RouteRuleOperationDTO.class);
if
(CollectionUtils.isEmpty(operationDTO.getRuleList()))
{
return;
}
Map>
map
=
operationDTO.getRuleList()
.stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));
if
(OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())
|
OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType()))
{
RouteRuleCache.add(map);
}
else
if
(OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType()))
{
RouteRuleCache.remove(map);
}
}
}
}客户端WebsocketSyncCacheClient:*
@Author:
Ship
*
@Description:
*
@Date:
Created
in
2020/12/28
*/
@Component
public
class
WebsocketSyncCacheClient
{
private
final
static
Logger
LOGGER
=
LoggerFactory.getLogger(WebsocketSyncCacheClient.class);
private
WebSocketClient
client;
private
RuleService
ruleService;
private
Gson
gson
=
new
GsonBuilder().create();
public
WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}")
String
serverWebSocketUrl,
RuleService
ruleService)
{
if
(StringUtils.isEmpty(serverWebSocketUrl))
{
throw
new
ShipException(ShipExceptionEnum.CONFIG_ERROR);
}
this.ruleService
=
ruleService;
ScheduledThreadPoolExecutor
executor
=
new
ScheduledThreadPoolExecutor(1,
new
ShipThreadFactory("websocket-connect",
true).create());
try
{
client
=
new
WebSocketClient(new
URI(
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 河南省周口市第七中学2022-2023学年高三物理下学期期末试卷含解析
- 湖南省衡阳市祁东县育贤中学高三物理测试题含解析
- 数学集体备课的活动总结
- 中学教学设计教案:生命不可承受之重(上课教案)
- 县人民医院呼吸内科门诊病历
- 生活区空调租赁合同
- 重庆曾家镇中学2022年高三物理摸底试卷含解析
- 医院健康教育工作计划6篇
- 安徽省六安市金寨县2023-2024学年八年级下学期6月期末历史试题
- 山东省青岛市莱西朴木中学2022-2023学年高三物理知识点试题含解析
- 超声波课后习题答案
- 架桥机过孔步骤介绍
- 河南理工继电保护课程设计
- 统计学大作业一
- 室内刮大白施工方案
- 医疗器械公司员工培训方案模板
- 《羿射九日》生字课件
- 系统宕机处理流程规范及方法
- 安徽高中毕业生登记表(共7页)
- 基于8086的温度系统
- 公司研发项目立项申请表
评论
0/150
提交评论