ZooKeeper Examples - Queue

Datetime: 2016-08-23 00:32:29          Topic: ZooKeeper

Background

This is the next post in the series that started with my first post on ZooKeeper Examples.

All code can be cloned from this repo on GitHub.

Setup

I am using Docker to get a ZooKeeper instance up and running.

docker run -it -p 2181:2181 fabric8/zookeeper bash  

This will start up a container with a ZooKeeper installation on it. Once the image has been downloaded and the container has started, we can start the standalone ZooKeeper server.

Note: The container's working directory should be /opt/zookeeper/

./bin/zkServer.sh start

And then start the ZooKeeper CLI.

./bin/zkCli.sh

Set up your pom.xml file to pull in all of the classes that you will need. We exclude the log4j bindings and include the slf4j-simple dependency so that log messages from the ZooKeeper client library and from our own code are printed to the console.

<dependencies>  
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>

        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.5</version>
    </dependency>
</dependencies>  

Step 1 - Add a queue class

1

We need a class that will handle all of the queue calls to ZooKeeper. Let's create a class called Queue.java

public class Queue {

}

2

Let's start this class off very simply. We'll add a main method that just connects to our ZooKeeper instance.

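import java.io.IOException;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Queue implements Watcher {  
    private static final Logger LOG = LoggerFactory.getLogger(Queue.class);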

    private final String hostPort;
    private ZooKeeper zk;

    public Queue(String hostPort) {
        this.hostPort = hostPort;
    }

    void startZK() throws IOException {
        zk = new ZooKeeper(hostPort, 15000, this);
        LOG.debug("queue started");
    }

    void stopZK() throws InterruptedException, IOException {
        zk.close();
        LOG.debug("queue closed");
    }

    public void process(WatchedEvent e) {
        LOG.info("Processing event: " + e.toString());
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Queue queue = new Queue(args[0]);

        queue.startZK();

        queue.stopZK();
    }

}

3

Running this main() will start a connection with the ZooKeeper instance. The program argument should be the IP address and port number of the ZooKeeper host. In my case I am running the Docker Toolbox and I exposed port 2181 from my Docker container, so I can pass 192.168.99.100:2181.

You should see output like the following...

...removed for brevity...
[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.99.100:2181 sessionTimeout=15000 watcher=zookeeper.queue.Queue@eed1f14
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.99.100/192.168.99.100:2181. Will not attempt to authenticate using SASL (unknown error)
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.99.100/192.168.99.100:2181, initiating session
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.99.100/192.168.99.100:2181, sessionid = 0x15096ac85270007, negotiated timeout = 15000
[main-EventThread] INFO zookeeper.queue.Queue - Processing event: WatchedEvent state:SyncConnected type:None path:null
[main] INFO org.apache.zookeeper.ZooKeeper - Session: 0x15096ac85270007 closed
[main-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down

Take note of this line: Processing event: WatchedEvent state:SyncConnected type:None path:null. This tells us that the Queue class received a SyncConnected event, which means the connection to the ZooKeeper host was made successfully. All of the possible states are listed in the Watcher.Event.KeeperState API documentation.

Step 2 - Set up connection states

Next we are going to track the state of the connection, so we can take action when the ZooKeeper host reports a particular state.

1

Add a connected and expired variable to the Queue class.

private final String hostPort;  
private ZooKeeper zk;  
private volatile boolean connected = false; // add a host connected flag  
private volatile boolean expired = false; // add a session expired flag

public Queue(String hostPort) {  

2

Now we can use the two new variables to record the state of our connection with the ZooKeeper host. Add some code to our process(WatchedEvent e) method.

public void process(WatchedEvent e) {  
    LOG.info("Processing event: " + e.toString());
    if(e.getType() == Event.EventType.None){
        switch (e.getState()) {
            case SyncConnected:
                LOG.debug("connected to zookeeper");
                connected = true;
                break;
            case Disconnected:
                LOG.debug("disconnected from zookeeper");
                connected = false;
                break;
            case Expired:
                expired = true;
                connected = false;
                LOG.error("session expiration");
                break;
            default:
                break;
        }
    }
}

3

Now let's add some accessor methods for the connected and expired variables.

boolean isConnected() {  
    return connected;
}

boolean isExpired() {  
    return expired;
}

4

Finally, let's add some checks to the main(String[] args) to see what state the connection is in.

public static void main(String[] args) throws IOException, InterruptedException {  
    Queue queue = new Queue(args[0]);

    queue.startZK();

    // keep checking to see if we are connected to the ZooKeeper host
    while(!queue.isConnected()){
        System.out.println("sleeping for connection...");
        Thread.sleep(100);
    }

    // keep checking to see if the ZooKeeper session has expired
    while(!queue.isExpired()){
        System.out.println("sleeping for expiration...");
        Thread.sleep(1000);
    }

    queue.stopZK();
}

5

When you run Queue.main() you should see output like the following. The class now tracks when it is connected to or disconnected from the ZooKeeper host.

[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.99.100:2181 sessionTimeout=15000 watcher=zookeeper.queue.Queue@65b54208
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.99.100/192.168.99.100:2181. Will not attempt to authenticate using SASL (unknown error)
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.99.100/192.168.99.100:2181, initiating session
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.99.100/192.168.99.100:2181, sessionid = 0x15222b0052d0007, negotiated timeout = 15000
[main-EventThread] INFO zookeeper.queue.Queue - Processing event: WatchedEvent state:SyncConnected type:None path:null
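
The polling loops in main() work, but a latch lets the main thread block until the watcher reports SyncConnected instead of waking up every 100 ms. The following is a minimal sketch of that idea using only java.util.concurrent. ConnectionGate and its method names are hypothetical and are not part of the ZooKeeper API or of the repo. In the Queue class, process() would call markConnected() in the SyncConnected case.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of ZooKeeper): lets one thread wait until
// another thread signals that the connection has been established.
class ConnectionGate {
    private final CountDownLatch connectedLatch = new CountDownLatch(1);

    // Call this from the watcher when a SyncConnected event arrives.
    void markConnected() {
        connectedLatch.countDown();
    }

    // Blocks until markConnected() has been called or the timeout elapses.
    // Returns true if the connection was reported in time.
    boolean awaitConnected(long timeout, TimeUnit unit) throws InterruptedException {
        return connectedLatch.await(timeout, unit);
    }
}

class ConnectionGateDemo {
    public static void main(String[] args) throws InterruptedException {
        ConnectionGate gate = new ConnectionGate();

        // Stand-in for the ZooKeeper event thread delivering SyncConnected.
        Thread eventThread = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            gate.markConnected();
        });
        eventThread.start();

        boolean connected = gate.awaitConnected(5, TimeUnit.SECONDS);
        System.out.println("connected = " + connected);
        eventThread.join();
    }
}
```

A latch only covers the first connection. Later Disconnected and SyncConnected transitions would still need the volatile flags shown above.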

Step 3 - Add the "/_QUEUE_" znode

Let's add a new znode to ZooKeeper.

1

We are making use of the asynchronous calls that the Java API offers, so we need to implement a callback object that receives the result.

AsyncCallback.StringCallback queueCreateCallback = new AsyncCallback.StringCallback() {  
    public void processResult(int rc, String path, Object ctx, String name) {
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                createQueue();
                break;
            case OK:
            case NODEEXISTS:
                LOG.info("Queue exists.");
                break;
            default:
                LOG.error("Something went wrong when creating the queue.",
                        KeeperException.create(KeeperException.Code.get(rc), path));
        }
    }
};

This callback is invoked when the znode create call completes. If the connection was lost we try to create the queue again. If the znode was created, or already existed, we simply log that the queue exists.
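
The callback retries for as long as the connection keeps dropping. If an upper bound on the number of attempts is preferable, the idea can be sketched as follows. This is a simplified, synchronous stand-in written with plain Java only: ResultCode and BoundedRetry are hypothetical names, and ResultCode merely imitates a few KeeperException.Code values.

```java
import java.util.function.Supplier;

// Hypothetical result codes standing in for a few KeeperException.Code values.
enum ResultCode { OK, NODEEXISTS, CONNECTIONLOSS, OTHER }

// Hypothetical helper: repeats an operation while it reports CONNECTIONLOSS,
// but gives up after maxAttempts tries.
class BoundedRetry {
    static ResultCode run(Supplier<ResultCode> operation, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        ResultCode result = ResultCode.CONNECTIONLOSS;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            result = operation.get();
            if (result != ResultCode.CONNECTIONLOSS) {
                return result; // success or a non-retryable error
            }
        }
        return result; // still CONNECTIONLOSS after the last attempt
    }
}

class BoundedRetryDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated create call: loses the connection twice, then succeeds.
        ResultCode result = BoundedRetry.run(
                () -> ++calls[0] < 3 ? ResultCode.CONNECTIONLOSS : ResultCode.OK,
                5);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the asynchronous version the attempt counter could travel in the ctx argument of zk.create(), which is handed back to the callback unchanged.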

3

Now we can create our method to make the actual call to the ZooKeeper host.

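// Path of the queue's parent znode. The Producer and Consumer refer to it as
// Queue.QUEUE_ZNODE; the value is taken from the path the Producer writes to.
public static final String QUEUE_ZNODE = "/_QUEUE_";

// Random id stored as the znode's data (needs java.util.Random).
// Assumed to mirror the serverId field used in the Producer.
private final String serverId = Integer.toHexString(new Random().nextInt());

public void createQueue() {  
    LOG.info("Creating queue");
    zk.create(QUEUE_ZNODE,
            serverId.getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT,
            queueCreateCallback,
            null);
}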

This is the asynchronous call to the ZooKeeper host. The first parameter is the path of the znode that you want to create. The second is the data to store in it (in this case just the id of the server that we are "using" as the queue data). The third is the ACL to apply to the znode, the fourth is the creation mode for the znode, the fifth is the callback to invoke when the call completes, and the sixth is a context object that is handed back to the callback (null here).

4

The final piece is to call the createQueue() method from main() .

public static void main(String[] args) throws IOException, InterruptedException {  
    Queue queue = new Queue(args[0]);

    queue.startZK();

    // keep checking to see if we are connected to the ZooKeeper host
    while(!queue.isConnected()){
        System.out.println("sleeping for connection...");
        Thread.sleep(100);
    }

    queue.createQueue(); // call the method to set up the "/_QUEUE_" znode

    // keep checking to see if the ZooKeeper session has expired
    while(!queue.isExpired()){
        System.out.println("sleeping for expiration...");
        Thread.sleep(1000);
    }

    queue.stopZK();
}

5

Run Queue.main() now and look at the log output. The last two lines show the create call and the callback's response.

[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.99.100:2181 sessionTimeout=15000 watcher=zookeeper.queue.Queue@65b54208
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.99.100/192.168.99.100:2181. Will not attempt to authenticate using SASL (unknown error)
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.99.100/192.168.99.100:2181, initiating session
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.99.100/192.168.99.100:2181, sessionid = 0x15222b0052d0007, negotiated timeout = 15000
[main-EventThread] INFO zookeeper.queue.Queue - Processing event: WatchedEvent state:SyncConnected type:None path:null
[main] INFO zookeeper.queue.Queue - Creating queue
[main-EventThread] INFO zookeeper.queue.Queue - Queue exists.

Step 4 - Create the producer class

With our main znode in place we can now create our producer to add items to the queue.

1

Create a Producer.java in the same package as Queue.java and add in all of the ZooKeeper setup code.

public class Producer implements Watcher {  
    private static final Logger LOG = LoggerFactory.getLogger(Producer.class);

    private ZooKeeper zk;
    private String hostPort;
    private volatile boolean connected = false;
    private volatile boolean expired = false;

    public Producer(final String hostPort) {
        this.hostPort = hostPort;
    }

    public void startZK() throws IOException {
        this.zk = new ZooKeeper(this.hostPort, 15000, this);
    }

    public void stopZK() throws InterruptedException, IOException {
        close();
    }

    public void process(WatchedEvent e) {
        LOG.info("Processing event: " + e.toString());
        if(e.getType() == Event.EventType.None){
            switch (e.getState()) {
                case SyncConnected:
                    connected = true;
                    break;
                case Disconnected:
                    connected = false;
                    break;
                case Expired:
                    expired = true;
                    connected = false;
                    LOG.error("Session expiration");
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Check if this client is connected.
     *
     * @return boolean ZooKeeper client is connected
     */
    boolean isConnected() {
        return connected;
    }

    /**
     * Check if the ZooKeeper session has expired.
     *
     * @return boolean ZooKeeper session has expired
     */
    boolean isExpired() {
        return expired;
    }

    /**
     * Closes the ZooKeeper session.
     *
     * @throws IOException
     */
    public void close() throws IOException {
        if(zk != null) {
            try{
                zk.close();
            } catch (InterruptedException e) {
                LOG.warn( "Interrupted while closing ZooKeeper session.", e );
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Producer producer = new Producer(args[0]);

        producer.startZK();

        while (!producer.isConnected()) {
            Thread.sleep(100);
        }

        while (!producer.isExpired()) {
            Thread.sleep(1000);
        }

        producer.stopZK();
    }
}

We should be able to run the Producer code and connect to the ZooKeeper instance that is running in Docker.

2

Next we can add a method that checks whether our queue znode exists in ZooKeeper and sets a watch so that we are notified when it is created.

public void queueExists() {  
    zk.exists(Queue.QUEUE_ZNODE, // check the queue's parent znode, "/_QUEUE_"
            queueExistsWatcher,
            queueExistsCallback,
            null);
}

Watcher queueExistsWatcher = new Watcher() {  
    public void process(WatchedEvent e) {
        LOG.info("Processing event: " + e.toString());
        if (e.getType() == Event.EventType.NodeCreated) {
        }
    }
};

AsyncCallback.StatCallback queueExistsCallback = new AsyncCallback.StatCallback() {  
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        LOG.info("Processing result: " + KeeperException.Code.get(rc));
        switch (KeeperException.Code.get(rc)) {
            case OK:
                break;
        }
    }
};

...

public static void main(String[] args) throws IOException, InterruptedException {  
    Producer producer = new Producer(args[0]);

    producer.startZK();

    while (!producer.isConnected()) {
        Thread.sleep(100);
    }

    producer.queueExists(); // add this call to check for (and watch) the queue znode

    while (!producer.isExpired()) {
        Thread.sleep(1000);
    }

    producer.stopZK();
}

This gets us set up to add queue items as soon as the queue exists in ZooKeeper.

3

Let's finally add in our method to create our queue items.

public class Producer implements Watcher {  
    private static final Logger LOG = LoggerFactory.getLogger(Producer.class);

    public static String QUEUE_ZNODE_ITEM = "/queue-";

    private ZooKeeper zk;
    private String hostPort;
    private Random random = new Random(this.hashCode());
    private String serverId = Integer.toHexString( random.nextInt() );
    private volatile boolean connected = false;
    private volatile boolean expired = false;

    ...

    Watcher queueExistsWatcher = new Watcher() {
        public void process(WatchedEvent e) {
            LOG.info("Processing event: " + e.toString());
            if (e.getType() == Event.EventType.NodeCreated) {
                createQueueItem(); // add this line to make the call when the "/_QUEUE_" znode is created
            }
        }
    };

    AsyncCallback.StatCallback queueExistsCallback = new AsyncCallback.StatCallback() {
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            LOG.info("Processing result: " + KeeperException.Code.get(rc));
            switch (KeeperException.Code.get(rc)) {
                case OK:
                    createQueueItem(); // add this line
                    break;
            }
        }
    };

    public void createQueueItem() {
        try {
            for (int i = 0; i < 10; i++) {
                LOG.info("Creating queue item: " + i);
                zk.create(Queue.QUEUE_ZNODE + QUEUE_ZNODE_ITEM, // "/_QUEUE_/queue-"
                        serverId.getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL
                );
                serverId = Integer.toHexString( random.nextInt() );
            }
        } catch (KeeperException | InterruptedException ex) {
            LOG.error(ex.getLocalizedMessage(), ex);
        }
    }

With all of this in place you can run Queue.main() and then Producer.main() to create 10 queue items.

Step 5 - Create the consumer class

Our final class to add is the consumer class that will read the data from each queue item and then delete the queue item.

1

Let's add a new class to the same package as the other two classes.

public class Consumer implements Watcher {  
    private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);

    private ZooKeeper zk;
    private String hostPort;
    private volatile boolean connected = false;
    private volatile boolean expired = false;

    public Consumer(final String hostPort) {
        this.hostPort = hostPort;
    }

    public void startZK() throws IOException {
        this.zk = new ZooKeeper(this.hostPort, 15000, this);
    }

    public void stopZK() throws InterruptedException, IOException {
        close();
    }

    public void process(WatchedEvent e) {
        LOG.info("Processing event: " + e.toString());
        if(e.getType() == Event.EventType.None){
            switch (e.getState()) {
                case SyncConnected:
                    connected = true;
                    break;
                case Disconnected:
                    connected = false;
                    break;
                case Expired:
                    expired = true;
                    connected = false;
                    LOG.error("Session expiration");
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Check if this client is connected.
     *
     * @return boolean ZooKeeper client is connected
     */
    boolean isConnected() {
        return connected;
    }

    /**
     * Check if the ZooKeeper session has expired.
     *
     * @return boolean ZooKeeper session has expired
     */
    boolean isExpired() {
        return expired;
    }

    /**
     * Closes the ZooKeeper session.
     *
     * @throws IOException
     */
    public void close() throws IOException {
        if(zk != null) {
            try{
                zk.close();
            } catch (InterruptedException e) {
                LOG.warn( "Interrupted while closing ZooKeeper session.", e );
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Consumer consumer = new Consumer(args[0]);

        consumer.startZK();

        while (!consumer.isConnected()) {
            Thread.sleep(100);
        }

        while (!consumer.isExpired()) {
            Thread.sleep(1000);
        }

        consumer.stopZK();
    }
}

2

Let's finally add the code to process the queue items.

public void processQueue() {  
    try {
        List<String> children = zk.getChildren(Queue.QUEUE_ZNODE, true);

        Collections.sort(children);

        for (String item : children) {
            String path = Queue.QUEUE_ZNODE + "/" + item;
            zk.getData(path,
                    false,
                    dataCallback,
                    null);
        }
    } catch (KeeperException | InterruptedException e) {
        LOG.error("Error getting children for znode: " + Queue.QUEUE_ZNODE, e);
    }
}

AsyncCallback.DataCallback dataCallback = new AsyncCallback.DataCallback() {  
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        LOG.info("Processing data: " + KeeperException.Code.get(rc));
        switch (KeeperException.Code.get(rc)) {
            case OK:
                LOG.info(new String(data));
                deleteQueueItem(path);
                break;
        }
    }
};

public void deleteQueueItem(String path) {  
    LOG.info("Deleting queue item: " + path);
    zk.delete(path, -1, deleteQueueItemCallback, null);
}

AsyncCallback.VoidCallback deleteQueueItemCallback = new AsyncCallback.VoidCallback(){  
    public void processResult(int rc, String path, Object ctx){
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                zk.delete(path, -1, deleteQueueItemCallback, null);
                break;
            case OK:
                LOG.info("Successfully deleted " + path);
                break;
            default:
                LOG.error("Something went wrong here, " +
                        KeeperException.create(KeeperException.Code.get(rc), path));
        }
    }
};

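After adding this code, call consumer.processQueue() from Consumer.main() once the client is connected, in the same way that Producer.main() calls queueExists(). You can then run the main methods for Queue, Producer and Consumer. The log statements will show you the outcome of adding the items with the producer class and processing and deleting them with the consumer class.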
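
The Collections.sort(children) call in processQueue() is what gives the queue its first-in, first-out order. For sequential znodes ZooKeeper appends a 10-digit, zero-padded counter to the requested name, so sorting the names as strings also sorts them by creation order. The sketch below imitates that naming scheme with plain Java. SequentialNames and its methods are hypothetical helpers for illustration and make no ZooKeeper calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Hypothetical helper that imitates how sequential znode names are formed.
class SequentialNames {
    // Builds a name like "queue-0000000007": the prefix followed by a
    // 10-digit, zero-padded counter.
    static String nameFor(String prefix, int sequence) {
        return String.format(Locale.ROOT, "%s%010d", prefix, sequence);
    }

    // Extracts the counter from a name such as "queue-0000000007".
    static int sequenceOf(String name) {
        return Integer.parseInt(name.substring(name.length() - 10));
    }

    public static void main(String[] args) {
        // getChildren() makes no promise about ordering, so add out of order.
        List<String> children = new ArrayList<>();
        children.add(nameFor("queue-", 10));
        children.add(nameFor("queue-", 2));
        children.add(nameFor("queue-", 0));

        // Zero padding makes string order match numeric order.
        Collections.sort(children);
        System.out.println(children);
        // prints [queue-0000000000, queue-0000000002, queue-0000000010]
    }
}
```

Without the zero padding, "queue-10" would sort before "queue-2" and the consumer would process items out of order.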

Final

To see the before and after states, run Queue.main() and Producer.main(), then inspect the queue from zkCli.sh (for example, ls /_QUEUE_ lists the queue items). Because the items are created as EPHEMERAL_SEQUENTIAL znodes, keep the Producer running while you check, since its items disappear when its session ends. After checking the before state, run Consumer.main() and check again to see that all of the queue items have been deleted.
