前提
前置文章:
在前置的《基于Netty和SpringBoot实现一个轻量级RPC框架-协议篇》一文中已经定义了一个相对简单的RPC
私有协议,并且实现了对应的编码和解码模块。这篇文章基于协议篇,完成Server
端代码调用的编写。考虑到目前相对主流的IOC
容器是Spring
,这里选用了spring-boot-starter
(非MVC
容器,只是单纯管理Bean
),依赖JDK1.8+
。
思路
首先RPC
私有协议定义了Client
端会传过来四个和服务调用息息相关的字段:接口全类名interfaceName
、方法名methodName
、方法参数签名字符串数组methodArgumentSignatures
(可选,这个参数不是必须传入的)以及方法参数数组methodArguments
(可选,空方法列表的时候不需要传入参数)。主要流程如下:
- 把
Server
端的所有服务端(实现)类交由IOC
容器托管。
Client
端发起RPC
请求。
- 通过前面提到的最多四个参数,从
Server
服务实例的IOC
容器中匹配出吻合度最高的一个方法java.lang.reflect.Method
实例、该方法实例的宿主类以及宿主类对应的Bean
实例,如果这一步匹配的目标方法超过1个或者为0个,可以直接返回异常信息。
- 把前一步得到的
Method
实例、宿主类Bean
实例,结合方法参数数组methodArguments
进行反射调用,得到调用结果。
Server
端把响应结果封装到payload
通过私有协议发送回Client
端。
Server端代码实现
为了暂时方便起见,部分数组入参被重新封装为ArrayList
,实际上编写RPC
框架的时候应该优先考虑性能问题,像JDK
提供的集合类库等等应该尽可能少用(以ArrayList
为例,扩容的时候存在底层Object[]
拷贝,造成性能损失和额外的内存消耗),极尽可能使用基本类型和数组。
先定义方法匹配器MethodMatcher
相关的类:
public interface MethodMatcher {
MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input); }
@EqualsAndHashCode @Data public class MethodMatchInput {
private String interfaceName;
private String methodName;
private List<String> methodArgumentSignatures;
private int methodArgumentArraySize; }
@Data public class MethodMatchOutput {
private Method targetMethod;
private Class<?> targetClass;
private Class<?> targetUserClass;
private Object target;
private List<Class<?>> parameterTypes; }
|
目标方法匹配的逻辑大致如下:
- 方法名称和方法实例的宿主类型一定作为匹配条件的一部分。
- 如果传入了参数签名列表,优先使用参数签名列表类型进行匹配。
- 如果没有传入参数签名列表,那么使用参数的数量进行匹配。
- 如果参数签名列表和参数列表都没有传入,那么只能通过方法名称和方法实例的宿主类型匹配。
- 考虑到方法匹配解析的过程相对耗时,需要把结果缓存起来。
分析至此,可以基于反射,编写一个抽象的方法匹配器BaseMethodMatcher
,然后把获取宿主类信息的功能委托到子类:
public class MethodMatchException extends RuntimeException {
public MethodMatchException(String message) { super(message); }
public MethodMatchException(String message, Throwable cause) { super(message, cause); }
public MethodMatchException(Throwable cause) { super(cause); } }
@Data public class HostClassMethodInfo {
private Class<?> hostClass; private Class<?> hostUserClass; private Object hostTarget; }
@Slf4j abstract class BaseMethodMatcher implements MethodMatcher {
private final ConcurrentMap<MethodMatchInput, MethodMatchOutput> cache = Maps.newConcurrentMap();
@Override public MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input) { return cache.computeIfAbsent(input, in -> { try { MethodMatchOutput output = new MethodMatchOutput(); Class<?> interfaceClass = Class.forName(in.getInterfaceName()); HostClassMethodInfo info = findHostClassMethodInfo(interfaceClass); List<Method> targetMethods = Lists.newArrayList(); ReflectionUtils.doWithMethods(info.getHostUserClass(), targetMethods::add, method -> { String methodName = method.getName(); Class<?> declaringClass = method.getDeclaringClass(); List<Class<?>> inputParameterTypes = Optional.ofNullable(in.getMethodArgumentSignatures()) .map(mas -> { List<Class<?>> list = Lists.newArrayList(); mas.forEach(ma -> list.add(ClassUtils.resolveClassName(ma, null))); return list; }).orElse(Lists.newArrayList()); output.setParameterTypes(inputParameterTypes); if (!inputParameterTypes.isEmpty()) { List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes()); return Objects.equals(methodName, in.getMethodName()) && Objects.equals(info.getHostUserClass(), declaringClass) && Objects.equals(parameterTypes, inputParameterTypes); } if (in.getMethodArgumentArraySize() > 0) { List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes()); return Objects.equals(methodName, in.getMethodName()) && Objects.equals(info.getHostUserClass(), declaringClass) && in.getMethodArgumentArraySize() == parameterTypes.size();
} return Objects.equals(methodName, in.getMethodName()) && Objects.equals(info.getHostUserClass(), declaringClass);
}); if (targetMethods.size() != 1) { throw new MethodMatchException(String.format("查找到目标方法数量不等于1,interface:%s,method:%s", in.getInterfaceName(), in.getMethodName())); } Method targetMethod = targetMethods.get(0); output.setTargetClass(info.getHostClass()); output.setTargetMethod(targetMethod); output.setTargetUserClass(info.getHostUserClass()); output.setTarget(info.getHostTarget()); return output; } catch (Exception e) { log.error("查找匹配度最高的方法失败,输入参数:{}", JSON.toJSONString(in), e); if (e instanceof MethodMatchException) { throw (MethodMatchException) e; } else { throw new MethodMatchException(e); } } }); }
abstract HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass); }
|
接着,通过接口类型获取宿主类的功能就委托给Spring
实现,从IOC
容器中获取,定义SpringMethodMatcher
:
@Component public class SpringMethodMatcher extends BaseMethodMatcher implements BeanFactoryAware {
private DefaultListableBeanFactory beanFactory;
@Override public void setBeanFactory(@NonNull BeanFactory beanFactory) throws BeansException { this.beanFactory = (DefaultListableBeanFactory) beanFactory; }
@Override HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass) { HostClassMethodInfo info = new HostClassMethodInfo(); Object bean = beanFactory.getBean(interfaceClass); info.setHostTarget(bean); info.setHostClass(bean.getClass()); info.setHostUserClass(ClassUtils.getUserClass(bean.getClass())); return info; } }
|
至此,目标方法匹配的模块已经编写完毕,接下来需要处理方法参数列表的反序列化。编写协议的时候,笔者把方法参数列表methodArguments
存放在Object
数组中,传输的时候序列化为byte
数组,经过协议解析之后,方法参数列表的实际类型为ByteBuf
数组(这是因为Netty
中的字节容器就是ByteBuf
),那么需要考虑把ByteBuf
数组转换为目标方法的参数类型实例。主要步骤如下:
- 如果方法参数列表为空,那么什么都不用做,也就是调用了无参数的方法。
- 如果方法参数列表不为空同时方法参数类型列表不为空,优先选用方法参数类型列表进行转换。
- 如果方法参数列表不为空同时方法参数类型列表为空,则使用
Method#getParameterTypes()
得到的方法参数列表类型进行转换。
定义一个方法参数转换器接口MethodArgumentConverter
:
public interface MethodArgumentConverter {
ArgumentConvertOutput convert(ArgumentConvertInput input); }
@Data public class ArgumentConvertInput {
private Method method;
private List<Class<?>> parameterTypes;
private List<Object> arguments; }
@Data public class ArgumentConvertOutput {
private Object[] arguments; }
|
方法参数转换器的默认实现如下:
@Slf4j @Component public class DefaultMethodArgumentConverter implements MethodArgumentConverter {
private final Serializer serializer = FastJsonSerializer.X;
@Override public ArgumentConvertOutput convert(ArgumentConvertInput input) { ArgumentConvertOutput output = new ArgumentConvertOutput(); try { if (null == input.getArguments() || input.getArguments().isEmpty()) { output.setArguments(new Object[0]); return output; } List<Class<?>> inputParameterTypes = input.getParameterTypes(); int size = inputParameterTypes.size(); if (size > 0) { Object[] arguments = new Object[size]; for (int i = 0; i < size; i++) { ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i); int readableBytes = byteBuf.readableBytes(); byte[] bytes = new byte[readableBytes]; byteBuf.readBytes(bytes); arguments[i] = serializer.decode(bytes, inputParameterTypes.get(i)); byteBuf.release(); } output.setArguments(arguments); return output; } Class<?>[] parameterTypes = input.getMethod().getParameterTypes(); int len = parameterTypes.length; Object[] arguments = new Object[len]; for (int i = 0; i < len; i++) { ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i); int readableBytes = byteBuf.readableBytes(); byte[] bytes = new byte[readableBytes]; byteBuf.readBytes(bytes); arguments[i] = serializer.decode(bytes, parameterTypes[i]); byteBuf.release(); } output.setArguments(arguments); return output; } catch (Exception e) { throw new ArgumentConvertException(e); } } }
|
所有前置工作都完成了,现在编写一个Server
端的入站处理器ServerHandler
,暂时不做代码逻辑优化,只做实现,把反射调用的模块直接在此类中编写:
@Component @Slf4j public class ServerHandler extends SimpleChannelInboundHandler<RequestMessagePacket> {
@Autowired private MethodMatcher methodMatcher;
@Autowired private MethodArgumentConverter methodArgumentConverter;
@Override protected void channelRead0(ChannelHandlerContext ctx, RequestMessagePacket packet) throws Exception { log.info("服务端接收到:{}", packet); MethodMatchInput input = new MethodMatchInput(); input.setInterfaceName(packet.getInterfaceName()); input.setMethodArgumentSignatures(Optional.ofNullable(packet.getMethodArgumentSignatures()) .map(Lists::newArrayList).orElse(Lists.newArrayList())); input.setMethodName(packet.getMethodName()); Object[] methodArguments = packet.getMethodArguments(); input.setMethodArgumentArraySize(null != methodArguments ? methodArguments.length : 0); MethodMatchOutput output = methodMatcher.selectOneBestMatchMethod(input); log.info("查找目标实现方法成功,目标类:{},宿主类:{},宿主方法:{}", output.getTargetClass().getCanonicalName(), output.getTargetUserClass().getCanonicalName(), output.getTargetMethod().getName() ); Method targetMethod = output.getTargetMethod(); ArgumentConvertInput convertInput = new ArgumentConvertInput(); convertInput.setArguments(input.getMethodArgumentArraySize() > 0 ? Lists.newArrayList(methodArguments) : Lists.newArrayList()); convertInput.setMethod(output.getTargetMethod()); convertInput.setParameterTypes(output.getParameterTypes()); ArgumentConvertOutput convertOutput = methodArgumentConverter.convert(convertInput); ReflectionUtils.makeAccessible(targetMethod); Object result = targetMethod.invoke(output.getTarget(), convertOutput.getArguments()); ResponseMessagePacket response = new ResponseMessagePacket(); response.setMagicNumber(packet.getMagicNumber()); response.setVersion(packet.getVersion()); response.setSerialNumber(packet.getSerialNumber()); response.setAttachments(packet.getAttachments()); response.setMessageType(MessageType.RESPONSE); response.setErrorCode(200L); response.setMessage("Success"); response.setPayload(JSON.toJSONString(result)); log.info("服务端输出:{}", JSON.toJSONString(response)); ctx.writeAndFlush(response); } }
|
编写一个Server
的启动类ServerApplication
,在Spring
容器启动之后,启动Netty
服务:
@SpringBootApplication(scanBasePackages = "club.throwable.server") @Slf4j public class ServerApplication implements CommandLineRunner {
@Value("${netty.port:9092}") private Integer nettyPort;
@Autowired private ServerHandler serverHandler;
public static void main(String[] args) throws Exception { SpringApplication.run(ServerApplication.class, args); }
@Override public void run(String... args) throws Exception { int port = nettyPort; ServerBootstrap bootstrap = new ServerBootstrap(); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new RequestMessagePacketDecoder()); ch.pipeline().addLast(new ResponseMessagePacketEncoder(FastJsonSerializer.X)); ch.pipeline().addLast(serverHandler); } }); ChannelFuture future = bootstrap.bind(port).sync(); log.info("启动NettyServer[{}]成功...", port); future.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
|
最后,编写契约包和契约实现:
- ch0-custom-rpc-protocol 项目根目录 - club.throwable - utils 工具类 - protocol 协议 - exception 异常 - contract 契约 - HelloService 契约接口 - server 服务端 - contract - DefaultHelloService 契约接口实现
|
public interface HelloService {
String sayHello(String name); }
@Service public class DefaultHelloService implements HelloService {
@Override public String sayHello(String name) { return String.format("%s say hello!", name); } }
|
先启动服务端ServerApplication
,再启动上一节提到的TestProtocolClient
,输出结果:
// 服务端日志 2020-01-15 00:05:57.898 INFO 14420 --- [ main] club.throwable.server.ServerApplication : 启动NettyServer[9092]成功... 2020-01-15 00:06:05.980 INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler : 服务端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 6/139)]) 2020-01-15 00:06:07.448 INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler : 查找目标实现方法成功,目标类:club.throwable.server.contract.DefaultHelloService,宿主类:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello 2020-01-15 00:06:07.521 INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler : 服务端输出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"doge say hello!\"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1}
// 客户端日志 00:06:05.891 [main] INFO club.throwable.protocol.TestProtocolClient - 启动NettyClient[9092]成功... ...省略... 00:06:13.197 [nioEventLoopGroup-2-1] INFO club.throwable.protocol.TestProtocolClient - 接收到来自服务端的响应消息,消息内容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"doge say hello!\"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1}
|
可见RPC
调用成功。
小结
编写RPC
的Server
端技巧在于处理目标方法和宿主类的查找,在转换方法参数的时候,需要考虑简化处理和提高效率,剩下的就是做好异常处理和模块封装。限于篇幅,后面会先分析Client
端的处理,再分析心跳处理、服务端优化、甚至是对接注册中心等等,在Netty
、SpringBoot
等优秀框架的加持下编写一个RPC
框架其实并不困难,困难的是性能优化和生态圈的支持。
Demo
项目地址:
(本文完 c-1-d e-a-20200115)