下载 https://zookeeper.apache.org/ https://archive.apache.org/dist/zookeeper/ 我们选择比较稳定的版本 3.5.7
安装 上传到 linux 上 解压缩[xiamu@hadoop202 software]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
修改名称[xiamu@hadoop202 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7/
配置修改[xiamu@hadoop202 module]$ cd zookeeper-3.5.7/
[xiamu@hadoop202 zookeeper-3.5.7]$ mkdir zkData
[xiamu@hadoop202 zookeeper-3.5.7]$ cd conf/
[xiamu@hadoop202 zookeeper-3.5.7]$ mv zoo_sample.cfg zoo.cfg
[xiamu@hadoop202 conf]$ vim zoo.cfg
修改 dataDir 的路径为刚刚创建的 zkData 的路径
1 2 dataDir=/opt/module/zookeeper-3.5.7/zkData
本地启动和停止 [xiamu@hadoop202 zookeeper-3.5.7]$ bin/zkServer.sh start
启动 zookeeper
[xiamu@hadoop202 zookeeper-3.5.7]$ jps
查看进程是否启动 4720 Jps 4686 QuorumPeerMain
[xiamu@hadoop202 zookeeper-3.5.7]$ bin/zkServer.sh status
查看状态 ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: standalone
[xiamu@hadoop202 zookeeper-3.5.7]$ bin/zkCli.sh
启动客户端[zk: localhost:2181(CONNECTED) 0] quit
退出客户端
[xiamu@hadoop202 zookeeper-3.5.7]$ bin/zkServer.sh stop
停止 ZooKeeper ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Stopping zookeeper … STOPPED
集群安装 在创建好的 zkData 目录下创建一个文件 myid[xiamu@hadoop202 zkData]$ vim myid
2[xiamu@hadoop203 zkData]$ vim myid
3[xiamu@hadoop204 zkData]$ vim myid
4 在 hadoop202(102)中设置 2 在 hadoop203(103)中设置 3 在 hadoop204(104)中设置 4 分发脚本命令[xiamu@hadoop202 module]$ xsync zookeeper-3.5.7/
配置[xiamu@hadoop202 conf]$ vim zoo.cfg
1 2 3 4 server.2=hadoop202:2888:3888server.3=hadoop203:2888:3888 server.4=hadoop204:2888:3888
配置完成之后分发[xiamu@hadoop202 conf]$ xsync zoo.cfg
集群启动 [xiamu@hadoop202 zookeeper-3.5.7]$ bin/zkServer.sh start
[xiamu@hadoop202 zookeeper-3.5.7]$ bin/zkServer.sh status
需要对每一个集群开启 zookeeper[xiamu@hadoop203 zookeeper-3.5.7]$ bin/zkServer.sh start
[xiamu@hadoop203 zookeeper-3.5.7]$ bin/zkServer.sh status
Mode: leader[xiamu@hadoop204 zookeeper-3.5.7]$ bin/zkServer.sh start
[xiamu@hadoop204 zookeeper-3.5.7]$ bin/zkServer.sh status
Mode: follower
第一次选举机制
非第一次选举机制
编写启动停止脚本 在/home/xiamu/bin 目录下创建 zk.sh 脚本[xiamu@hadoop202 bin]$ vim zk.sh
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 #!/bin/bash case $1 in "start" ){ for i in hadoop202 hadoop203 hadoop204 do echo ---------- ZooKeeper $i 启动 ---------- ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start" done } ;;"stop" ){ for i in hadoop202 hadoop203 hadoop204 do echo ---------- ZooKeeper $i 停止 ---------- ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop" done } ;;"status" ){ for i in hadoop202 hadoop203 hadoop204 do echo ---------- ZooKeeper $i 状态 ---------- ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status" done } ;;esac
给予可执行权限[xiamu@hadoop202 bin]$ chmod 777 zk.sh
分发脚本[xiamu@hadoop202 bin]$ xsync zk.sh
使用脚本启动/查看状态/关闭 ZooKeeper[xiamu@hadoop202 bin]$ zk.sh start
[xiamu@hadoop202 bin]$ zk.sh status
[xiamu@hadoop202 bin]$ zk.sh stop
命令行-节点信息 命令行语法
命令基本语法
功能描述
help
显示所有操作命令
ls path
使用 ls 命令来查看当前 znode 的子节点[可监听]
-w 监听子节点变化 -s 附加次级信息 |
| create | 普通创建 -s 含有序列
-e 临时(重启或者超时消失) | | — | — | | get path | 获得节点的值[可监听] -w 监听节点内容变化 -s 附加次级信息 | | set | 设置节点的具体值 | | stat | 查看节点状态 | | delete | 删除节点 | | deleteall | 递归删除节点 |
[xiamu@hadoop202 zookeeper-3.5.7]$ bin/zkCli.sh
客户端直接连接 [zk: localhost:2181(CONNECTED) 0] 此时 zk 后面的主机名是 localhost, 可以使用-server 参数修改[xiamu@hadoop202 zookeeper-3.5.7]$ bin/zkCli.sh -server hadoop202:2181
[zk: hadoop202:2181(CONNECTED) 0] ls /
查看当前 znode 中所包含的内容[zk: hadoop202:2181(CONNECTED) 1] help
显示所有操作命令[zk: hadoop202:2181(CONNECTED) 4] ls -s /
查看当前节点详细数据
1 2 3 4 5 6 7 8 9 10 11 [zookeeper]cZxid = 0x0 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x0 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0x0 cversion = -1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1
(1)czxid:创建节点的事务 zxid 每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。 (2)ctime:znode 被创建的毫秒数(从 1970 年开始) (3)mzxid:znode 最后更新的事务 zxid (4)mtime:znode 最后修改的毫秒数(从 1970 年开始) (5)pZxid:znode 最后更新的子节点 zxid (6)cversion:znode 子节点变化号,znode 子节点修改次数 (7)dataversion:znode 数据变化号 (8)aclVersion:znode 访问控制列表的变化号 (9)ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。 (10)dataLength:znode 的数据长度 (11)numChildren:znode 子节点数量
命令行-节点类型(持久/短暂/有序号/无序号) 创建永久节点 创建普通节点(默认创建的是不带序号的永久节点)
1 2 3 4 5 6 7 8 9 10 11 12 13 [zk: hadoop202:2181(CONNECTED) 0] ls / [zookeeper] [zk: hadoop202:2181(CONNECTED) 1] create /sanguo "diaochan" Created /sanguo [zk: hadoop202:2181(CONNECTED) 2] ls / [sanguo, zookeeper] [zk: hadoop202:2181(CONNECTED) 3] [zk: hadoop202:2181(CONNECTED) 5] create /sanguo/shuguo "liubei" Created /sanguo/shuguo [zk: hadoop202:2181(CONNECTED) 7] ls /sanguo [shuguo]
获取节点中的值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 [zk: hadoop202:2181(CONNECTED) 8] get -s /sanguo diaochan cZxid = 0x30000000a ctime = Wed Feb 01 20:12:01 CST 2023 mZxid = 0x30000000a mtime = Wed Feb 01 20:12:01 CST 2023 pZxid = 0x30000000b cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 8 numChildren = 1 [zk: hadoop202:2181(CONNECTED) 9] get -s /sanguo/shuguo liubei cZxid = 0x30000000b ctime = Wed Feb 01 20:14:02 CST 2023 mZxid = 0x30000000b mtime = Wed Feb 01 20:14:02 CST 2023 pZxid = 0x30000000b cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 6 numChildren = 0
创建带序号的节点
1 2 3 4 5 6 7 8 9 10 [zk: hadoop202:2181(CONNECTED) 12] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao0000000000 [zk: hadoop202:2181(CONNECTED) 13] ls / [sanguo, zookeeper] [zk: hadoop202:2181(CONNECTED) 14] ls /sanguo/weiguo [zhangliao0000000000] [zk: hadoop202:2181(CONNECTED) 15]
带序号的永久节点第二次创建会自动加上序号, 而不带序号的永久节点再次创建会报错
1 2 3 4 5 6 7 8 [zk: hadoop202:2181(CONNECTED) 15] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao0000000001 [zk: hadoop202:2181(CONNECTED) 10] create /sanguo/weiguo "caocao" Created /sanguo/weiguo [zk: hadoop202:2181(CONNECTED) 16] create /sanguo/weiguo "caocao" Node already exists: /sanguo/weiguo
使用quit
退出客户端, 再次连接, 发现之前创建的节点仍然存在
1 2 3 4 5 6 7 8 [zk: hadoop202:2181(CONNECTED) 17] quit [xiamu@hadoop202 zookeeper-3.5.7]$ bin/zkCli.sh -server hadoop202:2181 [zk: hadoop202:2181(CONNECTED) 0] ls /sanguo [shuguo, weiguo] [zk: hadoop202:2181(CONNECTED) 1] ls /sanguo/weiguo [zhangliao0000000000, zhangliao0000000001]
创建临时节点 创建临时节点只需要加上参数-e 即可
1 2 3 4 5 6 7 [zk: hadoop202:2181(CONNECTED) 2] create -e /sanguo/wuguo "zhouyu" Created /sanguo/wuguo 查看吴国, 吴国创建成功 [zk: hadoop202:2181(CONNECTED) 3] ls /sanguo [shuguo, weiguo, wuguo]
创建一个带序号的临时节点 只需要加上参数-s 即可
1 2 3 4 5 6 7 [zk: hadoop202:2181(CONNECTED) 4] create -e -s /sanguo/wuguo "zhouyu" Created /sanguo/wuguo0000000003 查看刚刚创建的节点 [zk: hadoop202:2181(CONNECTED) 5] ls /sanguo [shuguo, weiguo, wuguo, wuguo0000000003]
断开quit
连接之后, 吴国(wuguo)就没有了, 因为吴国创建的时候有-e, -e 表示是临时节点, 临时节点断开连接之后就删除了
1 2 3 4 5 6 [zk: hadoop202:2181(CONNECTED) 6] quit [xiamu@hadoop202 zookeeper-3.5.7]$ bin/zkCli.sh -server hadoop202:2181 [zk: hadoop202:2181(CONNECTED) 0] ls /sanguo [shuguo, weiguo]
**总结: ** -s 表示带序号 -e 表示临时节点
修改节点的值 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 [zk: hadoop202:2181(CONNECTED) 2] get -s /sanguo/weiguo caocao cZxid = 0x30000000c ctime = Wed Feb 01 20:15:52 CST 2023 mZxid = 0x30000000c mtime = Wed Feb 01 20:15:52 CST 2023 pZxid = 0x30000000e cversion = 2 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 6 numChildren = 2 [zk: hadoop202:2181(CONNECTED) 4] set /sanguo/weiguo "simayi" [zk: hadoop202:2181(CONNECTED) 5] get -s /sanguo/weiguo simayi cZxid = 0x30000000c ctime = Wed Feb 01 20:15:52 CST 2023 mZxid = 0x300000016 mtime = Wed Feb 01 20:29:31 CST 2023 pZxid = 0x30000000e cversion = 2 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 6 numChildren = 2 [zk: hadoop202:2181(CONNECTED) 8] get /sanguo/weiguo simayi
监听器及节点删除 监听节点的值 在 hadoop204 中, get -s /sanguo
查看 sanguo 的值, 并且使用get -w /sanguo
监控 sanguo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 [zk: hadoop204(CONNECTED) 0] get -s /sanguo diaochan cZxid = 0x30000000a ctime = Wed Feb 01 20:12:01 CST 2023 mZxid = 0x30000000a mtime = Wed Feb 01 20:12:01 CST 2023 pZxid = 0x300000014 cversion = 6 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 8 numChildren = 2 [zk: hadoop204(CONNECTED) 1] get -w /sanguo diaochan
在 hadoop203 中修改 sanguo 的值, 在 hadoop204 的选项卡会出现一个感叹号 只有 hadoop203 第一次修改的时候 hadoop204 会出现一个感叹号, hadoop203 修改多次, hadoop204 只会出现一个感叹号, 因为 hadoop204 只监听一次 如果想再次监听, 需要再次在 hadoop204 注册
1 2 3 [zk: hadoop203(CONNECTED) 3] set /sanguo "xisi" [zk: hadoop203(CONNECTED) 4] set /sanguo "yangfeiyan"
监听节点的路径变化 在 hadoop204 中使用ls -w /sanguo
监听 sanguo 的路径变化 在 hadoop203 中添加一个节点, hadoop204 中出现叹号, 同样也是注册一次生效一次, 创建多个节点也就只会出现一个叹号, 除非再次开启监听
删除节点 删除节点和递归删除节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 [zk: hadoop204(CONNECTED) 7] ls / [sanguo, zookeeper] [zk: hadoop204(CONNECTED) 8] ls /sanguo [jin, jin1, shuguo, weiguo] [zk: hadoop204(CONNECTED) 9] delete /sanguo/jin [zk: hadoop204(CONNECTED) 10] ls /sanguo [jin1, shuguo, weiguo] [zk: hadoop204(CONNECTED) 12] delete /sanguo Node not empty: /sanguo [zk: hadoop204(CONNECTED) 13] deleteall /sanguo [zk: hadoop204(CONNECTED) 14] ls /sanguo Node does not exist: /sanguo [zk: hadoop204(CONNECTED) 15] ls / [zookeeper]
查看节点状态 查看节点状态, 但是不查看值
1 2 3 4 5 6 7 8 9 10 11 12 13 [zk: hadoop204(CONNECTED) 21] stat /sanguo cZxid = 0x30000002d ctime = Wed Feb 01 21:07:20 CST 2023 mZxid = 0x30000002d mtime = Wed Feb 01 21:07:20 CST 2023 pZxid = 0x30000002d cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 9 numChildren = 0
客户端-创建节点 创建 maven 项目zookeeper
在 pom.xml 导入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.7</version> </dependency> </dependencies>
在 resource 添加 log4j.properties 配置文件
1 2 3 4 5 6 7 8 log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
创建包名com.atguigu.zk
创建类名称zkClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class zkClient { // 注意, 逗号左右不能有空格 private String connectString = "hadoop202:2181,hadoop203:2181,hadoop204:2181" ; private int sessionTimeout = 2000; private ZooKeeper zooKeeper; // 连接zookeeper @Before public void init() throws IOException { zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher () { @Override public void process(WatchedEvent watchedEvent) { } }); } @Test public void create() throws InterruptedException, KeeperException { // 创建一个节点 String nodeCreated = zooKeeper.create("/atguigu" , "ss.avi" .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } }
客户端-监听节点的变化 getChildren 中第二个参数为 true 会调用 init 中 new Watch(){…}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Before public void init() throws IOException { zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher () { @Override public void process(WatchedEvent watchedEvent) { List<String> children = null; try { children = zooKeeper.getChildren("/" , true ); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------------------" ); for (String child : children) { System.out.println(child); } System.out.println("------------------" ); } }); } @Test public void getChildren() throws InterruptedException, KeeperException { List<String> children = zooKeeper.getChildren("/" , true ); Thread.sleep (Long.MAX_VALUE); }
监听的同时, 在 linux 中添加/删除节点, idea 能监听得到
客户端-判断节点是否存在 1 2 3 4 5 6 // 判断Znode是否存在 @Test public void exist() throws InterruptedException, KeeperException { Stat stat = zooKeeper.exists("/atguigu" , false ); System.out.println(stat == null ? "not exist" : "exist" ); }
写数据原理 ack 表示应答 write 表示写操作
服务器动态上下线
操作上线 操作下线
DistributeServer 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package com.atguigu.case1; import org.apache.zookeeper.*; import java.io.IOException; public class DistributeServer { private String connectString = "hadoop202:2181,hadoop203:2181,hadoop204:2181" ; private int sessionTimeout = 2000; private ZooKeeper zooKeeper; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { DistributeServer server = new DistributeServer(); // 获取zk连接 server.getConnect(); // 注册服务器到zk集群 server.register(args[0]); // 启动业务逻辑(睡觉) server.bussiness(); } private void bussiness() throws InterruptedException { Thread.sleep (Long.MAX_VALUE); } private void register(String hostname) throws InterruptedException, KeeperException { String create = zooKeeper.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname + " is online" ); } private void getConnect() throws IOException { zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher () { @Override public void process(WatchedEvent event) { } }); } }
DistributeClient 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 package com.atguigu.case1; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class DistributeClient { private String connectString = "hadoop202:2181,hadoop203:2181,hadoop204:2181" ; private int sessionTimeout = 2000; private ZooKeeper zooKeeper; public static void main(String[] args) throws InterruptedException, IOException, KeeperException { DistributeClient client = new DistributeClient(); // 获取zk连接 client.getConnect(); // 监听/servers下面子节点的增加和删除 client.getServerList(); // 业务逻辑(睡觉) client.bussiness(); } private void bussiness() throws InterruptedException { Thread.sleep (Long.MAX_VALUE); } private void getServerList() throws InterruptedException, KeeperException { List<String> children = zooKeeper.getChildren("/servers" , true ); ArrayList<String> servers = new ArrayList<>(); for (String child : children) { byte[] data = zooKeeper.getData("/servers/" + child, false , null); servers.add(new String(data)); } System.out.println(servers); } private void getConnect() throws IOException { zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher () { @Override public void process(WatchedEvent event) { try { getServerList(); } catch (Exception e) { e.printStackTrace(); } } }); } }
tips: 运行 main 的时候需要传入参数, 在 idea 中传入参数的方式如下 在 Program arguments 中填写传入的参数
分布式锁 什么叫做分布式锁呢? 比如说”进程 1”在使用该资源的时候,会先去获得锁,”进程 1”获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,”进程 1”用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。 DistributedLock 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 package com.atguigu.case2; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; public class DistributedLock { private final String connectString = "hadoop202:2181,hadoop203:2181,hadoop204:2181" ; private final int sessionTimeout = 2000; private final ZooKeeper zooKeeper; private CountDownLatch countDownLatch = new CountDownLatch(1); private CountDownLatch waitLatch = new CountDownLatch(1); private String currentMode; private String waitPath; public DistributedLock() throws IOException, InterruptedException, KeeperException { // 获取连接 zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher () { @Override public void process(WatchedEvent watchedEvent) { // connectLatch 如果连接上zk 可以释放 if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { countDownLatch.countDown(); } // waitLatch 需要释放 if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) { waitLatch.countDown(); } } }); // 等待zk正常连接后, 往下走程序 countDownLatch.await(); // 判断根节点/locks是否存在 Stat stat = zooKeeper.exists("/locks" , false ); if (stat == null) { // 创建一下根节点 zooKeeper.create("/locks" , "locks" .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } // 对zk加锁 public void zkLock () { // 创建对应的临时带序号节点 try { currentMode = zooKeeper.create("/locks/" + "seq-" , null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 判断创建的节点是否是最小序号节点, 如果是获取到锁, 如果不是, 监听序号前一个节点 List<String> children = zooKeeper.getChildren("/locks" , false ); // 如果children 只有一个值, 那就直接获取锁, 如果有多个节点, 需要判断, 谁最小 if (children.size() == 1) { return ; } else { Collections.sort (children); // 获取节点名称 seq-00000000 String thisNode = currentMode.substring("/locks/" .length()); // 通过seq-00000000获取该节点在children结合的位置 int index = children.indexOf(thisNode); // 判断 if (index == -1) { System.out.println("数据异常" ); } else if (index == 0) { // 就一个节点, 可以获取锁了 return ; } else { // 需要监听他前一个节点变化 waitPath = "/locks/" + children.get(index - 1); zooKeeper.getData(waitPath, true , null); // 等待监听 waitLatch.await(); return ; } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } // 解锁 public void unZkLock () { // 删除节点 try { zooKeeper.delete(currentMode, -1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } }
DistributedLockTest 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package com.atguigu.case2; import org.apache.zookeeper.KeeperException; import java.io.IOException; public class DistributedLockTest { public static void main(String[] args) throws IOException, InterruptedException, KeeperException { final DistributedLock lock1 = new DistributedLock(); final DistributedLock lock2 = new DistributedLock(); new Thread(new Runnable () { @Override public void run () { try { lock1.zkLock(); System.out.println("线程1启动 获取到锁" ); Thread.sleep (5 * 1000); lock1.unZkLock(); System.out.println("线程1 释放锁" ); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable () { @Override public void run () { try { lock2.zkLock(); System.out.println("线程2启动 获取到锁" ); Thread.sleep (5 * 1000); lock2.unZkLock(); System.out.println("线程2 释放锁" ); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
运行结果如下: 线程 2 先获取锁, 当锁释放掉之后, 才会给其他线程上锁
Curator 框架实现分布式锁案例 pom.xml 导入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> </dependency>
CuratorLockTest 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 package com.atguigu.case3; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorLockTest { public static void main(String[] args) { // 创建分布式锁1 InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks" ); // 创建分布式锁2 InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks" ); new Thread(new Runnable () { @Override public void run () { try { lock1.acquire(); System.out.println("线程1 获取到锁" ); lock1.acquire(); System.out.println("线程1 再次获取到锁" ); Thread.sleep (5 * 1000); lock1.release(); System.out.println("线程1 释放锁" ); lock1.release(); System.out.println("线程1 再次释放锁" ); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable () { @Override public void run () { try { lock2.acquire(); System.out.println("线程2 获取到锁" ); lock2.acquire(); System.out.println("线程2 再次获取到锁" ); Thread.sleep (5 * 1000); lock2.release(); System.out.println("线程2 释放锁" ); lock2.release(); System.out.println("线程2 再次释放锁" ); } catch (Exception e) { e.printStackTrace(); } } }).start(); } private static CuratorFramework getCuratorFramework () { ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("hadoop202:2181,hadoop203:2181,hadoop204:2181" ) .connectionTimeoutMs(2000) .sessionTimeoutMs(2000) .retryPolicy(policy) .build(); // 启动客户端 client.start(); System.out.println("zookeeper 启动成功" ); return client; } }
说明同一个线程中锁是可以多次获取的
企业面试真题(面试重点) 选举机制 半数机制,超过半数的投票通过,即通过。 (1) 第一次启动选举规则: 投票过半数时,服务器 id 大的胜出 (2) 第二次启动选举规则: ①EPOCH 大的直接胜出 ②EPOCH 相同,事务 id 大的胜出 ③ 事务 id 相同,服务器 id 大的胜出
生产集群安装多少 zk 合适? 安装奇数台。生产经验: l 10 台服务器:3 台 zk; l 20 台服务器:5 台 zk; l 100 台服务器:11 台 zk; l 200 台服务器:11 台 zk 服务器台数多:好处,提高可靠性;坏处:提高通信延时
常用命令 ls、get、create、delete