Zookeeper 入门 工作机制 从设计模式角度来理解,是一个基于观察者模式设计的分布式服务管理框架,负责存储和管理大家都关心的数据 ,然后接受观察者的注册 ,一旦这些数据的状态发生变化,ZK就将 负责通知已经在ZK上注册的哪些观察者 做出相应的反应。
特点
1)Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
2)集群中只要有 半数以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器。
3)全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
4)更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。
5)数据更新原子性,一次数据更新要么成功,要么失败。
6)实时性,在一定时间范围内,Client能读到最新数据。
数据结构 ZooKeeper 数据模型的结构与 Unix 文件系统 很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode 。每一个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识。
应用场景 提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡 等。
下载地址 1) 官网首页:
https://zookeeper.apache.org/
下载linux环境安装的tar包。
ZK本地安装 本地模式安装 1、安装前准备 (1)安装 JDK
(2)拷贝apache-zookeeper-3.5.7-bin.tar.gz安装包到 Linux 系统下
(3)解压到指定目录
1 tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
(4)修改名称
1 mv apache-zookeeper-3.5.7 -bin/zookeeper-3.5.7
2、配置修改 (1)将/opt/module/zookeeper-3.5.7/conf 这个路径下的 zoo_sample.cfg 修改为 zoo.cfg;
1 mv zoo_sample.cfg zoo.cfg
(2)打开 zoo.cfg 文件,修改 dataDir 路径:
修改如下内容:
1 dataDir=/opt/module/zookeeper-3.5.7/zkData
(3)在/opt/module/zookeeper-3.5.7/这个目录上创建 zkData 文件夹
3、操作ZK (1)启动 Zookeeper
(2)查看进程是否启动
(3)查看状态
(4)启动客户端
(5)退出客户端:
(6)停止 Zookeeper
配置参数解读 Zookeeper中的配置文件zoo.cfg中参数含义解读如下:
1 )tickTime = 2000 :通信心跳 时间,Zookeeper 服务器 与 客户端 心跳时间,单位毫秒
2 )initLimit = 10 :LF初始通信时限
3 )syncLimit = 5 :LF 同步通信时限
Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。
4 )dataDir :保存Zookeeper中的数据
注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录
5 )clientPort = 2181 :客户端连接端口,通常不做修改。
ZK集群 集群操作 集群安装 1) 集群 规划 在 hadoop102、hadoop103 和 hadoop104 三个节点上都部署 Zookeeper。
思考:如果是 10 台服务器,需要部署多少台 Zookeeper ?(奇数台即可)
2) 解压安装 (1)在 hadoop102 解压 Zookeeper 安装包到/opt/module/目录下
(2)修改 apache-zookeeper-3.5.7-bin 名称为 zookeeper-3.5.7
1 mv apache-zookeeper-3.5.7-bin/zookeeper-3.5.7
3) 配置服务器 编号 (1)在/opt/module/zookeeper-3.5.7/这个目录下创建 zkData
(2)在/opt/module/zookeeper-3.5.7/zkData 目录下创建一个 myid 的文件
在文件中添加与 server 对应的编号(注意:上下不要有空行,左右不要有空格)
注意:添加 myid 文件,一定要在 Linux 里面创建,在 notepad++里面很可能乱码
(3)拷贝配置好的 zookeeper 到其他机器上
并分别在 hadoop103、hadoop104 上修改 myid 文件中内容为 3、4
4) 配置zoo.cfg 文件 (1)重命名/opt/module/zookeeper-3.5.7/conf 这个目录下的 zoo_sample.cfg 为 zoo.cfg
1 mv zoo_sample.cfg zoo.cfg
(2)打开 zoo.cfg 文件
1 2 3 4 5 6 7 # 修改数据存储路径配置 dataDir=/opt/module/zookeeper-3.5.7/zkData # 增加如下配置 # server.2=hadoop102:2888:3888 server.3=hadoop103:2888:3888 server.4=hadoop104:2888:3888
(3)配置参数解读
A 是一个数字,表示这个是第几号服务器;
集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据
就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比
较从而判断到底是哪个 server。
B 是这个服务器的地址;
C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
(4)同步 zoo.cfg 配置文件
5) 集群操作 (1)分别启动 Zookeeper
1 2 3 [atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start [atguigu@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh start [atguigu@hadoop104 zookeeper-3.5.7]$ bin/zkServer.sh start
(2)查看状态
1 2 3 [atguigu@hadoop102 zookeeper-3.5.7]# bin/zkServer.sh status [atguigu@hadoop103 zookeeper-3.5.7]# bin/zkServer.sh status [atguigu@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status
选举机制(面试重点)
在Zookeeper中每个服务有这些属性:
SID,服务器ID,
ZXID,事务ID,用来标识一次服务器状态的变更(比如有个写操作,那么三台机器的事务ID是同步的,一个操作结束之后广播给其他Follower机器同步)。
Epoch,每个Leader任期的代号。没有Leader时间一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加。(也就是Leader的是1,然后宕机了下一个Leader就是2)
流程:
1、服务器1启动,发起一次选举。服务器1投自己一票,此时服务器1票数一票,不够半数以上,选举无法完成,服务器1状态保持为 LOOKING
2、服务器2启动,再发起一次选举,服务器1和2分别投自己一票并交换选票信息,此时服务器1发现服务器2的服务器ID比自己目前投票推举的大,更改选票为推举服务器2.此时服务器1 0票,服务器2 2票,服务器1和2保持LOOKING
3、服务器3启动,发起一次选举,然后这时候服务器3就会超过半数,当选Leader,服务器1和2更改状态为FOLLOWING,服务器3就是LEADING
4、服务器4启动,发起选举,不会更改选票了,直接变成FOLLOWING
5、服务器5启动,跟4一样当小弟。
流程:
1、当ZK集群中的一台服务器出现服务器初始化启动或者服务器运行期间无法和Leader保持连接就会进入Leader选举。
2、当一台机器进入Leader选举流程时,当前集群也可能会处于两种状态。
一种是集群中本来就有一个Leader,当机器试图去选举Leader的时候,会被告知Leader的信息,对于这台机器只用重新和Leader机器恢复连接即可。
集群中确实不存在Leader
假设ZK有5台服务器组成,SID分别为1,2,3,4,5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader,某一时刻,3和5服务器出现故障,因此开始进行Leader选举。
SID为1,2,4的机器投票情况:
EPOCH
ZXID
SID
1
8
1
1
8
2
1
7
4
Leader选举规则:EPOCH大的直接胜出,如果相同事务ID大的胜出,如果相同服务器ID大的胜出
ZK 集群启动停止脚本 1)在 hadoop102 的/home/atguigu/bin 目录下创建脚本
1 [atguigu@hadoop102 bin]$ vim zk.sh
在脚本中编写如下内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 # !/bin/bash case $1 in "start"){ for i in hadoop102 hadoop103 hadoop104 do echo ---------- zookeeper $i 启动 ------------ ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start" done };; "stop"){ for i in hadoop102 hadoop103 hadoop104 do echo ---------- zookeeper $i 停止 ------------ ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop" done };; "status"){ for i in hadoop102 hadoop103 hadoop104 do echo ---------- zookeeper $i 状态 ------------ ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status" done };; esac
2)增加脚本执行权限
1 [atguigu@hadoop102 bin]$ chmod u+x zk.sh
3)Zookeeper 集群启动脚本
1 [atguigu@hadoop102 module]$ zk.sh start
4)Zookeeper 集群停止脚本
1 [atguigu@hadoop102 module]$ zk.sh stop
集群启动停止脚本 1)在 hadoop102 的/home/atguigu/bin 目录下创建脚本
1 [atguigu@hadoop102 bin]$ vim zk.sh
在脚本中编写如下内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 # !/bin/bash case $1 in "start"){ for i in hadoop102 hadoop103 hadoop104 do echo ---------- zookeeper $i 启动 ------------ ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start" done };; "stop"){ for i in hadoop102 hadoop103 hadoop104 do echo ---------- zookeeper $i 停止 ------------ ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop" done };; "status"){ for i in hadoop102 hadoop103 hadoop104 do echo ---------- zookeeper $i 状态 ------------ ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status" done };; esac
2)增加脚本执行权限
1 [atguigu@hadoop102 bin]$ chmod u+x zk.sh
3)Zookeeper 集群启动脚本
1 [atguigu@hadoop102 module]$ zk.sh start
4)Zookeeper 集群停止脚本
1 [atguigu@hadoop102 module]$ zk.sh stop
客户端 命令行 操作 命令行语法 也就是说可以操作zk客户端
1) 启动客户端
1 2 [atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh -server hadoop102:2181
2) 显示所有操作命令
1 [zk: hadoop102:2181(CONNECTED) 1] help
znode节点数据信息 1) 查看当前znode 中所包含的内容
1 [zk: hadoop102:2181(CONNECTED) 0] ls /
2) 查看当前节点详细 数据
1 [zk: hadoop102:2181(CONNECTED) 5] ls -s /
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 [zookeeper]cZxid = 0x0 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x0 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0x0 cversion = -1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1 (1)czxid:创建节点的事务 zxid 每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。 (2)ctime:znode 被创建的毫秒数(从 1970 年开始) (3)mzxid:znode 最后更新的事务 zxid (4)mtime:znode 最后修改的毫秒数(从 1970 年开始) (5)pZxid:znode 最后更新的子节点 zxid (6)cversion:znode 子节点变化号,znode 子节点修改次数 (7)dataversion:znode 数据变化号 (8)aclVersion:znode 访问控制列表的变化号 (9)ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是 临时节点则是 0。 (10)dataLength:znode 的数据长度 (11)numChildren:znode 子节点数量
节点类型
持久型:客户端和服务端断开连接后,创建的节点不删除
短暂型:客户端和服务端断开连接后,创建的节点自己删除。
顺序号节点是可选功能,在创建节点的时候参数决定。
案例
1) 分别创建2 个普通节点 (永久节点 + 不带序号)
注意:创建节点时,要赋值
2) 获得节点的值 1 [zk: localhost:2181(CONNECTED) 5] get -s /sanguo diaochan
3) 创建带序号的节点 (永久节点 + 带序号) (1)先创建一个普通的根节点/sanguo/weiguo
(2)创建带序号的节点
如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。
4) 创建短暂节点 (短暂节点 + 不带序号 or 带序号) (1)创建短暂的不带序号的节点
(2)创建短暂的带序号的节点
(3)在当前客户端是能查看到的
(4)退出当前客户端然后再重启客户端
1 2 [zk: localhost:2181(CONNECTED) 12] quit [atguigu@hadoop104 zookeeper-3.5.7]$ bin/zkCli.sh
(5)再次查看根目录下短暂节点已经删除
5) 修改节点数据值
监听器原理 客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目录节点增加删除)时,ZooKeeper 会通知客户端。监听机制保证 ZooKeeper 保存的任何的数据的任何改变都能快速的响应到监听了该节点的应用程序。
1) 节点的值变化监听 (1)在 hadoop104 主机上注册监听/sanguo 节点数据变化
1 [zk: localhost:2181(CONNECTED) 26] get -w /sanguo
(2)在 hadoop103 主机上修改/sanguo 节点的数据
1 [zk: localhost:2181(CONNECTED) 1] set /sanguo "xisi"
(3)观察 hadoop104 主机收到数据变化的监听
1 2 3 WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo
注意:在hadoop103再多次修改/sanguo的值,hadoop104上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。
2) 节点的子节点变化监听(路径变化) (1)在 hadoop104 主机上注册监听/sanguo 节点的子节点变化
1 2 [zk: localhost:2181(CONNECTED) 1] ls -w /sanguo [shuguo, weiguo]
(2)在 hadoop103 主机/sanguo 节点上创建子节点
1 2 [zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi" Created /sanguo/jin
(3)观察 hadoop104 主机收到子节点变化的监听
1 2 3 WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo
注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。
节点删除与查看 看 1) 删除节点 1 [zk: localhost:2181(CONNECTED) 4] delete /sanguo/jin
2) 递归删除节点 1 [zk: localhost:2181(CONNECTED) 15] deleteall /sanguo/shuguo
3) 查看节点状态 1 [zk: localhost:2181(CONNECTED) 17] stat /sanguo
1 2 3 4 5 6 7 8 9 10 11 cZxid = 0x100000003 ctime = Wed Aug 29 00:03:23 CST 2018 mZxid = 0x100000011 mtime = Wed Aug 29 00:21:23 CST 2018 pZxid = 0x100000014 cversion = 9 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 4 numChildren = 1
客户端API 操作 前提:保证 hadoop102、hadoop103、hadoop104 服务器上 Zookeeper 集群服务端启动。
环境搭建 1) 创建一个 工程:zookeeper
2) 添加pom 文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <dependency > <groupId > junit</groupId > <artifactId > junit</artifactId > <version > 4.13</version > </dependency > <dependency > <groupId > org.apache.logging.log4j</groupId > <artifactId > log4j-core</artifactId > <version > 2.14.1</version > </dependency > <dependency > <groupId > org.apache.zookeeper</groupId > <artifactId > zookeeper</artifactId > <version > 3.5.7</version > </dependency >
3) 拷贝log4j.properties 文件到项目根目录
需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
1 2 3 4 5 6 7 8 log4j.rootLogger =INFO, stdout log4j.appender.stdout =org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout =org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern =%d %p [%c] - %m%n log4j.appender.logfile =org.apache.log4j.FileAppender log4j.appender.logfile.File =target/spring.log log4j.appender.logfile.layout =org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern =%d %p [%c] - %m%n
4 )创建包名com.rg.zk
5 )创建类名称zkClient
创建ZK客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class zkClient { private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181" ; private int sessionTimeout = 5000 ; private ZooKeeper zkClient; @Before public void init () throws IOException { zkClient = new ZooKeeper (connectString, sessionTimeout, new Watcher () { @Override public void process (WatchedEvent watchedEvent) { System.out.println("--------------------------------" ); List <String> children = null ; try { children = zkClient.getChildren("/" , true ); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } for (String child : children) { System.out.println(child); } System.out.println("--------------------------------" ); } }); } }
创建子节点 1 2 3 4 5 6 7 8 9 10 11 12 @Test public void create () throws KeeperException, InterruptedException { String nodeCreated = zkClient.create("/atguigu" , "ss.avi" .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }
获取 子节点 并 监听 节点 1 2 3 4 5 6 7 8 9 10 11 @Test public void getChildRen () throws KeeperException, InterruptedException { List <String> children = zkClient.getChildren("/" , true ); for (String child : children) { System.out.println(child); } Thread.sleep(Long.MAX_VALUE); }
1)在 IDEA 控制台上看到如下节点: 1 2 3 zookeeper sanguo atguigu
(2)在 hadoop102 的客户端上创建再创建一个节点/atguigu1,观察 IDEA 控制台
1 [zk: localhost:2181(CONNECTED) 3] create /atguigu1 "atguigu1"
(3)在 hadoop102 的客户端上删除节点/atguigu1,观察 IDEA 控制台
1 [zk: localhost:2181(CONNECTED) 4] delete /atguigu1
判断 Znode 是否存在 1 2 3 4 5 6 @Test public void exist () throws KeeperException, InterruptedException { Stat stat = zkClient.exists("/atguigu" , false ); System.out.println(stat==null ? "not exist " :"exist" ); }
客户端向服务端写数据流程
服务器动态上下限监听案例 需求 某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
需求分析
注:服务器上线的过程就是Zookeeper集群创建节点的过程.
服务器和客户端相对于Zookeeper都是 “客户端”,只不过服务器是创建节点的操作,客户端是监听节点的操作(一旦那个节点不存在了,下次就不去访问这个节点了)
具体实现 (1)先在集群上创建/servers 节点
1 2 [zk: localhost:2181(CONNECTED) 10] create /servers "servers" Created /servers
(2)在 Idea 中创建包名:com.rg.case1
(3)服务器端向 Zookeeper 注册代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class DistributeServer { private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181" ; private int sessionTimeout = 2000 ; private ZooKeeper zk; public static void main (String[] args) throws IOException, KeeperException, InterruptedException { DistributeServer server = new DistributeServer (); server.getConnect(); server.regist(args[0 ]); server.business(); } private void business () throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } private void regist (String hostname) throws KeeperException, InterruptedException { String create = zk.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname+"is online" ); } private void getConnect () throws IOException { zk = new ZooKeeper (connectString, sessionTimeout, new Watcher () { @Override public void process (WatchedEvent event) { } }); } }
(4)客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class DistributeClient { private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181" ; private int sessionTimeout = 2000 ; private ZooKeeper zk; public static void main (String[] args) throws IOException, InterruptedException, KeeperException { DistributeClient client = new DistributeClient (); client.getConnect(); client.getServerList(); client.business(); } private void getServerList () throws KeeperException, InterruptedException { List <String> children = zk.getChildren("/servers" , true ); List <String> servers = new ArrayList <>(); for (String child : children) { byte [] data = zk.getData("/servers/" + child, false , null ); servers.add(new String (data)); } System.out.println(servers); } private void business () throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } private void getConnect () throws IOException { zk = new ZooKeeper (connectString, sessionTimeout, new Watcher () { @Override public void process (WatchedEvent event) { try { getServerList(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }
测试 1)在 Linux 命令行上操作增加减少服务器 (1)启动 DistributeClient 客户端
(2)在 hadoop102 上 zk 的客户端/servers 目录上创建临时带序号节点
1 2 3 4 [zk: localhost:2181(CONNECTED) 1] create -e -s /servers/hadoop102 "hadoop102" [zk: localhost:2181(CONNECTED) 2] create -e -s /servers/hadoop103 "hadoop103"
(3)观察 Idea 控制台变化
[hadoop102, hadoop103]
(4)执行删除操作
1 2 [zk: localhost:2181(CONNECTED) 8] delete /servers/hadoop1020000000000
(5)观察 Idea 控制台变化
[hadoop103]
2)在 Idea 上操作增加减少服务器 (1)启动 DistributeClient 客户端(如果已经启动过,不需要重启)
(2)启动 DistributeServer 服务
①点击 Edit Configurations…
②在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
③ 回 到 DistributeServer 的 main 方 法 , 右 键 , 在 弹 出 的 窗 口 中 点 击 Run“DistributeServer.main()”
④观察 DistributeServer 控制台,提示 hadoop102 is working
⑤观察 DistributeClient 控制台,提示 hadoop102 已经上线
ZK分布式锁案例 什么叫做分布式锁呢?
比如说”进程 1”在使用该资源的时候,会先去获得锁,”进程 1”获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,”进程 1”用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
原生ZK实现 1)分布式锁实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 public class DistributedLock { private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181" ; private final int sessionTimeout = 2000 ; private final ZooKeeper zk; private CountDownLatch connectLatch = new CountDownLatch (1 ); private CountDownLatch waitLatch = new CountDownLatch (1 ); private String currentMode; private String waitPath; public DistributedLock () throws IOException, InterruptedException, KeeperException { zk = new ZooKeeper (connectString, sessionTimeout, new Watcher () { @Override public void process (WatchedEvent event) { if (event.getState()==Event.KeeperState.SyncConnected){ connectLatch.countDown(); } if (event.getType()==Event.EventType.NodeDeleted && event.getPath().equals(waitPath)){ waitLatch.countDown(); } } }); connectLatch.await(); Stat stat = zk.exists("/locks" , false ); if (stat==null ){ zk.create("/locks" , "locks" .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } public void zklock () throws KeeperException, InterruptedException { currentMode = zk.create("/locks/" + "seq-" , null , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); List <String> children = zk.getChildren("/locks" , false ); if (children.size()==1 ){ return ; }else { Collections.sort(children); String thisNode = currentMode.substring("/locks/" .length()); int index = children.indexOf(thisNode); if (index==-1 ){ System.out.println("数据异常..." ); }else if (index == 0 ){ return ; }else { waitPath = "/locks/" + children.get(index - 1 ); zk.getData(waitPath,true ,null ); waitLatch.await(); return ; } } } public void unZkLock () throws KeeperException, InterruptedException { zk.delete(this .currentMode,-1 ); } }
2)分布式锁 测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class DistributedLockTest { public static void main (String[] args) throws InterruptedException, IOException, KeeperException { final DistributedLock lock1 = new DistributedLock (); final DistributedLock lock2 = new DistributedLock (); new Thread (new Runnable () { @Override public void run () { try { lock1.zklock(); System.out.println("线程1启动, 获取到锁" ); System.out.println("线程1使用资源中..." ); Thread.sleep(5 *1000 ); lock1.unZkLock(); System.out.println("线程1使用资源完毕,释放锁" ); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread (new Runnable () { @Override public void run () { try { lock2.zklock(); System.out.println("线程2启动, 获取到锁" ); System.out.println("线程2使用资源中..." ); Thread.sleep(5 *1000 ); lock2.unZkLock(); System.out.println("线程2使用资源完毕,释放锁" ); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
Curator框架实现分布式锁案例 1) 原生的 Java API 开发存在的问题 (1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
(2)Watch 需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高的
(4)不支持多节点删除和创建。需要自己去递归
2) Curator 是一个专门解决分布式锁的框架,解决了原生Java API 开发分布式遇到的问题。 详情请查看官方文档:https://curator.apache.org/index.html
3 )Curator 案例实操 (1)添加依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <dependency > <groupId > org.apache.curator</groupId > <artifactId > curator-framework</artifactId > <version > 4.3.0</version > </dependency > <dependency > <groupId > org.apache.curator</groupId > <artifactId > curator-recipes</artifactId > <version > 4.3.0</version > </dependency > <dependency > <groupId > org.apache.curator</groupId > <artifactId > curator-client</artifactId > <version > 4.3.0</version > </dependency >
2)代码实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 public class CuratorLockTest { public static void main (String[] args) { InterProcessMutex lock1 = new InterProcessMutex (getCuratorFramework(),"/locks" ); InterProcessMutex lock2 = new InterProcessMutex (getCuratorFramework(),"/locks" ); new Thread (new Runnable () { @Override public void run () { try { lock1.acquire(); System.out.println("线程1获取到锁" ); lock1.acquire(); System.out.println("线程1再次获取到锁" ); Thread.sleep(5 *1000 ); lock1.release(); System.out.println("线程1释放锁.." ); lock1.release(); System.out.println("线程1再次释放锁..." ); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread (new Runnable () { @Override public void run () { try { lock2.acquire(); System.out.println("线程2获取到锁" ); lock2.acquire(); System.out.println("线程2再次获取到锁" ); Thread.sleep(5 *1000 ); lock2.release(); System.out.println("线程2释放锁.." ); lock2.release(); System.out.println("线程2再次释放锁..." ); } catch (Exception e) { e.printStackTrace(); } } }).start(); } private static CuratorFramework getCuratorFramework () { ExponentialBackoffRetry policy = new ExponentialBackoffRetry (3000 , 3 ); CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181" ) .connectionTimeoutMs(2000 ) .sessionTimeoutMs(2000 ) .retryPolicy(policy).build(); client.start(); System.out.println("Zookeeper启动成功.." ); return client; } }
面试题 选举机制 半数机制,超过半数的投票通过,即通过。
(1)第一次启动选举规则:
投票过半数时,服务器 id 大的胜出
(2)第二次启动选举规则:
①EPOCH 大的直接胜出
②EPOCH 相同,事务 id 大的胜出
③事务 id 相同,服务器 id 大的胜出
生产集群至少安装多少 zk 合适? 安装奇数台。
生产经验:
⚫ 10 台服务器:3 台 zk;
⚫ 20 台服务器:5 台 zk;
⚫ 100 台服务器:11 台 zk;
⚫ 200 台服务器:11 台 zk
服务器台数多:好处,提高可靠性;坏处:提高通信延时
常用命令 ls、get、create、delete