2019-01-08 · Develop

Java 集成 ZooKeeper

前面一篇文章 Zookeeper 简介 中,使用命令行交互界面来操作 ZooKeeper 的节点。 这篇文章来讲述下将 ZooKeeper 集成到 Spring Boot 中。

ZooKeeper Java API

ZooKeeper 提供了 Java 和 C 的驱动包来操作节点,只需要引入如下的依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
</dependency>

ZooKeeper 提供的 API 很多,常用的有如下几个

// 建立连接
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException
// 创建节点
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
// 判断节点是否存在
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException
// 获取节点数据
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException 
// 设置节点数据
public Stat setData(final String path, byte data[], int version) throws KeeperException, InterruptedException
// 删除节点
public void delete(final String path, int version) throws InterruptedException, KeeperException
// 获取子节点列表
public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException
// 关闭节点
public synchronized void close() throws InterruptedException

其中的参数含义:
connectString zookeeper server列表, 以逗号隔开。创建 ZooKeeper 对象时,只要对象完成初始化便立即返回,建立连接是以异步的形式进行的,当连接成功建立后,会回调 Watcher 的 process 方法。
sessionTimeout 连接的超时时间。
watcher 事件回调接口。如果是 boolean 值这表示 指定是否监听 path node 的的增加和删除事件, 以及数据改变事件。
path znode 的路径。
data 节点上的数据。
acl 指定权限信息, 如果不想指定权限, 可以传入 Ids.OPEN_ACL_UNSAFE
createMode 指定znode类型。 CreateMode是一个枚举类。
stat 是节点的状态信息。
version 指定数据的版本, 如果 version 和真实的版本不同, 操作将失败. 指定 version 为 -1 则忽略版本检查。

下面是 Java 代码的简单示例

@Slf4j
public class ZooKeeperTest {

    @Test
    public void zookeeper() throws IOException, KeeperException, InterruptedException {
        // 创建连接
        ZooKeeper zooKeeper = new ZooKeeper("192.168.19.124:2181", 5000,
                // 监控所有被触发的事件
                watchedEvent -> System.out.println("Watched " + watchedEvent.getType() + " event!"));

        // 创建节点 - 顺序临时节点
        String path = zooKeeper.create("/root", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        log.info("path -> " + path);

        // 判断节点是否存在,并监听-创建、删除、数据更新事件
        Stat stat = zooKeeper.exists(path, true);

        // 获取节点数据
        byte[] data = zooKeeper.getData(path, false, null);

        // 更新节点数据
        stat = zooKeeper.setData(path, "Hello,ZooKeeper".getBytes(), -1);

        // 删除节点
        zooKeeper.delete(path, -1);

        // 获取子节点列表
        List<String> childrenPathList = zooKeeper.getChildren("/", false);
        for (String childrenPath : childrenPathList) {
            log.info("childrenPath -> " + childrenPath);
        }

        // 关闭连接
        zooKeeper.close();
    }
}

Apache Curator

ZooKeeper 本身提供了低级别的 Java API 来实现节点的操作。 Curator 是 Apache 提供的一个访问 zk 的工具包,封装了对节点的操作,同时提供了一些高级服务,比如分布式锁、领导选举等。集成 Curator 只需要在 pom 中添加如下的依赖:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>3.3.0</version>
</dependency>

Curator API 是链式调用风格,遇到 forPath 接口就触发 ZooKeeper 调用。直接操作节点的 API 示例如下:

@Slf4j
public class CuratorRecipesTest {

    @Test
    public void curatorRecipes() throws Exception {
        // RetryPolicy 用于重连策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.19.124:2181", retryPolicy);
        // 开始连接
        client.start();

        // 创建节点
        String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/root", "Hello, Curator".getBytes());

        // 判断节点是否存在
        // inBackground 设置异步执行,异步执行的结果也将通过 CuratorListener 进行通知
        Stat stat = client.checkExists().watched().inBackground().forPath(path);

        // 监听 Watched 事件
        client.getCuratorListenable().addListener((curatorFramework, curatorEvent) -> {
            CuratorEventType curatorEventType = curatorEvent.getType();
            if(curatorEventType == CuratorEventType.WATCHED) {
                WatchedEvent watchedEvent = curatorEvent.getWatchedEvent();
                Watcher.Event.EventType eventType = watchedEvent.getType();
                log.info("Watcher.Event.EventType -> " + eventType);
                // zk 得到监听的消息后,客户端还需要再次设置监听,才能收到后面的节点变化的事件
                client.checkExists().watched().forPath(watchedEvent.getPath());
            }
        });

        // 获取节点数据
        byte[] data = client.getData().forPath(path);

        // 更新节点数据
        stat = client.setData().forPath(path);

        // 删除节点
        client.delete().forPath(path);

        // 获取子节点
        List<String> childrenPathList = client.getChildren().forPath("/");

        // 关闭连接
        client.close();
    }
}

但是当我执行上面的测试代码的时候就会出现如下的报错信息

org.apache.zookeeper.KeeperException$UnimplementedException: KeeperErrorCode = Unimplemented for /root

	at org.apache.zookeeper.KeeperException.create(KeeperException.java:103)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:1297)
	at org.apache.curator.framework.imps.CreateBuilderImpl$17.call(CreateBuilderImpl.java:1075)
	at org.apache.curator.framework.imps.CreateBuilderImpl$17.call(CreateBuilderImpl.java:1058)
	at org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:67)
	at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
	at org.apache.curator.framework.imps.CreateBuilderImpl.pathInForeground(CreateBuilderImpl.java:1055)
	at org.apache.curator.framework.imps.CreateBuilderImpl.protectedPathInForeground(CreateBuilderImpl.java:524)
	at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:514)
	at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:44)
	at com.zuojl.rapid.boot.zookeeper.CuratorRecipesTest.curatorRecipes(CuratorRecipesTest.java:30)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

出现上面的错误并不是调用方法的错误使用,而是因为 Curator Framework 的 2.x.x 版本和 3.x.x 版本对 Zookeeper 支持的版本是有差异的。查看官网有如下的提示

Versions
The are currently two released versions of Curator, 2.x.x and 3.x.x:

Curator 2.x.x - compatible with both ZooKeeper 3.4.x and ZooKeeper 3.5.x
Curator 3.x.x - compatible only with ZooKeeper 3.5.x and includes support for new features such as dynamic reconfiguration, etc.

所以将上面的依赖更改为如下即可解决

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>