200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > netty实现简单的rpc 支持服务集群

netty实现简单的rpc 支持服务集群

时间:2022-04-18 02:17:20

相关推荐

netty实现简单的rpc 支持服务集群

netty实现简单的rpc,支持服务集群

前言简介环境准备Netty 处理器链设计消费者RPC代理工厂设计netty rpc消费者核心设计netty rpc生产者核心设计服务注册、发现以集群演示Demo尾言相关链接

前言

简介

最近了解了下netty相关知识,简单实现一个基于nettyrpcdemo,参考了几篇文章,其中这篇清幽之地大佬的RPC基本原理以及如何用Netty来实现RPC 非常不错 ,给我不少启迪,关于rpc的相关知识可以阅读该文,本文主要对如何阐述如何使用netty实现rpc。与清幽之地大佬的demo相比,增添

1. 按服务名加载集群,采用随机选择服务方式负载

2. 每个实例可以同时作为消费者和生产者

demo主要围绕以下两图开发:

Dubbo 架构图

rpc调用示意图

环境准备

主要环境nettySpringBootzookeeperzkclient

netty作为通信基础框架SpringBoot作为基础框架,打包、测试、运行zookeeper作为服务注册中心zkclientzookeeper客户端

Netty 处理器链设计

netty开发,处理器链pipeline设计是非常重要的一个过程,从byte数据入站,设计不同的入站处理器将其一步步转换我们可直接使用的数据类型,且因为tcp连接保证数据传输的先后顺序,但由于网络传输拥塞窗口等原因影响,可能会发生拆包、粘包情况,可以借助netty框架内置的处理器和自定义数据协议方式解决。出站处理器设计则是逆向上述过程,将我们的数据转化为byte的过程。

使用到的处理器和相关实体类

1. ByteMsgToDataMsgDecoder借助netty本身的ReplayingDecoder加自定义数据协议的方式解决拆、粘包问题,将入站数数解码为DataMsg类型的解码器

public class ByteMsgToDataMsgDecoder extends ReplayingDecoder<DataMsg> {protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {int length = in.readInt();byte[] content = new byte[length];in.readBytes(content);DataMsg msg = new DataMsg(length, content);out.add(msg);}}

数据传输实体DataMsg

