博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Rpc框架(一)要点介绍
阅读量:4171 次
发布时间:2019-05-26

本文共 7930 字,大约阅读时间需要 26 分钟。

Rpc框架笔记

rpc是远程过程调用,可做到像本地一样调用远程服务,是一种进程间通信方式。

1.本地函数调用

本地函数即为常用的函数方法,通过传入参数或者直接调用即可得出输出,操作过程在同一个进程中进行。

2.Socket

Socket可支持在不同进程间进行通信,这需约定一个通信协议进行传参和函数调用,返回结果。

3.RPC框架

RPC能够像调用本地一样调用远程服务,用户不需要关心其底层的实现。

RPC框架的要点

RPC框架中,我们关心的有如下几个部分:

  • 动态代理
  • 序列化
  • 网络传输
  • 通讯协议

1.通讯协议(Protocol)

协议要确定通信双方的格式(请求对象和响应对象)

协议对象RpcProtocol

public class RpcProtocol implements Serializable {
private static final long serialVersionUID = -1102180003395190700L; // service host private String host; // service port private int port; // service info list private List
serviceInfoList; public String toJson() {
String json = JsonUtil.objectToJson(this); return json; } public static RpcProtocol fromJson(String json) {
return JsonUtil.jsonToObject(json, RpcProtocol.class); } @Override public boolean equals(Object o) {
if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; RpcProtocol that = (RpcProtocol) o; return port == that.port && Objects.equals(host, that.host) && isListEquals(serviceInfoList, that.getServiceInfoList()); } private boolean isListEquals(List
thisList, List
thatList) {
if (thisList == null && thatList == null) {
return true; } if ((thisList == null && thatList != null) || (thisList != null && thatList == null) || (thisList.size() != thatList.size())) {
return false; } return thisList.containsAll(thatList) && thatList.containsAll(thisList); } @Override public int hashCode() {
return Objects.hash(host, port, serviceInfoList.hashCode()); } @Override public String toString() {
return toJson(); } public String getHost() {
return host; } public void setHost(String host) {
this.host = host; } public int getPort() {
return port; } public void setPort(int port) {
this.port = port; } public List
getServiceInfoList() {
return serviceInfoList; } public void setServiceInfoList(List
serviceInfoList) {
this.serviceInfoList = serviceInfoList; }}

请求对象RpcRequest

public class RpcRequest implements Serializable {
private static final long serialVersionUID = -2524587347775862771L; //Id用于验证服务器请求和响应是否匹配 private String requestId; private String className; private String methodName; private Class
[] parameterTypes; private Object[] parameters; private String version; public String getRequestId() {
return requestId; } public void setRequestId(String requestId) {
this.requestId = requestId; } public String getClassName() {
return className; } public void setClassName(String className) {
this.className = className; } public String getMethodName() {
return methodName; } public void setMethodName(String methodName) {
this.methodName = methodName; } public Class
[] getParameterTypes() {
return parameterTypes; } public void setParameterTypes(Class
[] parameterTypes) {
this.parameterTypes = parameterTypes; } public Object[] getParameters() {
return parameters; } public void setParameters(Object[] parameters) {
this.parameters = parameters; } public String getVersion() {
return version; } public void setVersion(String version) {
this.version = version; }}

响应对象 RpcResponse

public class RpcResponse implements Serializable {
private static final long serialVersionUID = 8215493329459772524L; private String requestId; private String error; private Object result; public boolean isError() {
return error != null; } public String getRequestId() {
return requestId; } public void setRequestId(String requestId) {
this.requestId = requestId; } public String getError() {
return error; } public void setError(String error) {
this.error = error; } public Object getResult() {
return result; } public void setResult(Object result) {
this.result = result; }}

2.序列化

序列话将对象数据信息转换为用于存储或传输的信息,当服务器收到序列化后的信息会对其进行反序列化读取。

Kryo序列化

public class KryoSerializer extends Serializer {
private KryoPool pool = KryoPoolFactory.getKryoPoolInstance(); @Override public
byte[] serialize(T obj) {
Kryo kryo = pool.borrow(); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); Output out = new Output(byteArrayOutputStream); try {
kryo.writeObject(out, obj); out.close(); return byteArrayOutputStream.toByteArray(); } catch (Exception ex) {
throw new RuntimeException(ex); } finally {
try {
byteArrayOutputStream.close(); } catch (IOException e) {
throw new RuntimeException(e); } pool.release(kryo); } } @Override public
Object deserialize(byte[] bytes, Class
clazz) {
Kryo kryo = pool.borrow(); ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); Input in = new Input(byteArrayInputStream); try {
Object result = kryo.readObject(in, clazz); in.close(); return result; } catch (Exception ex) {
throw new RuntimeException(ex); } finally {
try {
byteArrayInputStream.close(); } catch (IOException e) {
throw new RuntimeException(e); } pool.release(kryo); } }}

3.编码器,解码器

在协议格式和序列化方式确定前提下,还要用编码器将请求对象转化为便于传输的字节流,而解码器则将字节流转换为服务端应用能使用的格式。

编码器RpcEncoder

public class RpcEncoder extends MessageToByteEncoder {
private static final Logger logger = LoggerFactory.getLogger(RpcEncoder.class); private Class
genericClass; private Serializer serializer; public RpcEncoder(Class
genericClass, Serializer serializer) {
this.genericClass = genericClass; this.serializer = serializer; } @Override public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
if (genericClass.isInstance(in)) {
try {
byte[] data = serializer.serialize(in); out.writeInt(data.length); out.writeBytes(data); } catch (Exception ex) {
logger.error("Encode error: " + ex.toString()); } } }}

解码器

public class RpcDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(RpcDecoder.class); private Class
genericClass; private Serializer serializer; public RpcDecoder(Class
genericClass, Serializer serializer) {
this.genericClass = genericClass; this.serializer = serializer; } @Override public final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
if (in.readableBytes() < 4) {
return; } in.markReaderIndex(); int dataLength = in.readInt(); if (in.readableBytes() < dataLength) {
in.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; in.readBytes(data); Object obj = null; try {
obj = serializer.deserialize(data, genericClass); out.add(obj); } catch (Exception ex) {
logger.error("Decode error: " + ex.toString()); } }}

转载地址:http://ffkai.baihongyu.com/

你可能感兴趣的文章
jwt与token+redis,哪种方案更好用?
查看>>
Comparator接口
查看>>
在二叉树中找到一个节点的后继节点
查看>>
寻找第K大
查看>>
String.trim
查看>>
缓存行 伪共享
查看>>
400 : perceived to be a client error 错误
查看>>
Establishing SSL connection without server's identity verification is not recommended
查看>>
扫描包不存在:pojo类找不到
查看>>
c语言中计算数组长度的方法
查看>>
java 数组定义
查看>>
java中的&和&&的区别
查看>>
Java的位运算符
查看>>
BufferedReader与Scanner的区别
查看>>
java String于常量池中的介绍
查看>>
java Text 错误: 找不到或无法加载主类 Text
查看>>
XShell连接ubantu:给ubantu安装ssh
查看>>
c语言的null和0
查看>>
二进制详解:世界上有10种人,一种懂二进制,一种不懂。
查看>>
c语言一个字符变量存储多个字符
查看>>