RPC远程服务调用

RPC的诞生

image-20240814001351555

RPC远程过程调用(Remote Procedure Call)

调用远程计算机上的服务,就像调用本地服务一样。

image-20240814001503712

RPC的JAVA版本–RMI

RMI(remote method invocation),可以认为是RPC的java版本,允许运行在一个java 虚拟机的对象调用运行在另一个java虚拟机上对象的方法。

实现原理

RMI使用的是JRMP(Java Remote Messageing Protocol)协议, JRMP是专门为java定制的通信协议,所以是纯java的分布式解决方案

实现RMI程序步骤

1、创建一个远程接口,继承java.rmi.Remote接口

2、实现远程接口,并继承UnicastRemoteObject

3、创建服务器程序,同时使用createRegistry方法注册远程接口对象

4、创建客户端程序,通过Naming类的lookup方法来远程调用接口中的方法

image-20240814002007734

当然这种方式的效率很低

企业级RPC解决方案

Dubbo

阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的RPC实现服务的输出和输入功能,可以和Spring框架无缝集成。

image-20240814003708821

RPC的原理手写RPC

1、创建一个查询用户的接口

1
2
3
public interface IUserService {
public User findUserById(Integer id);
}

2、创建实体类并且实现序列化接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.bitzh.rpc;

import java.io.Serializable;

/**
* @author oyy0v0
* @version 1.0.0
* @create 2024/8/14 0:48
*/
public class User implements Serializable {
private static final long serialVersionUID = 6245288973191873978L;
private String username;
private Integer id;

public User(String username, Integer id) {
this.username = username;
this.id = id;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public Integer getId() {
return id;
}

public void setId(Integer id) {
this.id = id;
}
}

3、接口实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.bitzh.rpc;

/**
* @author oyy0v0
* @version 1.0.0
* @create 2024/8/14 0:53
*/
public class UserServiceImpl implements IUserService {
@Override
public User findUserById(Integer id) {
return new User("张三", id);
}
}

4、RPC调用类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.bitzh.rpc;

/**
* @author oyy0v0
* @version 1.0.0
* @create 2024/8/14 0:54
*/
public class RpcDemo {
//这是一个main方法,程序的入口
public static void main(String[] args) {
IUserService service = new UserServiceImpl();
//本地调用
service.findUserById(13);

//Http调用--远程
// RequestParam param = new RequestParam();
// ....
//HttpClient.get(url,param,....);

//Rpc调用 service封装 调用远程接口 和调用本地接口一样的
service.findUserById(13);//具体的视线已经在另外一台服务器上
}
}

解决服务在远端的问题

服务在远端我们肯定不会用Http来进行通讯,因为Http里面包含了很多头,因此我们用TCP/IP的通讯

image-20240814010113047

image-20240814013505984

1、服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.bitzh.rpc.server;

import com.bitzh.rpc.IUserService;
import com.bitzh.rpc.User;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

/**
* @author oyy0v0
* @version 1.0.0
* @create 2024/8/14 1:02
*/
//服务端
public class Server {
//这是一个main方法,程序的入口
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8888);//监听8888端口
while(true){
Socket socket = serverSocket.accept();//网络请求过来了,使用socket通道(没有就阻塞)
//业务的处理
process(socket);
socket.close();//记得关闭
}
}
private static void process(Socket socket)throws Exception {
InputStream inputStream = socket.getInputStream();//客户端送过来的信息
OutputStream outputStream = socket.getOutputStream();//响应客户端
DataInputStream dataInputStream = new DataInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream);

int id = dataInputStream.readInt();
IUserService userService = new UserServiceImpl();
User user = userService.findUserById(id);
dataOutputStream.writeInt(user.getId());
dataOutputStream.writeUTF(user.getUsername());

dataOutputStream.flush();//刷新缓冲区

}
}

2、客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.bitzh.rpc.client;

import com.bitzh.rpc.User;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.OutputStream;
import java.net.Socket;

/**
* @author oyy0v0
* @version 1.0.0
* @create 2024/8/14 1:02
*/

//客户端
public class Client {
//这是一个main方法,程序的入口
public static void main(String[] args) throws Exception{
//TCP的网络连接
Socket socket = new Socket("127.0.0.1",8888);


//发送请求
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(bos);
dataOutputStream.writeInt(13);
//DataOutputStream.writeInt(int)方法会将整数13写入到ByteArrayOutputStream对象中,
// 但是并没有真正地将数据发送出去。要确保数据被发送出去,
// 需要调用socket.getOutputStream().write(bos.toByteArray())
// 来将ByteArrayOutputStream转换为字节数组并写入到套接字的输出流中。
socket.getOutputStream().write(bos.toByteArray());
socket.getOutputStream().flush();




//处理响应
DataInputStream in = new DataInputStream(socket.getInputStream());
String name = in.readUTF();
int id = in.readInt();
User user = new User(name,id);
System.out.println(user.toString());

bos.close();
dataOutputStream.close();
socket.close();

}
}

