引言
RPC框架对于一个公司的系统来讲应该是一种非常重要的基础能力,承载各系统之间的各种远程调用,比如公司的HSF、TR。如何也实现一个简单的RPC框架,这个通常需要借助一些网络通信框架来实现,自己基于socket的编写实现从时间成本,稳定性上来讲并不推荐。JAVA应用借助比较流行的netty或者JVM自带的RMI来实现,而C应用可以利用像libevent库进行构建。
netty的优势
netty作为后起之秀,借鉴了很多前者优秀的经验。它是基于java nio包扩展的一个高性能高并发的异步网络通信框架,对比原来的java io包,做了很多的改进。最大的变化在于编程模型的改变,原来的输入输出(inputstream/outputstream)对于一次操作来讲是单向的,只能进行读或写操作,需要内部构造两个stream才能完成。而nio则使用channel的方式,读写共用一个管道对象,每个连接等于一个管道(实际使用中连接同一远程服务的多个请求可以共用同一个管道,是不是很高效,后面会有介绍)。
注:本文所指的RPC并不是APP端向网关发起的请求,而是指后台各系统之间的调用。APP端的RPC是走http请求,再由网关经过后台RPC请求到各业务方后台。
工作原理
一般的tcp server开启服务大概是如下几个步骤:bind(port) listen() accept() read() write(),netty也采用此种方式。不过是采用NIO的多路复用Selector模式来实现。参考以下NIO的实例。
selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
上述模型乍一看是一个同步阻塞的,但实际上netty做了改进程,由一个boss线程进行循环收集就绪的IO事件(OP_CONNECT,OP_READ,OP_WRITE,OP_ACCEPT),然后初始化成一个channel,并进行dispatch到worker线程处理(Reactor模式),从应用层上看则是一个异步非阻塞。
netty tcp server构造示例:由一个boss线程组和一个worker线程组构成
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
boss线程收集到就绪IO事件以后如何分发给worker线程组的?
可以参看NioServerSocketChannel.java
protected int doReadMessages(List<Object> buf) throws Exception {
//收集IO事件,并构造成一个channel
SocketChannel ch = javaChannel().accept();
...
boss线程中设置worker线程注册此channel,后续读写操作交由worker线程处理。ServerBootstrap.java
try {
childGroup.register(channel).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
}
childGroup是初始化NettyServer时传入的线程组,register方法里面做了比较多的设置,主要是针对channel的操作类ChannelHandler。
编码、解码及半包问题
上述的设计有点类似Serlvet中的filter,数据会传递到注册的每一个handler中,当然netty中是有顺序区分的。ChannelHandler的作用主要是解码、编码、数据转换、业务处理、写数据等,以下是一个经典的设置。
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//解码
pipeline.addLast("decoder", new ObjectDecoder());
//编码
pipeline.addLast("encoder", new ObjectEncoder());
//业务处理
pipeline.addLast("business", new BusinessHandler());
...
需要注意的是编解码类并不解决半包的问题,2种解决办法:
使用LengthFieldBasedFrameDecoder和LengthFieldPrepender,分别在构造函数里面设置长度字段。
自定义ChannelHandler,继承ByteToMessageDecoder,并实现decode方法。
decode()方法主要是从ByteBuf中收集数据,然后转换成Object并加到out这个List中,当然这个处理过程比较复杂,举一个场景:比如客户端发送了2048个字节,收包的缓冲区大小是1024,则下面的这个decode方法会执行两次。第一次收到的包会存储在cumulation的变量中,这就解释了为什么一般编解码器不标注@Sharable的原因。不同的channel不能共享cumulation这个ByteBuf数据。
/*
Decode the from one bytebuf to an other. This method will be called till either the input bytebuf has nothing to read when return from this method or till nothing was read from the input bytebuf
*/
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
序列化
上面的编码、解码也可以理解成序列化,如果服务之间调用量不大,选择java默认的序列化或者json,对于调用量大,追求传输效率的可以选择hessian或者protobuf,tr调用中默认使用的是hessian。
public byte[] encode(Object obj) throws CodecException {
ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
Hessian2Output output = new Hessian2Output(byteArray);
output.setSerializerFactory(serializerFactory);
try {
output.writeObject(obj);
output.close();
} catch (IOException e) {
throw new CodecException("IOException occurred when Hessian serializer encode!", e);
}
protobuf方式序列化:
有一个需要注意的地方就是在channelHandler中设置pb encoder/decoder时只能指定一种decoder.
pipeline.addLast(new ProtobufDecoder(BussinessPB.getDefaultInstance()));
上面例子中相当于是proto文件被写死了,在数据的头部时传入业务的cmdCode或者type,动态去解析PB格式。
发送数据
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().writeAndFlush(bytebuf).addListener(ChannelFutureListener.CLOSE);
上面的例子中发送完数据以后,会关闭连接。但实际应用中我们更希望连接能够重用,毕竟内部系统之间重复建连无法发挥企业内部的网络性能优势。这个地方可以构造一个conntionPool,这里不再敖述。
异步变同步
客户端发起一次调用,需要等待服务端的响应,因为nio是异步的,如何实现呢。
public ResponseCommand waitResponse(long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
public void putResponse(RemotingCommand response) {
this.responseCommand = (ResponseCommand) response;
this.countDownLatch.countDown();
}
client端发送完请求后,利用countDownLatch的特性阻塞当前线程,直到countDownLatch的值为0.也可以使用BlockingQueque,requestId对应一个BlockingQueque,接收数据时的线程put response到BlockingQueque,前面的线程take这个BlockingQueque
调用方式
对于调用方来讲,一般拿到的是服务方的接口。比如com.xxx.Service,使用动态代理的方式构建出一个com.xxx.Service的代理类,在反射的方法里面,进行RPC调用。从调用方视角上看,这完全是一个本地调用。比如:
LiveFacade service = RpcServiceFactory.getRpcService(LiveFacade.class, rpcClient);
service.doXXX();
对于服务提供方来讲,在应用启动的时候收集发布对外的服务,一般这个过程是结合Spring来做,调用方请求过来时,可以根据请求的方法,参数进行反射调用。
路由寻址
客户端调用时需要知道服务提供方的IP地址、端口号,这个一般是通过ConfigServer获取,这里不在敖述,参考下面的流程图,如果需要自己实现一个ConfigServer,借助zookeeper来实现。
- 大小: 159.2 KB
分享到:
相关推荐
本专题主要通过三个章节实现一个rpc通信的基础功能,来学习RPC服务中间件是如何开发和使用。章节内以源码加说明实战方式来讲解,请尽可能下载源码学习。 - 手写RPC框架第一章《自定义配置xml》 - 手写RPC框架第二章...
netty是一个高性能高度封装的nio框架,本文实现用netty实现一个简化的rpc框架
BootNettyRpc 是一个采用Netty实现的Rpc框架,适用于Spring Boot项目,支持Spring Cloud
用Netty实现一个简单的RPC框架,基本上rpc主要的知识点都涉及到了,包括协议的定义,序列化反序列化,动态代理,Spring自动装配,Netty编解码器等。可以通过这个项目加强对Netty的学习掌握,也可以加深对RPC的理解。...
基于Zookeeper+Netty+Protostuff实现的简单RPC框架源码,代码内有详细注释
基于 Spark Netty Rpc 框架,重新实现的一个 Netty Rpc 框架 ( scala + java )
如何用Netty写一个自己的RPC框架.pdf
基于netty的手写rpc框架。
Netty核心原理剖析与RPC实践手抄版本,基本复刻了全部内容,如有丢失请私聊
forest, 基于netty轻量的高性能分布式RPC服务框架
定义一个接口: public interface HelloService { String hello(String name); String hello(Person person); } 用注解@NettyRpcService实现接口: @NettyRpcService(HelloService.class, version = "1.0") ...
利用netty4.0定义的rpc通信框架
基于netty轻量的高性能分布式RPC服务框架
NettyRPC, 在联网的,但另一个RPC框架基于 NettyRPC另外一个基于网联网的RPC框架。特性简单,小代码库,易于学习 API非常快速,高性能完全非阻塞异步调用,同步调用,单向调用。长期持久连接,自动重新连接到服务器...
1、下载后导入eclipse(maven工程) 2、先运行ServerTest 3、再运行ClientTest,即可看到输出结果 4、保证可用,亲手编写,欢迎大家指正不足
simple-rpc 是一款基于 netty 的 RPC 框架,现有功能: 基本的客户端、服务端交互 提供代理实现接口 spring 集成, xml配置和Java Config配置方式
可参考博客http://blog.csdn.net/u013177446/article/details/66473066 使用netty/反射/序列化反序列化等技术是一个一个简单的RPC框架
基于Netty重构RPC框架.rar