手写RPC框架---02 Posted on 2024-01-07 22:41:24 2024-01-07 22:41:24 by Author 摘要 上一章讲解了消费方在RPC框架中具体的实现过程,该章讲解提供方在RPC框架的过程,并且提供了相应的源码以及相应的参考文件。 # 手写RPC框架---02 1. 上一章主要讲解了在RPC框架中调用方的具体实现过程,这一篇章讲讲RPC的提供方具体实现过程。首先大概讲讲提供方实现的流程,才能更好的理解其实现过程。 - 首先提供方也是一个单体的微服务应用,在该服务启动的时候,需要进行一些初始化操作,比如连接注册中心,序列化方式的初始化,启动Netty服务端程序,监听某个端口,用来解释消费者发送的数据,还有启动线程池等初始化相关的工作。 - 其次,需要使用反射机制加载哪些bean提供了相关的服务,然后将该ben提供的服务向注册中心注册(注册相关服务的元数据信息),这样消费者才能从注册中心发现并且获取相应的服务。之后就是等待相应的端口请求的数据,之后对数据解析以及执行相应的方法,返回数据等过程。该过程的具体实现如下代码所示: ```java public class ProviderPostProcessor implements InitializingBean,BeanPostProcessor, EnvironmentAware { private Logger logger = LoggerFactory.getLogger(ProviderPostProcessor.class); RpcProperties rpcProperties; // 此处在window环境下改为127.0.0.1 private static String serverAddress = "0.0.0.0"; private final Map<String, Object> rpcServiceMap = new HashMap<>(); @Override public void afterPropertiesSet() throws Exception { Thread t = new Thread(() -> { try { startRpcServer(); } catch (Exception e) { logger.error("start rpc server error.", e); } }); t.setDaemon(true); t.start(); SerializationFactory.init(); RegistryFactory.init(); LoadBalancerFactory.init(); FilterConfig.initServiceFilter(); ThreadPollFactory.setRpcServiceMap(rpcServiceMap); } private void startRpcServer() throws InterruptedException { int serverPort = rpcProperties.getPort(); EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker) .option(ChannelOption.SO_KEEPALIVE, true) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new RpcEncoder()) .addLast(new RpcDecoder()) .addLast(new ServiceBeforeFilterHandler()) .addLast(new RpcRequestHandler()) .addLast(new ServiceAfterFilterHandler()); } }) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, serverPort).sync(); logger.info("server addr {} started on port {}", this.serverAddress, serverPort); channelFuture.channel().closeFuture().sync(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { logger.info("ShutdownHook execute start..."); logger.info("Netty NioEventLoopGroup shutdownGracefully..."); logger.info("Netty NioEventLoopGroup shutdownGracefully2..."); boss.shutdownGracefully(); worker.shutdownGracefully(); logger.info("ShutdownHook execute end..."); }, "Allen-thread")); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } /** * 服务注册 * @param bean * @param beanName * @return * @throws BeansException */ @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { Class<?> beanClass = bean.getClass(); // 找到bean上带有 RpcService 注解的类 RpcService rpcService = beanClass.getAnnotation(RpcService.class); if (rpcService != null) { // 可能会有多个接口,默认选择第一个接口 String serviceName = beanClass.getInterfaces()[0].getName(); if (!rpcService.serviceInterface().equals(void.class)){ serviceName = rpcService.serviceInterface().getName(); } String serviceVersion = rpcService.serviceVersion(); try { // 服务注册 int servicePort = rpcProperties.getPort(); // 获取注册中心 ioc RegistryService registryService = RegistryFactory.get(rpcProperties.getRegisterType()); ServiceMeta serviceMeta = new ServiceMeta(); // 服务提供方地址 serviceMeta.setServiceAddr("服务提供方地址"); serviceMeta.setServicePort(servicePort); serviceMeta.setServiceVersion(serviceVersion); serviceMeta.setServiceName(serviceName); registryService.register(serviceMeta); // 缓存 rpcServiceMap.put(RpcServiceNameBuilder.buildServiceKey(serviceMeta.getServiceName(),serviceMeta.getServiceVersion()), bean); logger.info("register server {} version {}",serviceName,serviceVersion); } catch (Exception e) { logger.error("failed to register service {}", serviceVersion, e); } } return bean; } @Override public void setEnvironment(Environment environment) { RpcProperties properties = RpcProperties.getInstance(); PropertiesUtils.init(properties,environment); rpcProperties = properties; logger.info("读取配置文件成功"); } } ``` - ProviderPostProcessor该类是在项目启动的时候,是由@Import注解自动加载注入Spring框架里的类,该类实现了InitializingBean,BeanPostProcessor, EnvironmentAware这三个类,具体的作用如下: - 实现InitializingBean类,是所有的bean初始化过程中,动态加载相应的组件,以及开启线程池,和开启Netty服务端进行监听某个端口。 - 实现BeanPostProcessor类,是通过反射机制在该项目中查找类带有@RpcService注解的所有类,之后通过向注册中心注册该类的元数据信息。同时并且缓存该类,用于后期的方法执行。 - 实现EnvironmentAware类,是为了动态加载开发者的配置信息,然后是的服务提供方的环境是由开发者自己配置的,而不是框架默认设置的,用来动态更新服务的上下文环境的。 以上就是提供方启动类需要完成的工作以及任务,接下来就是对消费者发过来的请求进行协议解析以及方法执行的具体过程,该过程使用线程池机制执行相应的任务,在服务端口接受到数据时,将这些数据分发到不同的线程中,提供执行效率。该过程步骤为,当接受到一个请求,就从线程池里面取出一个线程进行执行相应的请求,执行的过程为:从请求协议中得使用那个服务的类,以及使用该类的那个方法以及参数, 然后从提供方服务的缓存中得到相应的服务类,使用代理机制执行相应的方法,最后将返回结果封装为响应协议发送给调用方。具体实现如下所示: ```java public class ThreadPollFactory { private static Logger logger = LoggerFactory.getLogger(ThreadPollFactory.class); private static ThreadPoolExecutor slowPoll; private static ThreadPoolExecutor fastPoll; private static volatile ConcurrentHashMap<String, AtomicInteger> slowTaskMap = new ConcurrentHashMap(); private static int corSize = Runtime.getRuntime().availableProcessors(); // 缓存服务 该缓存放这里不太好,应该作一个统一 Config 进行管理 private static Map<String, Object> rpcServiceMap; static { slowPoll = new ThreadPoolExecutor(corSize / 2, corSize, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(2000), r->{ Thread thread = new Thread(r); thread.setName("slow poll-"+r.hashCode()); thread.setDaemon(true); return thread; }); fastPoll = new ThreadPoolExecutor(corSize, corSize*2, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000), r->{ Thread thread = new Thread(r); thread.setName("fast poll-"+r.hashCode()); thread.setDaemon(true); return thread; }); startClearMonitor(); } private ThreadPollFactory(){} public static void setRpcServiceMap(Map<String, Object> rpcMap){ rpcServiceMap = rpcMap; } /** * 清理慢请求 */ private static void startClearMonitor(){ Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(()->{ slowTaskMap.clear(); },5,5,TimeUnit.MINUTES); } public static void submitRequest(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> protocol){ final RpcRequest request = protocol.getBody(); String key = request.getClassName() + request.getMethodName() + request.getServiceVersion(); ThreadPoolExecutor poll = fastPoll; if (slowTaskMap.containsKey(key) && slowTaskMap.get(key).intValue() >= 10){ poll = slowPoll; } poll.submit(()->{ RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>(); final MsgHeader header = protocol.getHeader(); RpcResponse response = new RpcResponse(); long startTime = System.currentTimeMillis(); try { final Object result = submit(ctx, protocol); response.setData(result); response.setDataClass(result == null ? null : result.getClass()); header.setStatus((byte) MsgStatus.SUCCESS.ordinal()); } catch (Exception e) { // 执行业务失败则将异常返回 header.setStatus((byte) MsgStatus.FAILED.ordinal()); response.setException(e); logger.error("process request {} error", header.getRequestId(), e); }finally { long cost = System.currentTimeMillis() - startTime; System.out.println("cost time:" + cost); if(cost > 1000){ final AtomicInteger timeOutCount = slowTaskMap.putIfAbsent(key, new AtomicInteger(1)); if (timeOutCount!=null){ timeOutCount.incrementAndGet(); } } } resProtocol.setHeader(header); resProtocol.setBody(response); logger.info("执行成功: {},{},{},{}",Thread.currentThread().getName(),request.getClassName(),request.getMethodName(),request.getServiceVersion()); ctx.fireChannelRead(resProtocol); }); } private static Object submit(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> protocol) throws Exception{ MsgHeader header = protocol.getHeader(); header.setMsgType((byte) MsgType.RESPONSE.ordinal()); final RpcRequest request = protocol.getBody(); // 执行具体业务 return handle(request); } // 调用方法 private static Object handle(RpcRequest request) throws Exception { String serviceKey = RpcServiceNameBuilder.buildServiceKey(request.getClassName(), request.getServiceVersion()); // 获取服务信息 Object serviceBean = rpcServiceMap.get(serviceKey); if (serviceBean == null) { throw new RuntimeException(String.format("service not exist: %s:%s", request.getClassName(), request.getMethodName())); } // 获取服务提供方信息并且创建 Class<?> serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getData(); FastClass fastClass = FastClass.create(serviceClass); int methodIndex = fastClass.getIndex(methodName, parameterTypes); // 调用方法并返回结果 return fastClass.invoke(methodIndex, serviceBean, parameters); } } ``` - 以上的过程就是服务提供方具体的实现方式。该项目借鉴Xhy提供的代码,代码连接为:[Xhy-Xhy-Rpc](https://gitee.com/XhyQAQ/xhy-rpc),本人在该项目基础上解决了一些小bug(比如调用方法时,参数列表只能传递一个参数,参数类型只能是String类型的),经过本人小小的改动,在进行方法调用的时候实现可以传入多个参数,以及提供方自己定义的类型。同时为了理解SPI机制,自己在原有的项目中加了一个以nacos为注册中心的扩展类。以及对这些改动的地方提供了具体的测试代码。更改后的项目地址为:[RPC-V2](https://gitee.com/kexianqun/rpc-v2)。其中,在对项目理解过程中,自己整理了每个包中每个文件的作用,然后梳理成了paf文档,如果感兴趣想看的可以点击这里[下载](https://e-blog.oss-cn-shanghai.aliyuncs.com/keke/RPC%E6%A1%86%E6%9E%B6.pdf)。
{{ item.content }}
{{ child.content }}