解决客户端简单调用的问题

刚刚我们的代码,一大堆的处理,本质上我们就只想调用接口,这时候我们就利用到技术:客户端存根

image-20240814014910025

image-20240814015323601

把网络连接的操作封装到Stub里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.bitzh.rpc.client;

import com.bitzh.rpc.User;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;

/**
* @author oyy0v0
* @version 1.0.0
* @create 2024/8/14 1:49
*/
public class Stub {
public User findUserById(Integer id) throws Exception{
//TCP的网络连接
Socket socket = new Socket("127.0.0.1",8888);


//发送请求
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(bos);
dataOutputStream.writeInt(13);
//DataOutputStream.writeInt(int)方法会将整数13写入到ByteArrayOutputStream对象中,
// 但是并没有真正地将数据发送出去。要确保数据被发送出去,
// 需要调用socket.getOutputStream().write(bos.toByteArray())
// 来将ByteArrayOutputStream转换为字节数组并写入到套接字的输出流中。
socket.getOutputStream().write(bos.toByteArray());
socket.getOutputStream().flush();




//处理响应
DataInputStream in = new DataInputStream(socket.getInputStream());
String name = in.readUTF();
id = in.readInt();
User user = new User(name,id);
System.out.println(user.toString());

bos.close();
dataOutputStream.close();
socket.close();
return new User("张三", id);
}
}

而在本地就只用跟调用本地的方法类似

1
2
3
4
5
6
7
8
public class Client {
//这是一个main方法,程序的入口
public static void main(String[] args) throws Exception{
Stub stub = new Stub();
stub.findUserById(13);//调用远程的方法跟调用本地的方法类似

}
}

RPC的核心技术动态代理

动态代理–隐藏网络细节

image-20240814022202959

修改Stub类的代码实现动态代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.bitzh.rpc.client;

import com.bitzh.rpc.IUserService;
import com.bitzh.rpc.User;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

/**
* @author oyy0v0
* @version 1.0.0
* @create 2024/8/14 1:49
*/
public class Stub {
public static IUserService getStub() throws Exception {

InvocationHandler handler = new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//TCP的网络连接
Socket socket = new Socket("127.0.0.1", 8888);


//发送请求
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(bos);
dataOutputStream.writeInt(13);
//DataOutputStream.writeInt(int)方法会将整数13写入到ByteArrayOutputStream对象中,
// 但是并没有真正地将数据发送出去。要确保数据被发送出去,
// 需要调用socket.getOutputStream().write(bos.toByteArray())
// 来将ByteArrayOutputStream转换为字节数组并写入到套接字的输出流中。
socket.getOutputStream().write(bos.toByteArray());
socket.getOutputStream().flush();


//处理响应
DataInputStream in = new DataInputStream(socket.getInputStream());
String name = in.readUTF();
int id = in.readInt();
User user = new User(name, id);
System.out.println(user.toString());

bos.close();
dataOutputStream.close();
socket.close();
return user;

}
};
//执行动态代理,传入类加载器,传入接口,传入代理对象,返回对象
Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[]{IUserService.class}, handler);
return (IUserService) o;
}
}

服务端利用反射提高灵活性

传递的时候不传一些死的方法,如果我们传递类、方法、参数,在服务端利用反射就灵活了很多

image-20240814043541558

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.bitzh.rpc.server;

import com.bitzh.rpc.IUserService;
import com.bitzh.rpc.User;

import java.io.*;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

/**
* @author oyy0v0
* @version 1.0.0
* @create 2024/8/14 1:02
*/
//服务端
public class Server {
//这是一个main方法,程序的入口
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8888);//监听8888端口
while(true){
Socket socket = serverSocket.accept();//网络请求过来了,使用socket通道(没有就阻塞)
//业务的处理
process(socket);
socket.close();//记得关闭
}
}
private static void process(Socket socket)throws Exception {
InputStream inputStream = socket.getInputStream();//客户端送过来的信息
OutputStream outputStream = socket.getOutputStream();//响应客户端
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
//格式化参数,传出的第一个参数是方法名,第二个是方法参数类型,第三个是参数值
String methodName = objectInputStream.readUTF();
Class[] parameterTypes = (Class[]) objectInputStream.readObject();
Object[] arguments = (Object[]) objectInputStream.readObject();
//通过反射调用

IUserService service = new UserServiceImpl();
Method method = service.getClass().getMethod(methodName, parameterTypes);

User user = (User) method.invoke(service, arguments);//通过反射调用
dataOutputStream.writeInt(user.getId());
dataOutputStream.writeUTF(user.getUsername());

dataOutputStream.flush();//刷新缓冲区

}
}

