【SpringBoot】整合Netty实现RPC
线程模型
Netty高性能架构设计
简单了解React线程模型,参考文章【五分钟快速理解 Reactor 模型】
举例说明:Reactor的三种线程模型
线程模型1:传统阻塞 I/O 服务模型
模型特点:
- 采用阻塞
IO
模式获取输入的数据
- 每个链接都需要独立的线程完成数据的输入,业务处理、数据返回。
问题分析:
- 当并发数很大,就会创建大量的线程,占用很大系统资源
- 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在
read
操作,造成线程资源浪费。
线程模型2:Reactor 模式
- Reactor模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动)
- 服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程,因此Reactor模式也叫Dispatcher模式(分派模式)
- Reactor模式使用IO复用监听事件,收到事件后,分发给某个线程(进程),这点就是网络服务器高并发处理关键
针对传统阻塞I/O服务模型的2个缺点,解决方案如下:
- 基于
I/O
复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。Reactor
对应的叫法: 1. 反应器模式 2. 分发者模式(Dispatcher
) 3. 通知者模式(notifier
)
- 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
ServiceHandler(Reactor)是个阻塞对象,去监听事件;此时工作线程池(业务处理线程)是空闲状态,当某个连接发送数据后,由ServiceHandler监听到相应的事件,再通过eventDispatch(事件分发)方法分派给工作线程池挑选空闲状态的线程去处理业务,处理完该客户端连接业务后回到空闲状态再等待新的客户端连接业务。
核心组成
- Reactor: Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。它就像公司的电话接钱员,它接听来自客户的电话并将线路转移到适当的联系人。
- Handlers: 处理程序执行IO事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。
单 Reactor 单线程
方案说明:
- Select是前面IO复用模型介绍的标准网络编程APl,可以实现应用程序通过一个阻塞对象监听多路连接请求
- Reactor对象通过Select监控客户端请求事件,收到事件后通过Dispatch进行分发
- 如果是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理
- 如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应
- Handler会完成Read→业务处理→Send的完整业务流程
结合实例:服务器端用一个线程通过多路复用搞定所有的IO操作(包括连接,读、写 等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑,前面的IO案例就属于这种模型。
模型分析
- 优点: 模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
- 缺点: 性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
- 缺点: 可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
- 使用场景: 客户端的数量有限,业务处理非常快速,比如 Redis在业务处理的时间复杂度 O(1) 的情况
单 Reactor 多线程
方案说明:
- Reactor对象通过select监控客户端请求事件,收到事件后,通过dispatch进行分发
- Reactor对象如果接收到建立连接事件请求,则由Acceptor(接收器)通过accept方法处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件业务
- Reactor对象如果不是连接事件请求,则由Reactor分发调用客户端连接对应的Handler来处理,此时Handler是持有客户端连接的channel
- Handler只负责响应事件,不做具体的业务处理,通过read方法读取数据后,会分发给后面的worker线程池的某个线程处理业务
- Worker线程池会分配独立线程完成真正的业务,并将结果返回给Handler
- Handler收到响应后,通过send方法将结果返回给client
模型分析
- 优点:可以充分的利用多核
cpu
的处理能力
- 缺点:多线程数据共享和访问比较复杂,
reactor
处理所有的事件的监听和响应请求,在单线程运行,在高并发场景容易出现性能瓶颈。
主从 Reactor 多线程
方案说明:
- Reactor主线程MainReactor对象通过select监听连接事件,收到事件后,通过Acceptor处理连接事件
- 当Acceptor处理连接事件后,MainReactor将连接分配给SubReactor
- SubReactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理
- 当有新事件发生时,SubReactor就会调用对应的handler处理
- handler通过read读取数据,分发给后面的worker线程处理
- worker线程池分配独立的worker线程进行业务处理,并返回结果
- handler收到响应的结果后,再通过send将结果返回给client
- Reactor主线程可以对应多个Reactor子线程,即MainRecator可以关联多个SubReactor
模型分析
- 优点: 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
- 优点: 父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据
- 缺点: 编程复杂度较高
- 结合实例: 这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持
Netty模型
参考文档:https://blog.csdn.net/qq_35751014/article/details/104443715
- Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写。BossGroup和WorkerGroup类型的本质都是NioEventLoopGroup类型。
- NioEventLoopGroup相当于一个线程管理器(类似于ExecutorServevice),它下面维护很多个NioEventLoop线程。
- 在初始化这两个BossGroup和WorkerGroup线程组时,默认会在每个Group中生成CPU*2个NioEventLoop线程数
- 当n个连接来了,Group默认会按照连接请求的顺序分别将这些连接分给各个NioEventLoop去处理。
- 同时Group还负责管理EventLoop的生命周期。
- NioEventLoop表示一个不断循环的执行处理任务的线程
- 它维护了一个线程和任务队列。
- 每个NioEventLoop都包含一个Selector,用于监听绑定在它上面的socket通讯。
- 每个NioEventLoop相当于Selector,负责处理多个Channel上的事件
- 每增加一个请求连接,NioEventLoopGroup就将这个请求依次分发给它下面的NioEventLoop处理。
- 每个
Boss NioEventLoop
循环执行的步骤有3步:
- 轮询
accept
事件
- 处理
accept
事件,与client
建立连接,生成NioSocketChannel
,并将其注册到某个Worker NioEventLoop
的selector
上。
- 处理任务队列到任务,即
runAllTasks
- 每个
Worker NioEventLoop
循环执行的步骤:
- 轮询
read
,write
事件
- 处理
I/O
事件,即read
,write
事件,在对应的NioSocketChannel
中进行处理
- 处理任务队列的任务,即
runAllTasks
- 每个 Worker NioEventLoop处理业务时,会使用pipeline(管道),pipeline中维护了一个ChannelHandlerContext链表,而ChannelHandlerContext则保存了Channel相关的所有上下文信息,同时关联一个ChannelHandler对象。如图所示,Channel和pipeline一一对应,ChannelHandler和ChannelHandlerContext一一对应。
ChannelHandler
是一个接口,负责处理或拦截I/O
操作,并将其转发到Pipeline
中的下一个处理Handler
进行处理。
I/O Request
via Channel or
ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
先实现简单的Netty通信
服务端示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public static void main(String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup(1); NioEventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))) .addLast(new ObjectEncoder()) .addLast(new SimpleChannelInboundHandler<Object>() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("客户端连接啦。。。客户端地址:{}", ctx.channel().remoteAddress()); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { log.info("服务端接收到的数据:{}", o.toString()); String str = o.toString(); str = str.replace("吗", ""); str = str.replace("?", "!"); str = str.replace("? ", "! "); channelHandlerContext.writeAndFlush(str); } }); } }); ChannelFuture channelFuture = serverBootstrap.bind(8888).syncUninterruptibly(); channelFuture.channel().closeFuture(); }
|
客户端示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public static void main(String[] args) { NioEventLoopGroup worker = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(worker) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))) .addLast(new ObjectEncoder()) .addLast(new SimpleChannelInboundHandler<Object>() {
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush("哈哈哈"); }
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { log.info("客户端接收到的数据:{}", o.toString()); } }); } });
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).syncUninterruptibly(); Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); channel.writeAndFlush(msg + "\r\n"); } channelFuture.channel().closeFuture(); }
|
快启动试试看把,不过需要注意的是,得先启动服务端哦~
SpringBoot + Netty4实现rpc框架
好了,接下来就让我们进入正题,让我们利用我们所学的知识去实现自己一个简单的rpc框架吧
简单说下RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务。让两个服务之间调用就像调用本地方法一样。
RPC时序图:
RPC流程:
- 【客户端】发起调用
- 【客户端】数据编码
- 【客户端】发送编码后的数据到服务端
- 【服务端】接收客户端发送的数据
- 【服务端】对数据进行解码
- 【服务端】处理消息业务并返回结果值
- 【服务端】对结果值编码
- 【服务端】将编码后的结果值回传给客户端
- 【客户端】接收结果值
- 【客户端】解码结果值
- 【客户端】处理返回数据业务
引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.58.Final</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.5.8</version> </dependency> </dependencies>
|
编写服务端
自定义消息协议:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
@Data @Builder @NoArgsConstructor @AllArgsConstructor public class RpcMessage implements Serializable { private static final long serialVersionUID = 430507739718447406L;
private String name;
private String methodName;
private Class<?>[] parTypes;
private Object[] pars;
private Object result; }
|
自定义Rpc注解:
1 2 3 4 5 6 7 8 9 10
|
@Target(value = {ElementType.TYPE, ElementType.FIELD}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface RpcServer { }
|
定义ServerHandle
业务处理器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
|
@Slf4j @ChannelHandler.Sharable public class ServerHandle extends SimpleChannelInboundHandler<RpcMessage> implements ApplicationContextAware { private Map<String, Object> serviceMap;
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(RpcServer.class); log.info("被@RpcServer注解加载的Bean: {}", beansWithAnnotation); if (beansWithAnnotation.size() > 0) { Map<String, Object> map = new ConcurrentHashMap<>(16); for (Object o : beansWithAnnotation.values()) { Class<?> anInterface = o.getClass().getInterfaces()[0]; map.put(anInterface.getName(), o); } serviceMap = map; } }
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("客户端连接了: {}", ctx.channel().remoteAddress()); super.channelActive(ctx); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("异常信息"); cause.printStackTrace(); super.exceptionCaught(ctx, cause); }
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception { log.info("客户端发送的消息:{}", rpcMessage); Object service = serviceMap.get(rpcMessage.getName()); Method method = service.getClass().getMethod(rpcMessage.getMethodName(), rpcMessage.getParTypes()); method.setAccessible(true); Object result = method.invoke(service, rpcMessage.getPars()); rpcMessage.setResult(JSONUtil.toJsonStr(result)); log.info("回给客户端的消息:{}", rpcMessage); channelHandlerContext.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE); } }
|
定义NettyServer
端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
|
@Slf4j public class NettyServer {
private final ServerHandle serverHandle;
private Channel channel;
public NettyServer(ServerHandle serverHandle) { this.serverHandle = serverHandle; }
public void start(int port) { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))) .addLast(new ObjectEncoder()) .addLast(serverHandle); } });
final ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly(); log.info("服务端启动-端口: {}", port); channel = channelFuture.channel(); channel.closeFuture().syncUninterruptibly(); } catch (Exception e) { boss.shutdownGracefully(); worker.shutdownGracefully(); } }
public void stop() { channel.close(); } }
|
自定义rpc配置属性类:
1 2 3 4 5 6 7 8 9 10
|
@Component @ConfigurationProperties(prefix = "netty") @Data public class NettyRpcProperties { private int serverPort; }`
|
创建Server端启动配置类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
|
@Slf4j @Configuration @EnableConfigurationProperties(NettyRpcProperties.class) public class ServerBeanConfig {
private final NettyRpcProperties nettyRpcProperties;
@Autowired public ServerBeanConfig(NettyRpcProperties nettyRpcProperties) { this.nettyRpcProperties = nettyRpcProperties; }
@Bean public ServerHandle serverHandle() { return new ServerHandle(); }
@Bean public NettyServer nettyServer(ServerHandle handle) { NettyServer nettyServer = new NettyServer(handle);
return nettyServer; }
@Component static class NettyServerStart implements ApplicationRunner { private final NettyServer nettyServer; private final NettyRpcProperties properties;
@Autowired NettyServerStart(NettyServer nettyServer, NettyRpcProperties properties) { this.nettyServer = nettyServer; this.properties = properties; }
@Override public void run(ApplicationArguments args) throws Exception { log.info("===============ApplicationRunner"); if (nettyServer != null) { nettyServer.start(properties.getServerPort()); } } } }
|
注入Spring容器
此时有两种方式让该配置自动注入Spring容器生效:
自动注入
在resource目录下创建META-INF目录,创建spring.factories文件
在该文件里写上
org.springframework.boot.autoconfigure.EnableAutoConfiguration=${包路径:xxx.xxx.xxx}.${配置类:ServerBeanConfig}
配置好之后,在SpringBoot启动时会自动加载该配置类。
通过注解注入
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @ImportAutoConfiguration({ServerBeanConfig.class}) public @interface EnableNettyServer { }
|
编写客户端
创建客户端处理器`ClientHandle
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
@Slf4j @ChannelHandler.Sharable public class ClientHandle extends SimpleChannelInboundHandler<RpcMessage> {
private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap;
public ClientHandle(ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap) { this.rpcMessageConcurrentMap = rpcMessageConcurrentMap; }
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception { log.info("客户端收到服务端消息:{}", rpcMessage); rpcMessageConcurrentMap.put(channelHandlerContext.channel(), rpcMessage); } }
|
创建客户端启动类NettyClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
@Slf4j public class NettyClient {
private Channel channel;
private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap = new ConcurrentHashMap<>();
public RpcMessage send(int port, final RpcMessage rpcMessage) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))) .addLast(new ObjectEncoder()) .addLast(new ClientHandle(rpcMessageConcurrentMap)); } }); final ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly(); log.info("连接服务端成功: " + channelFuture.channel().remoteAddress()); channel = channelFuture.channel(); channel.writeAndFlush(rpcMessage); log.info("发送数据成功:{}", rpcMessage); channel.closeFuture().syncUninterruptibly(); return rpcMessageConcurrentMap.get(channel); } catch (Exception e) { log.error("client exception", e); return null; } finally { group.shutdownGracefully(); rpcMessageConcurrentMap.remove(channel); } }
public void stop() { channel.close(); } }
|
定义Netty客户端Bean后置处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
|
@Slf4j public class NettyClientBeanPostProcessor implements BeanPostProcessor {
private final NettyClient nettyClient;
public NettyClientBeanPostProcessor(NettyClient nettyClient) { this.nettyClient = nettyClient; }
@Override public Object postProcessBeforeInitialization(Object bean, @Nullable String beanName) throws BeansException { Class<?> beanClass = bean.getClass(); do { Field[] fields = beanClass.getDeclaredFields(); for (Field field : fields) { if (field.getAnnotation(RpcServer.class) != null) { field.setAccessible(true); try { Object o = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class[]{field.getType()}, new ClientInvocationHandle(nettyClient)); field.set(bean, o); log.info("创建代理类 ===>>> {}", beanName); } catch (IllegalAccessException e) { log.error(e.getMessage()); } } } } while ((beanClass = beanClass.getSuperclass()) != null); return bean; }
@Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; }
static class ClientInvocationHandle implements InvocationHandler { private final NettyClient nettyClient;
public ClientInvocationHandle(NettyClient nettyClient) { this.nettyClient = nettyClient; }
@Override public Object invoke(Object proxy, Method method, Object[] args) { RpcMessage rpcMessage = RpcMessage.builder() .name(method.getDeclaringClass().getName()) .methodName(method.getName()) .parTypes(method.getParameterTypes()) .pars(args) .build(); RpcMessage send = nettyClient.send(1111, rpcMessage); log.info("接收到服务端数据:{}, 返回结果值 ====》》》》{}", send, send.getResult()); return send.getResult(); } } }
|
定义客户端配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
@Configuration public class ClientBeanConfig {
@Bean public NettyClient nettyClient() { return new NettyClient(); }
@Bean public NettyClientBeanPostProcessor nettyClientBeanPostProcessor(NettyClient nettyClient) { return new NettyClientBeanPostProcessor(nettyClient); } }
|
最后和服务端一样,注入Spring容器
1 2 3 4 5 6 7 8 9 10 11
|
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @ImportAutoConfiguration({ClientBeanConfig.class}) public @interface EnableNettyClient { }
|
至此我们的SpringBoot + Netty4的就已经实现了最最简单的rpc框架模式了;然后我们就可以引用我们自己的rpc依赖了。
最后再执行一下maven命令
netty-rpc-examples例子
接口服务
pom里啥也没有。。。
定义一个接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
public interface Test1Api {
void test();
void test(int id, String name);
String testStr(int id);
Object testObj(); }
|
rpc-server服务端
正常的SpringBoot工程
引入pom
1 2 3 4 5 6 7 8 9 10 11 12
| <dependency> <groupId>cn.happyloves.rpc</groupId> <artifactId>netty-rpc</artifactId> <version>0.0.1</version> </dependency>
<dependency> <groupId>cn.happyloves.netty.rpc.examples.api</groupId> <artifactId>rpc-api</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
|
配置属性
1 2 3 4 5
| spring.application.name=rpc-server
server.port=8080 netty.server-port=1111
|
创建一个实体类
1 2 3 4 5 6 7 8 9 10 11 12 13
|
@Data public class Account implements Serializable { private static final long serialVersionUID = 667178018106218163L; private Integer id;
private String name; private String username; private String password; }
|
创建Server实现Test1Api接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
@Slf4j @Service @RpcServer public class TestServiceImpl implements Test1Api { @Override public void test() { log.info("111111111"); }
@Override public void test(int id, String name) { log.info("222222222,{},{}", id, name); }
@Override public String testStr(int id) { log.info("33333333333333333,{}", id); return "33333333333333333 " + id; }
@Override public Object testObj() { log.info("444444444444444444"); Account account = new Account(); account.setName("张三"); return account; } }
|
最后在SpringBoot启动类上加上@EnableNettyServer
1 2 3 4 5 6 7 8 9 10 11
|
@EnableNettyServer @SpringBootApplication public class RpcServerApplication { public static void main(String[] args) { SpringApplication.run(RpcServerApplication.class, args); } }
|
rpc-server客户端
引入pom依赖
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>cn.happyloves.rpc</groupId> <artifactId>netty-rpc</artifactId> <version>0.0.1</version> </dependency> <dependency> <groupId>cn.happyloves.netty.rpc.examples.api</groupId> <artifactId>rpc-api</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
|
创建Controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
@RestController public class ClientController { @RpcServer private Test1Api testServiceImpl;
@GetMapping("/test1") public void test() { testServiceImpl.test(); }
@GetMapping("/test2") public void test(int id, String name) { testServiceImpl.test(id, name); }
@GetMapping("/test3") public String testStr(int id) { return testServiceImpl.testStr(id); }
@GetMapping("/test4") public Object testObj() { return testServiceImpl.testObj(); } }
|
最后在启动类上加上注解@EnableNettyClient
1 2 3 4 5 6 7
| @EnableNettyClient @SpringBootApplication public class RpcClientApplication { public static void main(String[] args) { SpringApplication.run(RpcClientApplication.class, args); } }
|
先运行服务端,在运行客户端,然后在调用客户端接口就可以看到服务端能够接收到客户端发来的消息,然后服务端处理并返回,客户端接收并返回。。。
至此,一个小demo就完成了。
当然啦,后续还有很多需求需要处理的,比方说当前demo中客户端每次通信都需要创建一个实例去连接、服务的注册、客户端和服务端是同一个应用等等,这个后面再慢慢完善吧