本文共 7930 字,大约阅读时间需要 26 分钟。
rpc是远程过程调用,可做到像本地一样调用远程服务,是一种进程间通信方式。
本地函数即为常用的函数方法,通过传入参数或者直接调用即可得出输出,操作过程在同一个进程中进行。
Socket可支持在不同进程间进行通信,这需约定一个通信协议进行传参和函数调用,返回结果。
RPC能够像调用本地一样调用远程服务,用户不需要关心其底层的实现。
RPC框架中,我们关心的有如下几个部分:
协议要确定通信双方的格式(请求对象和响应对象)
协议对象RpcProtocol
public class RpcProtocol implements Serializable { private static final long serialVersionUID = -1102180003395190700L; // service host private String host; // service port private int port; // service info list private ListserviceInfoList; public String toJson() { String json = JsonUtil.objectToJson(this); return json; } public static RpcProtocol fromJson(String json) { return JsonUtil.jsonToObject(json, RpcProtocol.class); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; RpcProtocol that = (RpcProtocol) o; return port == that.port && Objects.equals(host, that.host) && isListEquals(serviceInfoList, that.getServiceInfoList()); } private boolean isListEquals(List thisList, List thatList) { if (thisList == null && thatList == null) { return true; } if ((thisList == null && thatList != null) || (thisList != null && thatList == null) || (thisList.size() != thatList.size())) { return false; } return thisList.containsAll(thatList) && thatList.containsAll(thisList); } @Override public int hashCode() { return Objects.hash(host, port, serviceInfoList.hashCode()); } @Override public String toString() { return toJson(); } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public List getServiceInfoList() { return serviceInfoList; } public void setServiceInfoList(List serviceInfoList) { this.serviceInfoList = serviceInfoList; }}
请求对象RpcRequest
public class RpcRequest implements Serializable { private static final long serialVersionUID = -2524587347775862771L; //Id用于验证服务器请求和响应是否匹配 private String requestId; private String className; private String methodName; private Class [] parameterTypes; private Object[] parameters; private String version; public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class [] getParameterTypes() { return parameterTypes; } public void setParameterTypes(Class [] parameterTypes) { this.parameterTypes = parameterTypes; } public Object[] getParameters() { return parameters; } public void setParameters(Object[] parameters) { this.parameters = parameters; } public String getVersion() { return version; } public void setVersion(String version) { this.version = version; }}
响应对象 RpcResponse
public class RpcResponse implements Serializable { private static final long serialVersionUID = 8215493329459772524L; private String requestId; private String error; private Object result; public boolean isError() { return error != null; } public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public String getError() { return error; } public void setError(String error) { this.error = error; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; }}
序列话将对象数据信息转换为用于存储或传输的信息,当服务器收到序列化后的信息会对其进行反序列化读取。
Kryo序列化
public class KryoSerializer extends Serializer { private KryoPool pool = KryoPoolFactory.getKryoPoolInstance(); @Override publicbyte[] serialize(T obj) { Kryo kryo = pool.borrow(); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); Output out = new Output(byteArrayOutputStream); try { kryo.writeObject(out, obj); out.close(); return byteArrayOutputStream.toByteArray(); } catch (Exception ex) { throw new RuntimeException(ex); } finally { try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException(e); } pool.release(kryo); } } @Override public Object deserialize(byte[] bytes, Class clazz) { Kryo kryo = pool.borrow(); ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); Input in = new Input(byteArrayInputStream); try { Object result = kryo.readObject(in, clazz); in.close(); return result; } catch (Exception ex) { throw new RuntimeException(ex); } finally { try { byteArrayInputStream.close(); } catch (IOException e) { throw new RuntimeException(e); } pool.release(kryo); } }}
在协议格式和序列化方式确定前提下,还要用编码器将请求对象转化为便于传输的字节流,而解码器则将字节流转换为服务端应用能使用的格式。
编码器RpcEncoder
public class RpcEncoder extends MessageToByteEncoder { private static final Logger logger = LoggerFactory.getLogger(RpcEncoder.class); private Class genericClass; private Serializer serializer; public RpcEncoder(Class genericClass, Serializer serializer) { this.genericClass = genericClass; this.serializer = serializer; } @Override public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception { if (genericClass.isInstance(in)) { try { byte[] data = serializer.serialize(in); out.writeInt(data.length); out.writeBytes(data); } catch (Exception ex) { logger.error("Encode error: " + ex.toString()); } } }}
解码器
public class RpcDecoder extends ByteToMessageDecoder { private static final Logger logger = LoggerFactory.getLogger(RpcDecoder.class); private Class genericClass; private Serializer serializer; public RpcDecoder(Class genericClass, Serializer serializer) { this.genericClass = genericClass; this.serializer = serializer; } @Override public final void decode(ChannelHandlerContext ctx, ByteBuf in, List
转载地址:http://ffkai.baihongyu.com/