终级方案(把整个类也发过去)

image-20240814045603817

客户端

1
2
3
4
5
6
7
8
9
10
//客户端
public class Client {
//这是一个main方法,程序的入口
public static void main(String[] args) throws Exception{
IUserService stub = Stub.getStub(IUserService.class);//可以根据不同的类发起不同的调用
stub.findUserById(13);//调用远程的方法跟调用本地的方法类似 加入了代理的概念
System.out.println();

}
}

Stub

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.bitzh.rpc.client;

import com.bitzh.rpc.IUserService;
import com.bitzh.rpc.User;

import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

/**
* @author oyy0v0
* @version 1.0.0
* @create 2024/8/14 1:49
*/
public class Stub {
public static IUserService getStub(Class clazz) throws Exception {

InvocationHandler handler = new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//TCP的网络连接
Socket socket = new Socket("127.0.0.1", 8888);


//发送请求
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
String methodName = method.getName();
//获得参数类型
Class[] parameterTypes = method.getParameterTypes();
//格式化参数,传出的第零个事类名,第一个参数是方法名,第二个是方法参数类型,第三个是参数值
objectOutputStream.writeUTF(clazz.getName());
objectOutputStream.writeUTF(methodName);
objectOutputStream.writeObject(parameterTypes);
objectOutputStream.writeObject(args);
objectOutputStream.flush();




//处理响应
ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
Object o = in.readObject();


//响应端 对象不就行了
objectOutputStream.close();
socket.close();
return o;

}
};
//执行动态代理,传入类加载器,传入接口,传入代理对象,返回对象
Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[]{IUserService.class}, handler);
return (IUserService) o;
}





}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.bitzh.rpc.server;

import com.bitzh.rpc.IUserService;
import com.bitzh.rpc.User;

import java.io.*;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

/**
* @author oyy0v0
* @version 1.0.0
* @create 2024/8/14 1:02
*/
//服务端
public class Server {
//这是一个main方法,程序的入口
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8888);//监听8888端口
while(true){
Socket socket = serverSocket.accept();//网络请求过来了,使用socket通道(没有就阻塞)
//业务的处理
process(socket);
socket.close();//记得关闭
}
}
private static void process(Socket socket)throws Exception {
InputStream inputStream = socket.getInputStream();//客户端送过来的信息
OutputStream outputStream = socket.getOutputStream();//响应客户端
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
//格式化参数,传出的第零个事类名,第一个参数是方法名,第二个是方法参数类型,第三个是参数值
String clazzName = objectInputStream.readUTF();
String methodName = objectInputStream.readUTF();
Class[] parameterTypes = (Class[]) objectInputStream.readObject();
Object[] arguments = (Object[]) objectInputStream.readObject();
//通过反射调用
//这里会从服务的注册表中去找(在RPC框架中有服务的注册中心)
Class clazz = UserServiceImpl.class;
Method method = clazz.getMethod(methodName, parameterTypes);

User user = (User) method.invoke(clazz.newInstance(), arguments);//通过反射调用
objectOutputStream.writeObject(user);//这里就需要根据不同的类
objectOutputStream.flush();//刷新缓冲区

}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.bitzh.rpc.server;

import com.bitzh.rpc.IUserService;
import com.bitzh.rpc.User;

/**
* @author oyy0v0
* @version 1.0.0
* @create 2024/8/14 0:53
*/
public class UserServiceImpl implements IUserService {
@Override
public User findUserById(Integer id) {
return new User("张三", id);
}
}

RPC框架流程及技术

image-20240814050230746

RPC在架构中的位置

image-20240814050938420

RPC框架序列化问题

对象在网络上传输都是二进制!

所以对象要经过序列化转成Byte数组二进制数组,这个过程称为序列化,这个流程反过来就是反序列化

1、JDK的原生序列化(ObjectInputStream反序列化,ObjectOutputStream序列化)

image-20240814051527033

具体如何操作

image-20240814052753447

当然原生的序列化效率太低了,所以衍生出了其他序列化的手段

