zookeeper简介 zookeeper是什么 zookeeper是一个开源的分布式协调服务,其设计目标是将那些复杂的容易出错的分布式一致服务封装起来,构成一个高效可靠的集合,并提供一些简单的接口给用户使用。
zookeeper主要用来解决分布式集群中应用系统的一致性问题,例如如何避免同时操作同一数据造成脏读的问题等。
zookeeper本质生是一个分布式的小文件存储系统。提供类似于文件系统的目录树方式的数据存储,并且可以对树中的节点进行有效管理。
zookeeper提供给客户端监控存储在zookeeper内部数据的功能,从而可以基于数据的集群管理。
zookeeper的架构组成
zookeeper没有沿⽤传递的Master/Slave概念,⽽是引⼊了Leader、Follower、Observer三种⻆⾊。
leader :
zookeeper集群工作的核心角色
集群内部各个服务器的调度者
事务请求(写操作)的唯一调度者和处理者,保证集群事务处理的顺序;对于create、delete等有写操作的请求,则需要同意转发给leader处理,leader需要决定编号、执行操作,这个过程称为一个事务。
follower :
处理客户端非事务请求(读操作)
转发事务给leader
参与集群leader选举投票
针对访问量大的zookeeper集群,可以增加观察者角色
observer :
观察者角色,观察 Zookeeper 集群的最新状态变化并将这些状态同步过来,其对于非事务请求可以进行独立处理,对于事务请求,则会转发给 Leader服务器进行处理。
不会参与任何形式的投票只提供非事务服务,通常用于在不影响集群事务处理能力的前提下提升集群的非事务处理能力。增加了集群增加并发的读请求。
zookeeper特点
Leader负责进行投票的发起和决议,更新系统状态
Follower用于接收客户请求并向客户端返回结果,在选举Leader过程中参与投票
集群中只要有半数以上节点存活,Zookeeper集群就能正常服务
全局数据一致:每个server保存一份相同的数据副本,Client无论连接到哪个server,数据都是一 致的
数据更新原子性,一次数据更新要么成功,要么失败
zookeeper环境搭建 zookeeper搭建方式 zookeeper安装方式有三种:单机模式、伪集群模式、集群模式
单机模式:Zookeeper只运⾏在⼀台服务器上,适合测试环境
伪集群模式:就是在⼀台服务器上运⾏多个Zookeeper 实例
集群模式:Zookeeper运⾏于⼀个集群上,适合⽣产环境
zookeeper集群搭建 下载稳定版本的zookeeper http://zookeeper.apache.org/releases.html
单机模式 将安装包上传到一台linux服务器上,然后解压:
1 tar -zxvf zookeeper-3.4.14.tar.gz
进⼊ zookeeper-3.4.14 ⽬录,创建 data ⽂件夹
1 2 cd zookeeper-3.4.14mkdir data
修改配置⽂件名称
1 2 cd confmv zoo_sample.cfg zoo.cfg
修改zoo.cfg中的data属性
1 2 3 vim zoo.cfg dataDir=/root/zookeeper-3.4.14/data
zookeeper服务启动
伪集群模式 将不同的zookeeper实例运⾏在同⼀台机器,⽤端⼝进⾏区分。
注意事项:
⼀台机器上部署了3个server,也就是说单台机器及上运⾏多个Zookeeper实例。这种情况下,必须 证每个配置⽂档的各个端⼝号不能冲突,除clientPort不同之外,dataDir也不同。另外,还要在dataDir所对应的⽬录中创建myid⽂件来指定对应的zookeeper服务器实例。
解压压缩包 创建⽬录zkcluster :
1 2 mkdir zkclustertar -zxvf zookeeper-3.4.14.tar.gz -C /zkcluster
重新命名文件夹 :
1 2 3 4 mv zookeeper-3.4.14 zookeeper01cp -r zookeeper01/ zookeeper02cp -r zookeeper01/ zookeeper03
分别在zookeeper01、zookeeper02、zookeeper03⽬录下创建data及logs⽬录
修改配置⽂件名称:
1 2 cd confmv zoo_sample.cfg zoo.cfg
配置每⼀个zookeeper 的dataDir(zoo.cfg) clientPort 分别为2181 2182 2183:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 clientPort=2181 dataDir=/zkcluster/zookeeper01/data dataLogDir=/zkcluster/zookeeper01/data/logs clientPort=2182 dataDir=/zkcluster/zookeeper02/data dataLogDir=/zkcluster/zookeeper02/data/logs clientPort=2183 dataDir=/zkcluster/zookeeper03/data dataLogDir=/zkcluster/zookeeper03/data/logs
配置集群:
1 2 3 4 server.1=10.211.55.4:2881:3881 server.2=10.211.55.4:2882:3882 server.3=10.211.55.4:2883:3883
启动集群 依次启动三个zk实例
集群模式 修改配置文件创建data与log目录 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 mkdir -p /opt/servers/zookeeper-3.4.14/datamkdir -p /opt/servers/zookeeper-3.4.14/data/logscd /opt/servers/zookeeper-3.4.14/confmv zoo_sample.cfg zoo.cfgvim zoo.cfg dataDir=/opt/servers/zookeeper-3.4.14/data dataLogDir=/opt/servers/zookeeper-3.4.14/data/logs server.1=node1:2888:3888 server.2=node2:2888:3888 server.3=node3:2888:3888 autopurge.purgeInterval=1
添加myid配置 :
在zookeeper的 data 目录下创建一个 myid 文件,内容为1,这个文件就是记录每个服务器的ID
1 2 cd /opt/servers/zookeeper-3.4.14/dataecho 1 > myid
安装包分发并修改myid的值
依次启动三个zk实例
集群启动停止脚本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 vim zk.sh echo "start zookeeper server..." if (($# ==0 ));then echo "no params" ;exit ;fi hosts="node1 node2 node3" for host in $hosts do ssh $host "source /etc/profile; /opt/servers/zookeeper-3.4.14/bin/zkServer.sh $1 " done
zookeeper系统模型 zookeeper数据模型ZNode
在zookeeper中,数据模型被保存到一个个数据节点上,这些节点被称为Znode。Znode是zookeeper中最小树单位。,在 ZNode 下面又可以再挂 ZNode,这样一层层下去就形成了一个层次化命名空间 ZNode 树,我们称为 ZNode Tree,它采用了类似文件系统的层级树状结构进行管理。见下图示例:
ZNode类型 Zookeeper 节点类型可以分为三大类:
持久性节点(Persistent)
临时性节点(Ephemeral)
顺序性节点(Sequential)
在开发中在创建节点的时候通过组合可以生成以下四种节点类型:持久节点、持久顺序节点、临时节点、临时顺序节点。不同类型的节点则会有不同的生命周期
持久节点 :是Zookeeper中最常见的一种节点类型,所谓持久节点,就是指节点被创建后会一直存在服务器,直到删除操作主动清除。
持久顺序节点 :就是有顺序的持久节点,节点特性和持久节点是一样的,只是额外特性表现在顺序上。顺序特性实质是在创建节点的时候,会在节点名后面加上一个数字后缀,来表示其顺序。
临时节点 :就是会被自动清理掉的节点,它的生命周期和客户端会话绑在一起,客户端会话结束,节点会被删除掉。与持久性节点不同的是,临时节点不能创建子节点。
临时顺序节点 :就是有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后面加上数字后缀。
事务ID :
在ZooKeeper中,事务是指能够改变ZooKeeper服务器状态的操作,我们也称之为事务操作或更新 操作,一般包括数据节点创建与删除、数据节点内容更新等操作。对于每一个事务请求,ZooKeeper都会为其分配一个全局唯一的事务ID,用 ZXID 来表示,通常是一个 64 位的数字。每一个 ZXID 对应一次 更新操作,从这些ZXID中可以间接地识别出ZooKeeper处理这些更新操作请求的全局顺序
zk中的事务指的是对zk服务器状态改变的操作(create,update data,更新字节点);zk对这些事务操作都 会编号,这个编号是自增长的被称为ZXID。
ZNode的状态信息 使用bin/zkCli.sh 连接到zk集群
整个 ZNode 节点内容包括两部分:节点数据内容和节点状态信息。数据内容是空,其他的属于状态信息:
cZxid 就是 Create ZXID,表示节点被创建时的事务ID。
ctime 就是 Create Time,表示节点创建时间。
mZxid 就是 Modified ZXID,表示节点最后一次被修改时的事务ID。
mtime 就是 Modified Time,表示节点最后一次被修改的时间。
pZxid 表示该节点的子节点列表最后一次被修改时的事务 ID。只有子节点列表变更才会更新 pZxid,子节点内容变更不会更新。
cversion 表示子节点的版本号。
dataVersion 表示内容版本号。
aclVersion 标识acl版本
ephemeralOwner 表示创建该临时节点时的会话 sessionID,如果是持久性节点那么值为 0
dataLength 表示数据长度。
numChildren 表示直系子节点数。
Watcher 监听机制 Zookeeper使用Watcher机制实现分布式数据的发布/订阅功能
一个典型的发布/订阅模型系统定义了一种 一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。
在 ZooKeeper 中,引入了 Watcher 机制来实现这种分布式的通知功能。ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个 Watcher,那么Zk就会向指定客户端发送一个事件通知来实现分布式的通知功能。
Zookeeper的Watcher机制主要包括客户端线程、客户端WatcherManager、Zookeeper服务器三部分。具体工作流程:
客户端在向Zookeeper服务器注册的同时,会将Watcher对象存储在客户端的WatcherManager当中
当Zookeeper服务器触发Watcher事件后,会向客户端发送通知
客户端线程从WatcherManager中取出对应的Watcher对象来执行回调逻辑
zookeeper基本使用 zookeeper的命令行操作 通过zkClient进入zookeeper客户端命令行:
1 2 ./zkcli.sh ./zkCli.sh -server ip:port(2181)
连接成功之后,系统会输出Zookeeper的相关环境及配置信息等信息。输入help之后,屏幕会输出可用 的Zookeeper命令,如下图所示
创建节点 使用create命令,可以创建一个Zookeeper节点:
1 2 create [-s][-e] path data
创建顺序节点:
1 2 [zk: localhost:2181(CONNECTED) 4] create -s /zk-test 123 Created /zk-test0000000000
执行完后,就在根节点下创建了一个叫做/zk-test的节点,该节点内容就是123,同时可以看到创建 的zk-test节点后面添加了一串数字以示区别
创建临时节点:
1 2 3 4 [zk: localhost:2181(CONNECTED) 1] create -e /zk-temp 123 Created /zk-temp [zk: localhost:2181(CONNECTED) 2] ls / [zk-test0000000000, zookeeper, zk-temp]
临时节点在客户端会话结束后会自动删除。
创建永久节点:
1 2 3 4 [zk: localhost:2181(CONNECTED) 1] create /zk-permanent 123 Created /zk-permanent [zk: localhost:2181(CONNECTED) 2] ls / [zk-permanent, zk-test0000000000, zookeeper]
可以看到永久节点不同于顺序节点,不会自动在后面添加一串数字
读取节点
ls命令可以列出Zookeeper指定节点下的所有子节点,但只能查看指定节点下的第一级的所有子节点
get命令可以获取Zookeeper指定节点的数据内容和属性信息
1 2 [zk: localhost:2181(CONNECTED) 2] ls / [zk-permanent, zk-test0000000000, zookeeper]
使用如下命令:get /zk-permanent,获取/zk-permanent的数据内容和属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 [zk: localhost:2181(CONNECTED) 1] get /zk-persist 123 cZxid = 0x300000006 ctime = Thu May 20 01:33:28 UTC 2021 mZxid = 0x300000006 mtime = Thu May 20 01:33:28 UTC 2021 pZxid = 0x300000006 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 3 numChildren = 0
第一行是节点/zk-permanent 的数据内容,其他几行则是 建该节点的事务ID(cZxid)、最后一次更新该节点的事务ID(mZxid)和最后一次更新该节点的时间(mtime)等属性信息
更新命令 使用set命令
1 2 3 4 5 6 7 8 9 10 11 12 [zk: localhost:2181(CONNECTED) 2] set /zk-persist 456 cZxid = 0x300000006 ctime = Thu May 20 01:33:28 UTC 2021 mZxid = 0x300000009 mtime = Thu May 20 01:38:20 UTC 2021 pZxid = 0x300000006 cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 3 numChildren = 0
现在dataVersion已经变为1了,表示进行了更新
删除节点 1 2 3 [zk: localhost:2181(CONNECTED) 8] delete /zk-permanent [zk: localhost:2181(CONNECTED) 9] ls / [zk-test0000000000, zookeeper]
可以看到,已经成功删除/zk-permanent节点
若删除节点存在子节点,那么无法删除该节点,必须先删除子节点,再删除父节点
zookeeper客户端ZkClient使用 ZkClient是Github上一个开源的zookeeper客户端,在Zookeeper原生API接口之上进行了包装,是一个更易用的Zookeeper客户端,同时,zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能
添加依赖:
1 2 3 4 5 6 7 8 9 10 11 12 <dependencies > <dependency > <groupId > org.apache.zookeeper</groupId > <artifactId > zookeeper</artifactId > <version > 3.4.14</version > </dependency > <dependency > <groupId > com.101tec</groupId > <artifactId > zkclient</artifactId > <version > 0.11</version > </dependency > </dependencies >
创建会话 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package org.cwz.zk.demo;import org.I0Itec.zkclient.ZkClient;public class ZkDemo { public static void main (String[] args) { ZkClient zkClient = new ZkClient ("192.168.33.111:2181" ); System.out.println("zkClient = " + zkClient); } }
创建节点 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package org.cwz.zk.demo;import org.I0Itec.zkclient.ZkClient;public class ZkDemo { public static void main (String[] args) { ZkClient zkClient = new ZkClient ("192.168.33.111:2181" ); System.out.println("zkClient = " + zkClient); zkClient.createPersistent("/cwz-client/cwz-c1" , true ); System.out.println("path is created" ); } }
ZkClient通过设置createParents参数为true可以递归的先创建父节点,再创建子节点
删除节点 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package org.cwz.zk.demo;import org.I0Itec.zkclient.ZkClient;public class ZkDemo { public static void main (String[] args) { ZkClient zkClient = new ZkClient ("192.168.33.111:2181" ); System.out.println("zkClient = " + zkClient); zkClient.deleteRecursive("/cwz-client" ); System.out.println("delete path is success" ); } }
监听节点变化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package org.cwz.zk.demo;import org.I0Itec.zkclient.IZkChildListener;import org.I0Itec.zkclient.ZkClient;import java.util.List;public class GetChange { public static void main (String[] args) throws InterruptedException { ZkClient zkClient = new ZkClient ("192.168.33.111" ); zkClient.subscribeChildChanges("/cwz-client" , new IZkChildListener () { @Override public void handleChildChange (String path, List<String> childs) throws Exception { System.out.println(path + "childs changes, current childs " + childs); } }); zkClient.createPersistent("/cwz-client" ); Thread.sleep(1000 ); zkClient.createPersistent("/cwz-client/c1" ); Thread.sleep(1000 ); zkClient.delete("/cwz-client/c1" ); Thread.sleep(1000 ); zkClient.delete("/cwz-client" ); Thread.sleep(Integer.MAX_VALUE); } }
运行结果:
1 2 3 4 /cwz-clientchilds changes, current childs [] /cwz-clientchilds changes, current childs [c1] /cwz-clientchilds changes, current childs [] /cwz-clientchilds changes, current childs null
由此可知:
监听器可以对不存在的目录进行监听
监听目录下 子节点发生改变,服务端都会通知客户端,并将最新的子节点列表发送给客户端
监听目录创建和删除本身也会被监听到
监听数据变化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package org.cwz.zk.demo;import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;public class ListenDataChange { public static void main (String[] args) throws InterruptedException { ZkClient zkClient = new ZkClient ("192.168.33.111" ); zkClient.setZkSerializer(new ZkStrSerializer ()); boolean exists = zkClient.exists("/cwz-client" ); if (!exists) { zkClient.createEphemeral("/cwz-client" , "123" ); } zkClient.subscribeDataChanges("/cwz-client" , new IZkDataListener () { @Override public void handleDataChange (String path, Object data) throws Exception { System.out.println(path + " data is changed, new data " + data); } @Override public void handleDataDeleted (String path) throws Exception { System.out.println(path + " is deleted!" ); } }); Object data = zkClient.readData("/cwz-client" ); System.out.println("data = " + data); zkClient.writeData("/cwz-client" , "new data" ); Thread.sleep(1000 ); zkClient.delete("/cwz-client" ); Thread.sleep(Integer.MAX_VALUE); } }
自定义序列化类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package org.cwz.zk.demo;import org.I0Itec.zkclient.exception.ZkMarshallingError;import org.I0Itec.zkclient.serialize.ZkSerializer;public class ZkStrSerializer implements ZkSerializer { @Override public byte [] serialize(Object o) throws ZkMarshallingError { return String.valueOf(o).getBytes(); } @Override public Object deserialize (byte [] bytes) throws ZkMarshallingError { return new String (bytes); } }
运行结果:
1 2 3 data = 123 /cwz-client data is changed, new data new data /cwz-client is deleted!
zookeeper内部机制 选举机制
集群中半数以上机器存活,集群就可用。所以Zookeeper适合安装奇数台服务器。
Zookeeper虽然在配置文件中并没有指定Master和Slave。但是,Zookeeper工作时,是有一个节点为Leader,其它为Follower,Leader是通过内部的选举机制产生的
集群首次启动 假设有五台服务器组成的Zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动
zookeeper的选举机制
服务器1启动,此时只有它一台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是LOOKING状态
服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1、2还是继续保持LOOKING状态
服务器3启动,根据前面的理论分析,服务器3成为服务器1、2、3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的Leader
服务器4启动,根据前面的分析,理论上服务器4应该是服务器1、2、3、4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了
服务器5启动,同4一样称为follower
集群非首次启动
每个节点在选举时都会参考自身节点的zxid值(事务ID)
优先选择zxid值大的节点称为Leader
ZAB一致性协议 分布式数据一致性问题
将数据复制到分布式部署的多台机器中,可以消除单点故障,防止系统由于某台(些)机器宕机导致的不可用
通过负载均衡技术,能够让分布在不同地方的数据副本全都对外提供服务。有效提高系统性能
在分布式系统中引入数据复制机制后,多台数据节点之间由于网络等原因很容易产生数据不一致的情况
当客户端Client1将系统中的一个值K1由V1更新为V2,但是客户端Client2读取的是一个还没有同步更新的副本,K1的值依然是V1,这就导致了数据的不一致性。其中,常见的就是主从数据库之间的复制延时问题
ZAB协议 zookeeper就是分布式一致性问题的工业解决方案,paxos是其底层理论算法,其中zab,raft和众多开源算法是对paxos的工业级实现。ZK没有完全采用paxos算法,而是使用了一种称为Zookeeper Atomic Broadcast(ZAB,Zookeeper原子消息广播协议)的协议作为其数据一致性的核心算法
ZAB 协议是为分布式协调服务 Zookeeper 专门设计的一种支持崩溃恢复和原子广播协议
ZK怎么处理集群中的数据?所有客户端写入数据都是写入Leader中,然后,由 Leader 复制到Follower中。ZAB会将服务器数据的状态变更以事务Proposal的形式广播到所有的副本进程上,ZAB协议能够保证了事务操作的一个全局的变更序号(ZXID)。
广播消息 ZAB 协议的消息广播过程类似于 二阶段提交过程。对于客户端发送的写请求,全部由 Leader 接收,Leader 将请求封装成一个事务 Proposal(提议),将其发送给所有 Follwer ,如果收到超过半数反馈ACK,则执行 Commit 操作(先提交自己,再发送 Commit 给所有 Follwer)。
不能正常反馈Follower恢复正常后会进入数据同步阶段最终与Leader保持一致
说明:
Leader接收到Client请求之后,会将这个请求封装成一个事务,并给这个事务分配一个全局递增的唯一 ID,称为事务ID(ZXID),ZAB 协议要求保证事务的顺序,因此必须将每一个事务按照 ZXID进行先后排序然后处理
ZK集群为了保证任何事务操作能够有序的顺序执行,只能是 Leader 服务器接受写请求,即使是Follower 服务器接受到客户端的请求,也会转发到 Leader 服务器进行处理
ZK提供的应该是最终一致性的标准。ZK所有节点接收写请求之后可以在一定时间内保证所有节点都能看到该条数据
Leader崩溃问题 Leader宕机后,ZK集群无法正常工作,ZAB协议提供了一个高效且可靠的leader选举算法
Leader宕机后,被选举的新Leader需要解决的问题:
ZAB 协议确保那些已经在 Leader 提交的事务最终会被所有服务器提交
ZAB 协议确保丢弃那些只在 Leader 提出/复制,但没有提交的事务
基于上面的目的,ZAB协议设计了一个选举算法:能够确保已经被Leader提交的事务被集群接受,丢弃还没有提交的事务
这个选举算法的关键点:保证选举出的新Leader拥有集群中所有节点最大编号(ZXID)的事务
zookeeper应用实践 ZooKeeper是一个典型的发布/订阅模式的分布式数据管理与协调框架。
通过对ZooKeeper中丰富的数据节点类型进行交叉使用,配合Watcher事件通知机制,可以非常方便地构建一系列分布式应用中都会涉及的核心功能:
数据发布/订阅
命名服务
集群管理
Master选举
分布式锁
分布式队列
zookeeper两大特性:
客户端如果对Zookeeper的数据节点注册Watcher监听,那么当该数据节点的内容或是其子节点列表发生变更时,Zookeeper服务器就会向订阅的客户端发送变更通知
对在Zookeeper上创建的临时节点,一旦客户端与服务器之间的会话失效,那么临时节点也会被自动删除
利用其两大特性,可以实现集群机器存活监控系统,若监控系统在/clusterServers节点上注册一个Watcher监听,那么但凡进行动态添加机器的操作,就会在/clusterServers节点下创建一个临时节 点:/clusterServers/[Hostname],这样,监控系统就能够实时监测机器的变动情况
服务器上下线动态监听 分布式系统中,主节点会有多台,主节点可能因为任何原因出现宕机或者下线,而任意一台客户端都要能实时感知到主节点服务器的上下线
服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package org.cwz.zk.onoffline;import org.I0Itec.zkclient.ZkClient;public class Server { ZkClient zkClient = null ; String connectInfo = "192.168.33.111:2181,192.168.33.112:2181" ; private void connectZk () { zkClient = new ZkClient (connectInfo); if (!zkClient.exists("/servers" )) { zkClient.createPersistent("/servers" ); } } private void saveServerInfo (String ip, String port) { String sequencePath = zkClient.createEphemeralSequential("/servers/server" , ip + ":" + port); System.out.println("服务器:" + ip + ":" + port + ",向zk保存信息成功,成功上线可以接受client查询" ); } public static void main (String[] args) { Server server = new Server (); server.connectZk(); server.saveServerInfo(args[0 ], args[1 ]); new TimeService (Integer.parseInt(args[1 ])).start(); } }
服务端提供时间查询的线程类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package org.cwz.zk.onoffline;import java.io.IOException;import java.io.OutputStream;import java.net.ServerSocket;import java.net.Socket;import java.util.Date;public class TimeService extends Thread { private int port = 0 ; public TimeService (int port) { this .port = port; } @Override public void run () { try { ServerSocket serverSocket = new ServerSocket (port); while (true ) { Socket socket = serverSocket.accept(); OutputStream out = socket.getOutputStream(); out.write(new Date ().toString().getBytes()); } } catch (IOException e) { e.printStackTrace(); } } }
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 package org.cwz.zk.onoffline;import org.I0Itec.zkclient.IZkChildListener;import org.I0Itec.zkclient.ZkClient;import java.io.IOException;import java.io.InputStream;import java.io.OutputStream;import java.net.Socket;import java.util.ArrayList;import java.util.List;import java.util.Random;public class Client { ZkClient zkClient = null ; String connectInfo = "192.168.33.111:2181,192.168.33.112:2181" ; ArrayList<String> infos = new ArrayList <String>(); private void connectZk () { zkClient = new ZkClient (connectInfo); List<String> childs = zkClient.getChildren("/servers" ); for (String child : childs) { Object data = zkClient.readData("/servers/" + child); infos.add(String.valueOf(data)); } zkClient.subscribeChildChanges("/servers" , new IZkChildListener () { @Override public void handleChildChange (String s, List<String> children) throws Exception { ArrayList<String> list = new ArrayList <>(); for (String path : children) { Object data = zkClient.readData("/servers/" + path); list.add(String.valueOf(data)); } infos = list; System.out.println("接收到通知,最新服务器信息为:" + infos); } }); } public void sendRequest () throws IOException { Random random = new Random (); int i = random.nextInt(infos.size()); String ipPort = infos.get(i); String[] arr = ipPort.split(":" ); Socket socket = new Socket (arr[0 ], Integer.parseInt(arr[1 ])); OutputStream out = socket.getOutputStream(); InputStream in = socket.getInputStream(); out.write("query time" .getBytes()); out.flush(); byte [] bytes = new byte [1024 ]; in.read(bytes); System.out.println("client端接收到server:" + ipPort + "返回结果:" + new String (bytes)); in.close(); out.close(); socket.close(); } public static void main (String[] args) throws IOException, InterruptedException { Client client = new Client (); client.connectZk(); while (true ) { client.sendRequest(); Thread.sleep(2000 ); } } }
启动多个服务端线程
分布式锁 锁的概念:
在单机程序中,当存在多个线程可以同时改变某个变量(可变共享变量)时,为了保证线程安全(数据不能出现脏数据)就需要对变量或代码块做同步,使其在修改这种变量时能够串行执行消除并发修改变量。
对变量或者堆代码码块做同步本质上就是加锁。目的就是实现多个线程在一个时刻同一个代码块只能有一个线程可执行
上面的设计存在线程安全问题:
假设Redis 里面的某个商品库存为 1;此时两个用户同时下单,其中一个下单请求执行到第3 步,更新数据库的库存为 0,但是第 4 步还没有执行。
而另外一个用户下单执行到了第 2 步,发现库存还是 1,就继续执行第 3 步。但是商品库存已经为0,所以如果数据库没有限制就会出现超卖的问题。
解决办法:
用锁把 2、3、4 步锁住,让他们执行完之后,另一个线程才能进来执行
公司业务发展迅速,系统应对并发不断提高,解决方案是要增加一台机器,结果会出现更大的问题
假设有两个下单请求同时到来,分别由两个机器执行,那么这两个请求是可以同时执行了,依然存在超卖的问题。
如图所示系统是运行在两个不同的 JVM 里面,不同的机器上,增加的锁只对自己当前 JVM 里面的线程有效,对于其他 JVM 的线程是无效的。所以现在已经不是线程安全问题。需要保证两台机器加的锁是同一个锁,此时分布式锁就能解决该问题。
分布式锁的作用:在整个系统提供一个全局、唯一的锁,在分布式系统中每个系统在进行相关操作的时候需要获取到该锁,才能执行相应操作
使用zk实现分布式锁 利用Zookeeper可以创建临时带序号节点的特性来实现一个分布式锁
实现思路:
锁就是zk指定目录下序号最小的临时序列节点,多个系统的多个线程都要在此目录下创建临时的顺序节点,因为zk会为我们保证节点的顺序性,所以可以利用节点的顺序进行锁的判断
每个线程都是先创建临时顺序节点,然后获取当前目录下最小的节点(序号),判断最小节点是不是当前节点,如果是那么获取锁成功,如果不是那么获取锁失败
获取锁失败的线程获取当前节点上一个临时顺序节点,并对此节点进行监听,当该节点删除的时候(上一个线程执行结束删除或者是掉线zk删除临时节点)这个线程会获取到通知,代表获取到了锁
main方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package org.cwz.zk.dislock;public class DisLockTest { public static void main (String[] args) { for (int i = 0 ; i < 10 ; i++) { new Thread (new DisLockRunnable ()).start(); } } static class DisLockRunnable implements Runnable { @Override public void run () { DisClient client = new DisClient (); client.getDisLock(); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } client.deleteLock(); } } }
实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 package org.cwz.zk.dislock;import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;public class DisClient { public DisClient () { synchronized (DisClient.class) { if (!zkClient.exists("/distrilock" )) { zkClient.createPersistent("/distrilock" ); } } } String beforeNodePath; String currentNodePath; private ZkClient zkClient = new ZkClient ("192.168.33.111,192.168.33.112" ); public void getDisLock () { String threadName = Thread.currentThread().getName(); if (tryGetLock()) { System.out.println(threadName + ": 获取到了锁" ); } else { System.out.println(threadName + ": 获取锁失败,进入等待状态" ); waitForLock(); getDisLock(); } } CountDownLatch countDownLatch = null ; public boolean tryGetLock () { if (currentNodePath == null || "" .equals(currentNodePath)) { currentNodePath = zkClient.createEphemeralSequential("/distrilock/" , "lock" ); } List<String> childs = zkClient.getChildren("/distrilock" ); Collections.sort(childs); String minNode = childs.get(0 ); if (currentNodePath.equals("/distrilock/" + minNode)) { return true ; } else { int i = Collections.binarySearch(childs, currentNodePath.substring("/distrilock/" .length())); String lastNodeChild = childs.get(i - 1 ); beforeNodePath = "/distrilock/" + lastNodeChild; } return false ; } public void waitForLock () { IZkDataListener iZkDataListener = new IZkDataListener () { @Override public void handleDataChange (String s, Object o) throws Exception { } @Override public void handleDataDeleted (String s) throws Exception { countDownLatch.countDown(); } }; zkClient.subscribeDataChanges(beforeNodePath, iZkDataListener); if (zkClient.exists(beforeNodePath)) { countDownLatch = new CountDownLatch (1 ); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(beforeNodePath, iZkDataListener); } public void deleteLock () { if (zkClient != null ) { zkClient.delete(currentNodePath); zkClient.close(); } } }
分布式锁的实现可以是 Redis、Zookeeper,相对来说生产环境如果使用分布式锁可以考虑使用Redis实现而非zk
Hadoop HA HA概述
HA(High Available),即高可用(7*24小时不中断服务)
实现高可用最关键的策略是消除单点故障。Hadoop-HA严格来说应该分成各个组件的HA机制:HDFS的HA和YARN的HA
Hadoop2.0之前,在HDFS集群中NameNode存在单点故障(SPOF)
NameNode主要在以下两个方面影响HDFS集群:
NameNode机器发生意外,如宕机,集群将无法使用,直到重启
NameNode机器需要升级,包括软件、硬件升级,此时集群也将无法使用
HDFS HA功能通过配置Active/Standby两个NameNodes实现在集群中对NameNode的热备来解决上述问题。如果出现故障,如机器崩溃或机器需要升级维护,这时可通过此种方式将NameNode很快的切换到另外一台机器
HDFS-HA 通过双NameNode消除单点故障(Active/Standby)
HDFS-HA工作要点
元数据管理方式需要改变
内存中各自保存一份元数据
Edits日志只有Active状态的NameNode节点可以做写操作
两个NameNode都可以读取Edits
共享的Edits放在一个共享存储中管理(qjournal和NFS两个主流实现)
需要一个状态管理功能模块
实现了一个zkfailover,常驻在每一个namenode所在的节点,每一个zkfailover负责监控自己所在namenode节点,利用zk进行状态标识,当需要进行状态切换,由zkfailover负责,切换时需要防止脑裂(brain split)现象发生(集群中出现两个Active的Namenode)
同一时刻仅仅有一个NameNode对外提供服务
HDFS-HA工作机制 配置部署HDFS-HA进行自动故障转移。自动故障转移为HDFS部署增加了两个新组件:ZooKeepe和 ZKFailoverController(ZKFC)进程,ZooKeeper是维护少量协调数据,通知客户端这些数据的改变和监视客户端故障的高可用服务。HA的自动故障转移依赖于ZooKeeper的以下功能:
故障检测
集群中的每个NameNode在ZooKeeper中维护了一个临时会话,如果机器崩溃,ZooKeeper中的 会话将终止,ZooKeeper通知另一个NameNode需要触发故障转移
现役NameNode选择
ZooKeeper提供了一个简单的机制用于唯一的选择一个节点为active状态。如果目前现役NameNode崩溃,另一个节点可能从ZooKeeper获得特殊的排外锁以表明它应该成为现役NameNode
ZKFC是自动故障转移中的另一个新组件,是ZooKeeper的客户端,也监视和管理NameNode的状态。每个运行NameNode的主机也运行了一个ZKFC进程,ZKFC负责:
健康检查 ZKFC使用一个健康检查命令定期地ping与之在相同主机的NameNode,只要该NameNode及时地回复健康状态,ZKFC认为该节点是健康的。如果该节点崩溃,冻结或进入不健康状态,健康监测器标识该节点为非健康的。
ZooKeeper会话管理 当本地NameNode是健康的,ZKFC保持一个在ZooKeeper中打开的会话。如果本地NameNode处于active状态,ZKFC也保持一个特殊的znode锁,该锁使用了ZooKeeper对短暂节点的支持,如果会话终止,锁节点将自动删除
基于ZooKeeper的选择 如果本地NameNode是健康的,且ZKFC发现没有其它的节点当前持有znode锁,它将为自己获取该锁。如果成功,则它已经赢得了选择,并负责运行故障转移进程以使它的本地NameNode为 Active。故障转移进程与前面描述的手动故障转移相似,首先如果必要保护之前的现役NameNode,然后本地NameNode转换为Active状态
HDFS-HA集群配置 Apache Hadoop 3.2.2 – HDFS High Availability Using the Quorum Journal Manager
集群规划
hadoop-node1
hadoop-node2
hadoop-node3
NameNode
NameNode
JournalNode
JournalNode
JournalNode
DataNode
DataNode
DataNode
ZK
ZK
ZK
ResourceManager
NodeManager
NodeManager
NodeManager
启动zookeeper集群:
配置HDFS-HA集群 1 2 3 4 5 6 7 8 9 10 11 stop-dfs.sh mkdir /opt/servers/hacp -r hadoop-2.9.2 harm -rf /opt/servers/ha/hadoop-2.9.2/data
配置hdfs-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 <property > <name > dfs.nameservices</name > <value > hacluster</value > </property > <property > <name > dfs.ha.namenodes.hacluster</name > <value > nn1,nn2</value > </property > <property > <name > dfs.namenode.rpc-address.hacluster.nn1</name > <value > hadoop-node1:9000</value > </property > <property > <name > dfs.namenode.rpc-address.hacluster.nn2</name > <value > hadoop-node2:9000</value > </property > <property > <name > dfs.namenode.http-address.hacluster.nn1</name > <value > hadoop-node1:50070</value > </property > <property > <name > dfs.namenode.http-address.hacluster.nn2</name > <value > hadoop-node2:50070</value > </property > <property > <name > dfs.namenode.shared.edits.dir</name > <value > qjournal://hadoop-node1:8485;hadoop-node2:8485;hadoop-node3:8485/ha</value > </property > <property > <name > dfs.client.failover.proxy.provider.hacluster</name > <value > org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value > </property > <property > <name > dfs.ha.fencing.methods</name > <value > sshfence</value > </property > <property > <name > dfs.ha.fencing.ssh.private-key-files</name > <value > /root/.ssh/id_rsa</value > </property > <property > <name > dfs.journalnode.edits.dir</name > <value > /opt/journalnode</value > </property > <property > <name > dfs.ha.automatic-failover.enabled</name > <value > true</value > </property >
配置core-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 <property > <name > fs.defaultFS</name > <value > hdfs://hacluster</value > </property > <property > <name > hadoop.tmp.dir</name > <value > /opt/servers/ha/hadoop-2.9.2/data/tmp</value > </property > <property > <name > ha.zookeeper.quorum</name > <value > hadoop-node1:2181,hadoop-node2:2181,hadoop-node3:2181</value > </property >
拷贝配置好的hadoop环境到其他节点
启动HDFS-HA集群 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 /opt/servers/ha/hadoop-2.9.2/sbin/hadoop-daemon.sh start journalnode /opt/servers/ha/hadoop-2.9.2/bin/hdfs namenode -format /opt/servers/ha/hadoop-2.9.2/sbin/hadoop-daemon.sh start namenode /opt/servers/ha/hadoop-2.9.2/bin/hdfs namenode -bootstrapStandby /opt/servers/ha/hadoop-2.9.2/bin/hdfs zkfc -formatZK /opt/servers/ha/hadoop-2.9.2/sbin/start-dfs.sh
YARN-HA配置 官方文档:Apache Hadoop 3.2.2 – ResourceManager High Availability
YARN-HA工作机制,如图:
配置YARN-HA集群
hadoop-node1
hadoop-node2
hadoop-node3
NameNode
NameNode
JournalNode
JournalNode
JournalNode
DataNode
DataNode
DataNode
ZK
ZK
ZK
ResourceManager
ResourceManager
NodeManager
NodeManager
NodeManager
yarn-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 <configuration > <property > <name > yarn.nodemanager.aux-services</name > <value > mapreduce_shuffle</value > </property > <property > <name > yarn.resourcemanager.ha.enabled</name > <value > true</value > </property > <property > <name > yarn.resourcemanager.cluster-id</name > <value > cluster-yarn</value > </property > <property > <name > yarn.resourcemanager.ha.rm-ids</name > <value > rm1,rm2</value > </property > <property > <name > yarn.resourcemanager.hostname.rm1</name > <value > hadoop-node2</value > </property > <property > <name > yarn.resourcemanager.hostname.rm2</name > <value > hadoop-node3</value > </property > <property > <name > yarn.resourcemanager.zk-address</name > <value > hadoop-node1:2181,hadoop-node2:2181,hadoop-node3:2181</value > </property > <property > <name > yarn.resourcemanager.recovery.enabled</name > <value > true</value > </property > <property > <name > yarn.resourcemanager.store.class</name > <value > org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value > </property >
同步更新其他节点的配置信息,启动hdfs