RPC远程服务调用
RPC的诞生
RPC远程过程调用(Remote Procedure Call)
调用远程计算机上的服务,就像调用本地服务一样。
RPC的JAVA版本–RMI
RMI(remote method invocation),可以认为是RPC的java版本,允许运行在一个java 虚拟机的对象调用运行在另一个java虚拟机上对象的方法。
实现原理
RMI使用的是JRMP(Java Remote Messageing Protocol)协议, JRMP是专门为java定制的通信协议,所以是纯java的分布式解决方案
实现RMI程序步骤
1、创建一个远程接口,继承java.rmi.Remote接口
2、实现远程接口,并继承UnicastRemoteObject
3、创建服务器程序,同时使用createRegistry方法注册远程接口对象
4、创建客户端程序,通过Naming类的lookup方法来远程调用接口中的方法
当然这种方式的效率很低
企业级RPC解决方案
Dubbo
阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的RPC实现服务的输出和输入功能,可以和Spring框架无缝集成。
RPC的原理手写RPC
1、创建一个查询用户的接口
1 2 3
| public interface IUserService { public User findUserById(Integer id); }
|
2、创建实体类并且实现序列化接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package com.bitzh.rpc;
import java.io.Serializable;
public class User implements Serializable { private static final long serialVersionUID = 6245288973191873978L; private String username; private Integer id;
public User(String username, Integer id) { this.username = username; this.id = id; }
public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }
public Integer getId() { return id; }
public void setId(Integer id) { this.id = id; } }
|
3、接口实现类
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package com.bitzh.rpc;
public class UserServiceImpl implements IUserService { @Override public User findUserById(Integer id) { return new User("张三", id); } }
|
4、RPC调用类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package com.bitzh.rpc;
public class RpcDemo { public static void main(String[] args) { IUserService service = new UserServiceImpl(); service.findUserById(13);
service.findUserById(13); } }
|
解决服务在远端的问题
服务在远端我们肯定不会用Http来进行通讯,因为Http里面包含了很多头,因此我们用TCP/IP的通讯
1、服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package com.bitzh.rpc.server;
import com.bitzh.rpc.IUserService; import com.bitzh.rpc.User;
import java.io.*; import java.net.ServerSocket; import java.net.Socket;
public class Server { public static void main(String[] args) throws Exception { ServerSocket serverSocket = new ServerSocket(8888); while(true){ Socket socket = serverSocket.accept(); process(socket); socket.close(); } } private static void process(Socket socket)throws Exception { InputStream inputStream = socket.getInputStream(); OutputStream outputStream = socket.getOutputStream(); DataInputStream dataInputStream = new DataInputStream(inputStream); DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
int id = dataInputStream.readInt(); IUserService userService = new UserServiceImpl(); User user = userService.findUserById(id); dataOutputStream.writeInt(user.getId()); dataOutputStream.writeUTF(user.getUsername());
dataOutputStream.flush();
} }
|
2、客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package com.bitzh.rpc.client;
import com.bitzh.rpc.User;
import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.OutputStream; import java.net.Socket;
public class Client { public static void main(String[] args) throws Exception{ Socket socket = new Socket("127.0.0.1",8888);
ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(bos); dataOutputStream.writeInt(13); socket.getOutputStream().write(bos.toByteArray()); socket.getOutputStream().flush();
DataInputStream in = new DataInputStream(socket.getInputStream()); String name = in.readUTF(); int id = in.readInt(); User user = new User(name,id); System.out.println(user.toString());
bos.close(); dataOutputStream.close(); socket.close();
} }
|
解决客户端简单调用的问题
刚刚我们的代码,一大堆的处理,本质上我们就只想调用接口,这时候我们就利用到技术:客户端存根
把网络连接的操作封装到Stub里面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| package com.bitzh.rpc.client;
import com.bitzh.rpc.User;
import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.net.Socket;
public class Stub { public User findUserById(Integer id) throws Exception{ Socket socket = new Socket("127.0.0.1",8888);
ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(bos); dataOutputStream.writeInt(13); socket.getOutputStream().write(bos.toByteArray()); socket.getOutputStream().flush();
DataInputStream in = new DataInputStream(socket.getInputStream()); String name = in.readUTF(); id = in.readInt(); User user = new User(name,id); System.out.println(user.toString());
bos.close(); dataOutputStream.close(); socket.close(); return new User("张三", id); } }
|
而在本地就只用跟调用本地的方法类似
1 2 3 4 5 6 7 8
| public class Client { public static void main(String[] args) throws Exception{ Stub stub = new Stub(); stub.findUserById(13);
} }
|
RPC的核心技术动态代理
动态代理–隐藏网络细节
修改Stub类的代码实现动态代理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| package com.bitzh.rpc.client;
import com.bitzh.rpc.IUserService; import com.bitzh.rpc.User;
import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.Socket;
public class Stub { public static IUserService getStub() throws Exception {
InvocationHandler handler = new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = new Socket("127.0.0.1", 8888);
ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(bos); dataOutputStream.writeInt(13); socket.getOutputStream().write(bos.toByteArray()); socket.getOutputStream().flush();
DataInputStream in = new DataInputStream(socket.getInputStream()); String name = in.readUTF(); int id = in.readInt(); User user = new User(name, id); System.out.println(user.toString());
bos.close(); dataOutputStream.close(); socket.close(); return user;
} }; Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[]{IUserService.class}, handler); return (IUserService) o; } }
|
服务端利用反射提高灵活性
传递的时候不传一些死的方法,如果我们传递类、方法、参数,在服务端利用反射就灵活了很多
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| package com.bitzh.rpc.server;
import com.bitzh.rpc.IUserService; import com.bitzh.rpc.User;
import java.io.*; import java.lang.reflect.Method; import java.net.ServerSocket; import java.net.Socket;
public class Server { public static void main(String[] args) throws Exception { ServerSocket serverSocket = new ServerSocket(8888); while(true){ Socket socket = serverSocket.accept(); process(socket); socket.close(); } } private static void process(Socket socket)throws Exception { InputStream inputStream = socket.getInputStream(); OutputStream outputStream = socket.getOutputStream(); ObjectInputStream objectInputStream = new ObjectInputStream(inputStream); DataOutputStream dataOutputStream = new DataOutputStream(outputStream); String methodName = objectInputStream.readUTF(); Class[] parameterTypes = (Class[]) objectInputStream.readObject(); Object[] arguments = (Object[]) objectInputStream.readObject();
IUserService service = new UserServiceImpl(); Method method = service.getClass().getMethod(methodName, parameterTypes);
User user = (User) method.invoke(service, arguments); dataOutputStream.writeInt(user.getId()); dataOutputStream.writeUTF(user.getUsername());
dataOutputStream.flush();
} }
|
终级方案(把整个类也发过去)
客户端
1 2 3 4 5 6 7 8 9 10
| public class Client { public static void main(String[] args) throws Exception{ IUserService stub = Stub.getStub(IUserService.class); stub.findUserById(13); System.out.println();
} }
|
Stub
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package com.bitzh.rpc.client;
import com.bitzh.rpc.IUserService; import com.bitzh.rpc.User;
import java.io.*; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.Socket;
public class Stub { public static IUserService getStub(Class clazz) throws Exception {
InvocationHandler handler = new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = new Socket("127.0.0.1", 8888);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); String methodName = method.getName(); Class[] parameterTypes = method.getParameterTypes(); objectOutputStream.writeUTF(clazz.getName()); objectOutputStream.writeUTF(methodName); objectOutputStream.writeObject(parameterTypes); objectOutputStream.writeObject(args); objectOutputStream.flush();
ObjectInputStream in = new ObjectInputStream(socket.getInputStream()); Object o = in.readObject();
objectOutputStream.close(); socket.close(); return o;
} }; Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[]{IUserService.class}, handler); return (IUserService) o; }
}
|
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package com.bitzh.rpc.server;
import com.bitzh.rpc.IUserService; import com.bitzh.rpc.User;
import java.io.*; import java.lang.reflect.Method; import java.net.ServerSocket; import java.net.Socket;
public class Server { public static void main(String[] args) throws Exception { ServerSocket serverSocket = new ServerSocket(8888); while(true){ Socket socket = serverSocket.accept(); process(socket); socket.close(); } } private static void process(Socket socket)throws Exception { InputStream inputStream = socket.getInputStream(); OutputStream outputStream = socket.getOutputStream(); ObjectInputStream objectInputStream = new ObjectInputStream(inputStream); ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream); String clazzName = objectInputStream.readUTF(); String methodName = objectInputStream.readUTF(); Class[] parameterTypes = (Class[]) objectInputStream.readObject(); Object[] arguments = (Object[]) objectInputStream.readObject(); Class clazz = UserServiceImpl.class; Method method = clazz.getMethod(methodName, parameterTypes);
User user = (User) method.invoke(clazz.newInstance(), arguments); objectOutputStream.writeObject(user); objectOutputStream.flush();
} }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package com.bitzh.rpc.server;
import com.bitzh.rpc.IUserService; import com.bitzh.rpc.User;
public class UserServiceImpl implements IUserService { @Override public User findUserById(Integer id) { return new User("张三", id); } }
|
RPC框架流程及技术
RPC在架构中的位置
RPC框架序列化问题
对象在网络上传输都是二进制!
所以对象要经过序列化转成Byte数组二进制数组,这个过程称为序列化,这个流程反过来就是反序列化
1、JDK的原生序列化(ObjectInputStream反序列化,ObjectOutputStream序列化)
具体如何操作
当然原生的序列化效率太低了,所以衍生出了其他序列化的手段
比如JSON(典型的key-value形式,优点很方便,缺点是进行序列化的开销大,没有类型),
Hessian :
优点:支持跨语言,动态的二进制的紧凑的,生成字节数更少,性能好
缺点:常见对象不支持,Linked类型需要拓展修复。Byte integer
Protobuf(IDL:Interface description language):支持主流语言,专注于效率的协议,
不便之处:工具(User -> UserProtoBuf类),使用门槛高
RPC框架如何选择序列化?
通常是用ProtoBuf或者Hessian
如何架构设计一个RPC框架
如何提升RPC的吞吐量
使用全异步的方法
Zookeeper安装
1 Zookeeper简介
zookeeper分布式管理软件。常用它做注册中心(依赖zookeeper的发布/订阅功能)、配置文件中心、分布式锁配置、集群管理等。
zookeeper一共就有两个版本。主要使用的是java语言写的。
2 安装
2.1 上传压缩文件
上传到 /usr/local/tmp中
2.2 解压
1 2
| # tar zxf apache-zookeeper-3.5.5-bin.tar.gz # cp -r apache-zookeeper-3.5.5-bin ../zookeeper
|
2.3 新建data目录
进入到zookeeper中
1 2
| # cd /usr/local/zookeeper # mkdir data
|
2.4 修改配置文件
进入conf中
1 2 3
| # cd conf # cp zoo_sample.cfg zoo.cfg # vim zoo.cfg
|
修改dataDir为data文件夹路径
1
| dataDir=/usr/local/zookeeper/data
|
2.5 启动zookeeper
进入bin文件夹
1 2
| # cd /usr/local/zookeeper/bin # ./zkServer.sh start
|
通过status查看启动状态。稍微有个等待时间
# ./zkServer.sh status
Zookeeper客户端常用命令
进入到./zkCli.sh命令行工具后,可以使用下面常用命令
1 ls
ls [-s][-R] /path
-s 详细信息,替代老版的ls2
-R 当前目录和子目录中内容都罗列出来
例如:ls -R / 显示根目录下所有内容
2 create
create /path [data]
[data] 包含内容
创建指定路径信息
例如:create /demo 创建/demo
3 get
get [-s] /path
[-s] 详细信息
查看指定路径下内容。
例如: get -s /demo
null:存放的数据
cZxid:创建时zxid(znode每次改变时递增的事务id)
ctime:创建时间戳
mZxid:最近一次更新的zxid
mtime:最近一次更新的时间戳
pZxid:子节点的zxid
cversion:子节点更新次数
dataversion:节点数据更新次数
aclVersion:节点ACL(授权信息)的更新次数
ephemeralOwner:如果该节点为ephemeral节点(临时,生命周期与session一样), ephemeralOwner值表示与该节点绑定的session id. 如果该节点不是ephemeral节点, ephemeralOwner值为0.
dataLength:节点数据字节数
numChildren:子节点数量
4 set
set /path data
设置节点内容
5 delete
delete /path
删除节点
向Zookeeper中注册内容
新建项目ZookeeperClient
1 创建/demo
使用zookeeper的客户端命令工具创建/demo
1 2
| ./zkCli.sh create /demos
|
2 添加依赖
1 2 3 4 5 6 7
| <dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.5</version> </dependency> </dependencies>
|
3 编写代码
创建类com.msb.MyApp。
ZooDefs.Ids.OPEN_ACL_UNSAFE 表示权限。
CreateMode.PERSISTENT_SEQUENTIAL 永久存储,文件内容编号递增。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String [] args){ try { ZooKeeper zookeeper = new ZooKeeper("192.168.32.128:2181", 10000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("获取连接"); } }); String content = zookeeper.create("/demo/nn", "content".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.println("content"+content); } catch (IOException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }
|
4 查看上传数据
ls -R / :查看列表
get /demo/nn0000000002 :查看内容
从zookeeper中发现内容
在原有项目中新建一个类,类中编写主方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public static void main(String[] args) { try { ZooKeeper zookeeper = new ZooKeeper("192.168.32.128:2181", 10000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("获取连接"); } }); //获取列表 List<String> list = zookeeper.getChildren("/demo", false); for (String child : list) { byte[] result = zookeeper.getData("/demo/" + child, false, null); System.out.println(new String(result)); } } catch (IOException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }
|