比如JSON(典型的key-value形式,优点很方便,缺点是进行序列化的开销大,没有类型),

Hessian :

​ 优点:支持跨语言,动态的二进制的紧凑的,生成字节数更少,性能好

​ 缺点:常见对象不支持,Linked类型需要拓展修复。Byte integer

Protobuf(IDL:Interface description language):支持主流语言,专注于效率的协议,

​ 不便之处:工具(User -> UserProtoBuf类),使用门槛高

RPC框架如何选择序列化?

image-20240814054559237

通常是用ProtoBuf或者Hessian

如何架构设计一个RPC框架

image-20240814060441779

如何提升RPC的吞吐量

image-20240814061709909

使用全异步的方法

Zookeeper安装

1 Zookeeper简介

​ zookeeper分布式管理软件。常用它做注册中心(依赖zookeeper的发布/订阅功能)、配置文件中心、分布式锁配置、集群管理等。

​ zookeeper一共就有两个版本。主要使用的是java语言写的。

2 安装

2.1 上传压缩文件

​ 上传到 /usr/local/tmp中

2.2 解压

1
2
# tar zxf apache-zookeeper-3.5.5-bin.tar.gz
# cp -r apache-zookeeper-3.5.5-bin ../zookeeper

2.3 新建data目录

进入到zookeeper中

1
2
# cd /usr/local/zookeeper
# mkdir data

2.4 修改配置文件

进入conf中

1
2
3
# cd conf
# cp zoo_sample.cfg zoo.cfg
# vim zoo.cfg

修改dataDir为data文件夹路径

1
dataDir=/usr/local/zookeeper/data

2.5 启动zookeeper

进入bin文件夹

1
2
# cd /usr/local/zookeeper/bin
# ./zkServer.sh start

通过status查看启动状态。稍微有个等待时间

# ./zkServer.sh status

Zookeeper客户端常用命令

​ 进入到./zkCli.sh命令行工具后,可以使用下面常用命令

1 ls

​ ls [-s][-R] /path

​ -s 详细信息,替代老版的ls2

​ -R 当前目录和子目录中内容都罗列出来

​ 例如:ls -R / 显示根目录下所有内容

2 create

​ create /path [data]

​ [data] 包含内容

​ 创建指定路径信息

​ 例如:create /demo 创建/demo

3 get

​ get [-s] /path

​ [-s] 详细信息

​ 查看指定路径下内容。

​ 例如: get -s /demo

​ null:存放的数据

​ cZxid:创建时zxid(znode每次改变时递增的事务id)

​ ctime:创建时间戳

​ mZxid:最近一次更新的zxid

​ mtime:最近一次更新的时间戳

​ pZxid:子节点的zxid

​ cversion:子节点更新次数

​ dataversion:节点数据更新次数

​ aclVersion:节点ACL(授权信息)的更新次数

​ ephemeralOwner:如果该节点为ephemeral节点(临时,生命周期与session一样), ephemeralOwner值表示与该节点绑定的session id. 如果该节点不是ephemeral节点, ephemeralOwner值为0.

​ dataLength:节点数据字节数

​ numChildren:子节点数量

4 set

​ set /path data

​ 设置节点内容

5 delete

​ delete /path

​ 删除节点

向Zookeeper中注册内容

​ 新建项目ZookeeperClient

1 创建/demo

​ 使用zookeeper的客户端命令工具创建/demo

1
2
./zkCli.sh
create /demos

2 添加依赖

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
</dependency>
</dependencies>

3 编写代码

​ 创建类com.msb.MyApp。

​ ZooDefs.Ids.OPEN_ACL_UNSAFE 表示权限。

​ CreateMode.PERSISTENT_SEQUENTIAL 永久存储,文件内容编号递增。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String [] args){
try {
ZooKeeper zookeeper = new ZooKeeper("192.168.32.128:2181", 10000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("获取连接");
}
});
String content = zookeeper.create("/demo/nn", "content".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("content"+content);
} catch (IOException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

4 查看上传数据

​ ls -R / :查看列表

​ get /demo/nn0000000002 :查看内容

从zookeeper中发现内容

​ 在原有项目中新建一个类,类中编写主方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
try {
ZooKeeper zookeeper = new ZooKeeper("192.168.32.128:2181", 10000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("获取连接");
}
});
//获取列表
List<String> list = zookeeper.getChildren("/demo", false);
for (String child : list) {
byte[] result = zookeeper.getData("/demo/" + child, false, null);
System.out.println(new String(result));
}
} catch (IOException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}