@Data@AllArgsConstructorpublic class DataMsg {//数据长度private int length;//数据private byte[] data;}

2. DataMsgToByteMsgEncoder将出站DataMsg编码为Byte的解码器

public class DataMsgToByteMsgEncoder extends MessageToByteEncoder<DataMsg> {protected void encode(ChannelHandlerContext ctx, DataMsg msg, ByteBuf out) throws Exception {out.writeInt(msg.getLength());out.writeBytes(msg.getData());}}

3.DataMsgToRequestOrResponseConvert根据入参将DataMsg其中数据转换为请求、响应数据,根据服务消费者和生产者入站数据不同,进行不同的反序列化操作。

public class DataMsgToRequestOrResponseConvert extends SimpleChannelInboundHandler<DataMsg> {private Logger logger= LoggerFactory.getLogger(DataMsgToRequestOrResponseConvert.class);//需要转换目标类,即Response 或者 Requestprivate Class<?> target;public DataMsgToRequestOrResponseConvert(Class<?> target) {this.target = target;}protected void channelRead0(ChannelHandlerContext ctx, DataMsg msg) throws Exception {final byte[] data = msg.getData();Object object = JsonUtil.bytesToObject(data, target);ctx.fireChannelRead(object);logger.info("{}数据:{}",target.getSimpleName(),JsonUtil.objectToJsonString(object));}}

请求数据类Request封装消费者rpc服务时的参数

@Datapublic class Request {//请求IDprivate String id;// 类名private String className;// 函数名称private String methodName;// 参数类型private Class<?>[] parameterTypes;// 参数列表private Object[] parameters;}

响应数据类Response封装rpc生产者返回的数据

@Datapublic class Response {//请求IDprivate String requestId;//代码private int code;//错误信息private String errorMsg;//数据private Object data;}

NettyServerRequestHandler生产者核心处理器(后续切入)NettyClientResponseHandler消费者核心处理(后续切入)

服务生产者链式示意图:

服务消费者链式示意图:

消费者RPC代理工厂设计

为所有服务消费者对目标服务生产者的调用,生成代理,从代理转入NettyClientResponseHandler类处理,借助spring ApplicationContextAware、以及jdk的动态代理实现上述操作。

相关核心代码

spring bean工具类

@Componentpublic class SpringBeanUtil implements ApplicationContextAware, InitializingBean {static ApplicationContext context;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {SpringBeanUtil.context = applicationContext;}public static <T> T getBeanByClass(Class<?> clazz) {return (T) context.getBean(clazz);}@Overridepublic void afterPropertiesSet() throws Exception {//获取含有指定注解的beanMap<String, Object> beans = context.getBeansWithAnnotation(RpcClientTarget.class);Iterator<Object> iterator = beans.values().iterator();while (iterator.hasNext()) {Object target = iterator.next();Field[] fields = target.getClass().getDeclaredFields();for (Field field : fields) {//获取目标bean有需要被代理的fieldif (field.getAnnotationsByType(MyResource.class) != null) {field.setAccessible(true);try {field.set(target, ProxyBeanUtil.getProxy(field.getType()));} catch (IllegalAccessException e) {e.printStackTrace();}}}}}}

jdk代理工具类

@Componentpublic class RpcFactory implements InvocationHandler {@AutowiredNettyClient client;Logger logger = LoggerFactory.getLogger(this.getClass());public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {if (method.getDeclaringClass() == Object.class) {logger.info("method:{}代理执行,为Object声明的不进行代理执行", method.getName());return method.invoke(proxy, args);}logger.info("method:{}代理执行", method.getName());Request request = new Request();request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParameters(args);request.setParameterTypes(method.getParameterTypes());request.setId(UUID.randomUUID().toString());String serverName = method.getDeclaringClass().getAnnotation(RpcClient.class).name();Response response = client.sendMsgV2(serverName,request);Class<?> returnType = method.getReturnType();//返回错误if (response.getCode() == ErrorCodeEnums.FAIL.getCode()) {throw new Exception(response.getErrorMsg());}/**1.返回结果是基础类型或者String类型,直接返回*2.其他类型转换为该类型*/if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)) {return response.getData();} else if (Collection.class.isAssignableFrom(returnType)) {return JsonUtil.jsonStringToObject(response.getData().toString(), Object.class);} else if (Map.class.isAssignableFrom(returnType)) {return JsonUtil.jsonStringToObject(response.getData().toString(), Map.class);} else {Object data = response.getData();return JsonUtil.jsonStringToObject(data.toString(), returnType);}}}

相关注解类集合

/*** @author jxy* @className RpcClientTarget* @description 含有rpc的目标类* @date /3/12 21:52*/@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)public @interface RpcClientTarget {}/*** @author : porteryao* @description : Rpc客户端注解* @date : /3/11 11:14*/@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)public @interface RpcClient {String name();}@Target(ElementType.FIELD)@Retention(RetentionPolicy.RUNTIME)public @interface MyResource {}

demo Controller

@RestController@RpcClientTarget //方便命中public class TestController {@MyResourceprivate StuService stuService;@GetMapping("/test")public String test() {StuInfo info = new StuInfo("jxt", 18);return stuService.sayHello(info);}}

service 接口

@RpcClient(name = "stuservice")public interface StuService {String sayHello(StuInfo stuInfo);}

上述操作可将需要代理执行rpc的接口,转到我们自定义的netty相关代码执行

netty rpc消费者核心设计

根据需要代理执行service上注解RpcClient获取其服务名,并从连接管理器ConnectManager获取通道通道不为空,则发送数据,并在阻塞队列中等待返回将返回结果按照service上的method封装

相关方法代码:

public Response sendMsgV2(String appName, Request request) throws InterruptedException {Channel channel = connectManage.chooseChannelByServiceName(appName);if (channel == null || (!channel.isActive())) {logger.error(appName + "无可用服务!");Response res = new Response();res.setCode(ErrorCodeEnums.FAIL.getCode());res.setErrorMsg(appName + "无可用服务!");return res;}//发送请求return clientResponseHandler.sendMsg(channel, request).take();}/*** 等待响应队列*/private Map<String, BlockingQueue<Response>> questMap = new ConcurrentHashMap<>();/*** 将返回结果放在阻塞队列*/public BlockingQueue<Response> sendMsg(Channel channel, Request request) {return getResponses(channel, request, questMap);}static BlockingQueue<Response> getResponses(Channel channel, Request request,Map<String, BlockingQueue<Response>> questMap) {BlockingQueue<Response> blockingQueue = new LinkedBlockingQueue<>();questMap.put(request.getId(), blockingQueue);byte[] bytes = JsonUtil.objectToJsonByteArray(request);DataMsg dataMsg = new DataMsg(bytes.length, bytes);channel.writeAndFlush(dataMsg);return blockingQueue;}static BlockingQueue<Response> getResponses(Channel channel, Request request,Map<String, BlockingQueue<Response>> questMap) {BlockingQueue<Response> blockingQueue = new LinkedBlockingQueue<>();questMap.put(request.getId(), blockingQueue);byte[] bytes = JsonUtil.objectToJsonByteArray(request);DataMsg dataMsg = new DataMsg(bytes.length, bytes);channel.writeAndFlush(dataMsg);return blockingQueue;}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) {BlockingQueue<Response> responseBlockingQueue = questMap.get(response.getRequestId());responseBlockingQueue.add(response);questMap.remove(response.getRequestId());}

由于netty是异步执行,所以这里需要发送者等待,笔者这里take()是无限期等待,可以设置一定时间等待,避免服务端长时间无响应造成调用方阻塞(后续会优化)。

致此,我们完成了rpc调用示意图,关于消费者调用设计的部分。

netty rpc生产者核心设计

生产者这块相对来说比较简单,根据消费者请求数据执行方法,并返回数据。

启动时将自身netty连接信息注册到注册中心接受请求时按Request 其中class类型查询目标bean执行目标bean Request 请求的方法将结果序列化返回

核心方法

protected void channelRead0(ChannelHandlerContext ctx, Request request)throws Exception {if (serviceMap.isEmpty()) {logger.warn("没有需要rpc调用的服务");return;}final Object instance = serviceMap.get(request.getClassName());if (instance != null) {Class<?> instanceClass = instance.getClass();Method method = instanceClass.getDeclaredMethod(request.getMethodName(), request.getParameterTypes());method.setAccessible(true);//避免权限问题Object o = method.invoke(instance,parametersConvert(request.getParameterTypes(), request.getParameters()));Response response = new Response();response.setCode(ErrorCodeEnums.SUCCESS.getCode());response.setData(o);response.setErrorMsg(ErrorCodeEnums.SUCCESS.getMsg());response.setRequestId(request.getId());String jsonString = JsonUtil.objectToJsonString(response);byte[] data = jsonString.getBytes(CharsetUtil.UTF_8);DataMsg dataMsg = new DataMsg(data.length, data);ctx.writeAndFlush(dataMsg);}}

服务注册、发现以集群

ps:本文篇幅较长,且这块由于逻辑处理复杂,笔者会在闲暇之余补充在单独的文章

主要步骤:

服务提供者按照application name在zookeeper注册其节点,节点类型为临时节点,便于zookeeper客户端感知消费者根据服务调用时检测目标服务是否已存在通道,初次需从zookeeper获取,并设置监听根据zookeeper节点信息创建通道连接zookeeper客户端监听到某服务注册的节点发送变化,并更新对应服务通道

演示Demo

到这了,来看看演示吧:

zookeeper已注册节点

idea启动情况:

clientApp:9002实例调用ServerApp

由于是随机数负载,可能需要尝试几次才能看到效果

ServerApp:9007实例调用clientApp

上述demo,即一个实例即可成为消费者也可成为生产者,并在调用时负载。

尾言

笔者仅做个人学习,措辞不合理,知识不够深入,请勿怪!Thank

相关链接

参考文献

[1]: /p/8876c9f3cd7f

仓库地址

gitee /junxiaoyao/jxy_netty_rpc_study

注:master分支为不包含 zookeeper注册中心的demo,如需查看请切换至feature/auto_registry_zookeepe分支

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。