ZooKeeper Examples - Barrier

Datetime: 2016-08-23 00:33:25          Topic: ZooKeeper

Background

I was reading a book on Apache ZooKeeper from Packt Publishing and found that several of its examples were described only in pseudocode, with no accompanying code. I wanted to learn the ZooKeeper API by turning that pseudocode into real code.

Setup

I am using Docker to get a ZooKeeper instance up and running.

docker run -it -p 2181:2181 fabric8/zookeeper bash  

This will start a container with a ZooKeeper installation on it. Once the image has been downloaded and the container has started, we can start the standalone instance.

Note: The container's working directory should be /opt/zookeeper/

./bin/zkServer.sh start

And then start the ZooKeeper CLI.

./bin/zkCli.sh

Set up your pom.xml file to help get all of the classes that you will need. We are including the slf4j-simple dependency to help show log messages from the ZooKeeper host and our code.

<dependencies>  
	<dependency>
		<groupId>org.apache.zookeeper</groupId>
		<artifactId>zookeeper</artifactId>
		<version>3.4.6</version>
		<exclusions>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
			<exclusion>
				<groupId>log4j</groupId>
				<artifactId>log4j</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-simple</artifactId>
		<version>1.7.5</version>
	</dependency>
</dependencies>  

Step 1 - Add a barrier class

1

So we need to create a class that will handle all of the barrier calls to ZooKeeper. Let's create a class called Barrier.java

public class Barrier {

}

2

Let's start this class off very simply. We'll add a main method that just connects to our ZooKeeper instance.

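import java.io.IOException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Barrier implements Watcher {  
	private static final Logger LOG = LoggerFactory.getLogger(Barrier.class);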
	private final String hostPort;
	private ZooKeeper zk;
	public Barrier(String hostPort) {
		this.hostPort = hostPort;
	}
	void startZK() throws IOException {
		zk = new ZooKeeper(hostPort, 15000, this);
		LOG.debug("barrier started");
	}
	void stopZK() throws InterruptedException, IOException {
		zk.close();
		LOG.debug("barrier closed");
	}
	public void process(WatchedEvent e) {
		LOG.info("Processing event: " + e.toString());
	}
	public static void main(String[] args) throws IOException, InterruptedException {
		Barrier barrier = new Barrier(args[0]);
		barrier.startZK();
		barrier.stopZK();
	}
}

3

Running this main() will start a connection with the ZooKeeper instance. Your program argument should be the IP address and port of the ZooKeeper host. In my case I am running Docker Toolbox and I exposed port 2181 from my Docker container, so I can pass 192.168.99.100:2181 .

You should see output like the following...

...removed for brevity...
[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.99.100:2181 sessionTimeout=15000 watcher=zookeeper.barrier.Barrier@eed1f14
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.99.100/192.168.99.100:2181. Will not attempt to authenticate using SASL (unknown error)
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.99.100/192.168.99.100:2181, initiating session
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.99.100/192.168.99.100:2181, sessionid = 0x15096ac85270007, negotiated timeout = 15000
[main-EventThread] INFO zookeeper.barrier.Barrier - Processing event: WatchedEvent state:SyncConnected type:None path:null
[main] INFO org.apache.zookeeper.ZooKeeper - Session: 0x15096ac85270007 closed
[main-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down

Take note of this line: Processing event: WatchedEvent state:SyncConnected type:None path:null . This tells us that the Barrier class received a SyncConnected event, which means the connection to the ZooKeeper host was made successfully. All of the possible states are listed in the Watcher.Event.KeeperState enum.

Step 2 - Set up connection states

Next we are going to set up some connection states so we can take action when the connection to the ZooKeeper host changes state.

1

Add connected and expired variables to the Barrier class.

private final String hostPort;  
private ZooKeeper zk;  
private volatile boolean connected = false; // add a host connected flag  
private volatile boolean expired = false; // add a session expired flag

public Barrier(String hostPort) {  

2

Now we can use the two new variables to track the state of our connection with the ZooKeeper host. Add some code to our process(WatchedEvent e) method.

public void process(WatchedEvent e) {  
	LOG.info("Processing event: " + e.toString());
	if(e.getType() == Event.EventType.None){
		switch (e.getState()) {
			case SyncConnected:
				LOG.debug("connected to zookeeper");
				connected = true;
				break;
			case Disconnected:
				LOG.debug("disconnected from zookeeper");
				connected = false;
				break;
			case Expired:
				expired = true;
				connected = false;
				LOG.error("session expiration");
				break; // do not fall through to the default case
			default:
				break;
		}
	}
}

3

Now let's add some accessor methods for the connected and expired variables.

boolean isConnected() {  
	return connected;
}
boolean isExpired() {  
	return expired;
}

4

Finally, let's add some checks to the main(String[] args) to see what state the connection is in.

public static void main(String[] args) throws IOException, InterruptedException {  
	Barrier barrier = new Barrier(args[0]);
	barrier.startZK();
	// keep checking until we are connected to the ZooKeeper host
	while(!barrier.isConnected()){
		System.out.println("sleeping for connection...");
		Thread.sleep(100);
	}
	// keep checking until our session with the ZooKeeper host has expired
	while(!barrier.isExpired()){
		System.out.println("sleeping for expiration...");
		Thread.sleep(1000);
	}
	barrier.stopZK();
}

5

When you run the Barrier.main() you should see some output like the following. Now you will have information about when you are connected to or disconnected from the ZooKeeper host.

[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.99.100:2181 sessionTimeout=15000 watcher=zookeeper.tutorial.Barrier@eed1f14
sleeping for connection...  
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.99.100/192.168.99.100:2181. Will not attempt to authenticate using SASL (unknown error)
sleeping for connection...  
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.99.100/192.168.99.100:2181, initiating session
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.99.100/192.168.99.100:2181, sessionid = 0x15096ac8527000b, negotiated timeout = 15000
[main-EventThread] INFO zookeeper.tutorial.Barrier - Processing event: WatchedEvent state:SyncConnected type:None path:null
sleeping for expiration...  
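
The polling loops in this step work, but the same state handling can also be sketched with a latch so that the main thread blocks until the first connection event instead of sleeping in a loop. The sketch below is JDK-only so it can run without a ZooKeeper server: SessionState, ConnectionTracker and awaitConnected are hypothetical names that stand in for KeeperState and the flags in Barrier, and are not part of the ZooKeeper API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ZooKeeper's KeeperState values used in this step.
enum SessionState { SYNC_CONNECTED, DISCONNECTED, EXPIRED }

class ConnectionTracker {
    private final CountDownLatch firstConnect = new CountDownLatch(1);
    private volatile boolean connected = false;
    private volatile boolean expired = false;

    // Mirrors the switch in process(WatchedEvent): called once per state change.
    void onState(SessionState state) {
        switch (state) {
            case SYNC_CONNECTED:
                connected = true;
                firstConnect.countDown(); // release anyone waiting for the first connection
                break;
            case DISCONNECTED:
                connected = false;
                break;
            case EXPIRED:
                expired = true;
                connected = false;
                break;
        }
    }

    // Blocks until a SYNC_CONNECTED state has been seen or the timeout elapses.
    boolean awaitConnected(long timeout, TimeUnit unit) {
        try {
            return firstConnect.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    boolean isConnected() { return connected; }
    boolean isExpired() { return expired; }
}

class ConnectionTrackerDemo {
    public static void main(String[] args) {
        ConnectionTracker tracker = new ConnectionTracker();
        // Simulate the event thread delivering the SyncConnected event.
        new Thread(() -> tracker.onState(SessionState.SYNC_CONNECTED)).start();
        System.out.println("connected=" + tracker.awaitConnected(5, TimeUnit.SECONDS));
        tracker.onState(SessionState.EXPIRED);
        System.out.println("expired=" + tracker.isExpired() + " connected=" + tracker.isConnected());
    }
}
```

In the real Barrier class the body of onState would live inside process(WatchedEvent e), and main() would call awaitConnected in place of the first while loop.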

Step 3 - Add the "/barrier" znode

Let's start using ZooKeeper for its purpose and set up some information and data.

1

First we are going to add a variable to help us know the state of the barrier.

private final String hostPort;  
private ZooKeeper zk;  
private volatile boolean connected = false;  
private volatile boolean expired = false;  
private volatile boolean barrierSet = false; // add a barrier state flag  

2

We are making use of the asynchronous calls that you can make to the ZooKeeper host using the Java API, so we need to implement some callback objects.

AsyncCallback.StringCallback barrierCreateCallback = new AsyncCallback.StringCallback() {  
	public void processResult(int rc, String path, Object ctx, String name) {
		switch (KeeperException.Code.get(rc)) {
			case CONNECTIONLOSS:
				LOG.debug("connection was lost while setting the barrier");
				setBarrier();
				break;
			case OK:
			case NODEEXISTS:
				LOG.debug("the barrier is set");
				barrierSet = true;
				break;
			default:
				barrierSet = false;
				LOG.error("Something went wrong when setting barrier.",
						KeeperException.create(KeeperException.Code.get(rc), path));
		}
		LOG.info("barrier is " + (barrierSet ? "" : "not ") + "set");
	}
};

This callback will take action when the znode create is completed. We either want to try to set the barrier again if the connection is lost or set the barrier state to true if it is "OK" or "NODEEXISTS". The "NODEEXISTS" means that the "/barrier" znode was already set prior to it trying to set again. The default action is to log the error.
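
Note that the callback above retries immediately and without limit when the connection is lost. One possible refinement, which is not part of the original example, is to wait a little longer before each retry. The sketch below is JDK-only; RetryBackoff, delayMillis and runWithRetry are hypothetical names, and the operation being retried is simulated rather than a real ZooKeeper call.

```java
import java.util.function.BooleanSupplier;

class RetryBackoff {
    private final long baseMillis;
    private final long maxMillis;

    RetryBackoff(long baseMillis, long maxMillis) {
        if (baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("need 0 < baseMillis <= maxMillis");
        }
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
    }

    // Delay before retry number `attempt` (0-based): base doubled per attempt, capped at maxMillis.
    long delayMillis(int attempt) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    // Runs op until it reports success or maxAttempts is used up.
    // Returns the number of attempts made, or -1 if it gave up.
    // Sleeping here is for the demo only; inside a ZooKeeper callback you would
    // schedule the retry instead of blocking the event thread.
    int runWithRetry(BooleanSupplier op, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (op.getAsBoolean()) {
                return attempt + 1;
            }
            try {
                Thread.sleep(delayMillis(attempt));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return -1;
    }
}

class RetryBackoffDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated operation: "connection loss" twice, then success.
        int attempts = new RetryBackoff(10, 1000).runWithRetry(() -> ++calls[0] >= 3, 5);
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```

With the asynchronous API, the delay from delayMillis could be handed to a ScheduledExecutorService that calls setBarrier() later, with the attempt counter passed through the ctx argument of create.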

3

Now we can create our method to make the actual call to the ZooKeeper host.

public void setBarrier() {  
    LOG.info("setting barrier");
    zk.create("/barrier",
	    "barrier-id".getBytes(),
	    ZooDefs.Ids.OPEN_ACL_UNSAFE,
	    CreateMode.EPHEMERAL,
	    barrierCreateCallback,
	    null);
}

This is the asynchronous call to the ZooKeeper host. The first parameter is the path of the znode that you want to create. The second is the data we want to store in it (in this case just the id of the server that we are "using" as the barrier). The third is the ACL; OPEN_ACL_UNSAFE leaves the znode open to every client. The fourth is the create mode: EPHEMERAL means that the znode is deleted automatically when the session that created it ends, so the barrier does not stay around with no one controlling it. The last two parameters are the callback object and a context object that is handed back to the callback (null here).

4

The final piece is to call the setBarrier() method from main() .

public static void main(String[] args) throws IOException, InterruptedException {  
	Barrier barrier = new Barrier(args[0]);
	barrier.startZK();
	// keep checking until we are connected to the ZooKeeper host
	while(!barrier.isConnected()){
		System.out.println("sleeping for connection...");
		Thread.sleep(100);
	}
	barrier.setBarrier(); // call the method to set up the "/barrier" znode
	// keep checking until our session with the ZooKeeper host has expired
	while(!barrier.isExpired()){
		System.out.println("sleeping for expiration...");
		Thread.sleep(1000);
	}
	barrier.stopZK();
}

5

Run Barrier.main() now and look at the log output.

[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.99.100:2181 sessionTimeout=15000 watcher=zookeeper.tutorial.Barrier@4c873330
sleeping for connection...  
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.99.100/192.168.99.100:2181. Will not attempt to authenticate using SASL (unknown error)
sleeping for connection...  
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.99.100/192.168.99.100:2181, initiating session
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.99.100/192.168.99.100:2181, sessionid = 0x1509a25c5300001, negotiated timeout = 15000
[main-EventThread] INFO zookeeper.tutorial.Barrier - Processing event: WatchedEvent state:SyncConnected type:None path:null
[main] INFO zookeeper.tutorial.Barrier - setting barrier
sleeping for expiration...  
[main-EventThread] INFO zookeeper.tutorial.Barrier - barrier is set
sleeping for expiration...  

Step 4 - Watch the "/barrier" znode

The point of the Barrier workflow in ZooKeeper is to hold clients at bay until the barrier disappears. Once the barrier disappears, the clients can perform their operations. After the removal, a new barrier is expected to go up to hold the next set of clients.
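
As a rough in-process analogy (not ZooKeeper code), the same "hold everyone until the barrier is removed" behavior can be sketched with a CountDownLatch from the JDK. LocalBarrierDemo and runClients are hypothetical names; the threads play the role of the clients and countDown() plays the role of deleting the "/barrier" znode.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LocalBarrierDemo {
    // Starts `clients` threads that block until the barrier is removed.
    // Returns {clients that proceeded before removal, clients that proceeded after removal}.
    static int[] runClients(int clients) {
        CountDownLatch barrier = new CountDownLatch(1); // the barrier "exists" while the count is 1
        AtomicInteger proceeded = new AtomicInteger();
        Thread[] threads = new Thread[clients];
        for (int i = 0; i < clients; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await();             // wait, like a client watching /barrier
                    proceeded.incrementAndGet(); // the client's "operation"
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        int before = proceeded.get();            // nobody can have passed the barrier yet
        barrier.countDown();                     // analogous to `delete /barrier`
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] {before, proceeded.get()};
    }

    public static void main(String[] args) {
        int[] result = runClients(3);
        System.out.println("before removal: " + result[0] + ", after removal: " + result[1]);
    }
}
```

The ZooKeeper version differs in that the clients are separate processes and the barrier is re-created after each removal, which a single-use latch cannot model.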

1

We need to set up a watcher to watch for the removal of the barrier.

Watcher barrierExistsWatcher = new Watcher() {  
	public void process(WatchedEvent e) {
		if(e.getType() == Event.EventType.NodeDeleted) {
			LOG.info("barrier deleted");
			if ("/barrier".equals( e.getPath() )) {
				setBarrier();
			}
		}
	}
};

2

Because we are doing another asynchronous call we need to create a callback object.

AsyncCallback.StatCallback barrierExistsCallback = new AsyncCallback.StatCallback() {  
	public void processResult(int rc, String path, Object ctx, Stat stat) {
		LOG.debug("Code = {}", KeeperException.Code.get(rc));
		switch (KeeperException.Code.get(rc)) {
			case CONNECTIONLOSS:
				barrierExists();
				break;
			case OK:
				if (!barrierSet) setBarrier();
				break;
			case NONODE:
				barrierSet = false;
				setBarrier();
				LOG.info("set the barrier again.");
				break;
		}
	}
};

If the connection is lost, then we will try to check if the barrier exists again. If the operation works, then we'll see if the barrier is still set and if not, then we will set it again. If there is no node for the "/barrier" znode, then we will set the flag and set the barrier again.
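
The reason the watch has to be registered again each time the barrier is set is that a ZooKeeper watch is a one-time trigger. The toy model below illustrates that behavior with plain JDK collections. OneShotWatchModel is a hypothetical, heavily simplified stand-in (it only fires on deletion and has no sessions or threads), not how ZooKeeper is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class OneShotWatchModel {
    private final Set<String> nodes = new HashSet<>();
    private final Map<String, List<Runnable>> watches = new HashMap<>();

    void create(String path) { nodes.add(path); }

    // Like exists(path, watcher): reports presence and leaves a watch behind.
    boolean exists(String path, Runnable onDelete) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(onDelete);
        return nodes.contains(path);
    }

    // Deleting a node fires each registered watch once and then forgets it.
    void delete(String path) {
        if (!nodes.remove(path)) return;
        List<Runnable> fired = watches.remove(path);
        if (fired != null) fired.forEach(Runnable::run);
    }

    // Registers a watch once, then deletes the node twice.
    static int firesWithoutRearm() {
        OneShotWatchModel zk = new OneShotWatchModel();
        int[] fired = {0};
        zk.create("/barrier");
        zk.exists("/barrier", () -> fired[0]++);
        zk.delete("/barrier"); // the watch fires here
        zk.create("/barrier");
        zk.delete("/barrier"); // no watch is registered any more
        return fired[0];
    }

    // Re-creates the node and re-registers the watch inside the watch itself,
    // the same loop as setBarrier() followed by barrierExists().
    static int firesWithRearm(int deletes) {
        OneShotWatchModel zk = new OneShotWatchModel();
        int[] fired = {0};
        Runnable[] rearm = new Runnable[1];
        rearm[0] = () -> {
            fired[0]++;
            zk.create("/barrier");
            zk.exists("/barrier", rearm[0]);
        };
        zk.create("/barrier");
        zk.exists("/barrier", rearm[0]);
        for (int i = 0; i < deletes; i++) zk.delete("/barrier");
        return fired[0];
    }

    public static void main(String[] args) {
        System.out.println("without re-arm: " + firesWithoutRearm());
        System.out.println("with re-arm: " + firesWithRearm(3));
    }
}
```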

3

We can now add the method to set up the watch on the ZooKeeper host.

public void barrierExists() {  
    zk.exists("/barrier",
	    barrierExistsWatcher,
	    barrierExistsCallback,
	    null);
}

4

The final step is to call barrierExists() after the barrier is set. Inside the barrierCreateCallback object we call it if everything was set up OK.

AsyncCallback.StringCallback barrierCreateCallback = new AsyncCallback.StringCallback() {  
	public void processResult(int rc, String path, Object ctx, String name) {
		switch (KeeperException.Code.get(rc)) {
			case CONNECTIONLOSS:
				LOG.debug("connection was lost while setting the barrier");
				setBarrier();
				break;
			case OK:
			case NODEEXISTS:
				LOG.debug("the barrier is set");
				barrierSet = true;
				barrierExists(); // add this line in order to set up the watch on the ZooKeeper host
				break;
			default:
				barrierSet = false;
				LOG.error("Something went wrong when setting barrier.",
						KeeperException.create(KeeperException.Code.get(rc), path));
		}
		LOG.info("barrier is " + (barrierSet ? "" : "not ") + "set");
	}
};

5

Run Barrier.main() and notice that the barrier gets set, but the program then sits there waiting for something to happen to the barrier. We can force the barrier to be deleted by going into the zkCli.sh session and calling delete /barrier . After doing this you should see lines in your log that look like the following.

[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.99.100:2181 sessionTimeout=15000 watcher=zookeeper.tutorial.Barrier@776ec8df
sleeping for connection...  
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.99.100/192.168.99.100:2181. Will not attempt to authenticate using SASL (unknown error)
sleeping for connection...  
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.99.100/192.168.99.100:2181, initiating session
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.99.100/192.168.99.100:2181, sessionid = 0x1509a25c5300004, negotiated timeout = 15000
[main-EventThread] INFO zookeeper.tutorial.Barrier - Processing event: WatchedEvent state:SyncConnected type:None path:null
[main] INFO zookeeper.tutorial.Barrier - setting barrier
sleeping for expiration...  
[main-EventThread] INFO zookeeper.tutorial.Barrier - barrier is set
sleeping for expiration...  
sleeping for expiration...  
sleeping for expiration...  
sleeping for expiration...  
[main-EventThread] INFO zookeeper.tutorial.Barrier - barrier deleted
[main-EventThread] INFO zookeeper.tutorial.Barrier - setting barrier
[main-EventThread] INFO zookeeper.tutorial.Barrier - barrier is set
sleeping for expiration...  
sleeping for expiration...  
sleeping for expiration...  

Step 5 - Add the "/clients" znode

The final part of this example is to have some clients that are waiting for the barrier to be removed.

1

First we need to add a new class called Client.java . This will look very similar to the initial setup of Barrier.java.

public class Client implements Watcher {  
	private static final Logger LOG = LoggerFactory.getLogger(Client.class);
	private final String hostPort;
	private ZooKeeper zk;
	private volatile boolean connected = false;
	private volatile boolean expired = false;
	public Client(final String hostPort) {
		this.hostPort = hostPort;
	}
	void startZK() throws IOException {
		zk = new ZooKeeper(hostPort, 15000, this);
		LOG.debug("client started");
	}
	void stopZK() throws InterruptedException, IOException {
		zk.close();
		LOG.debug("client closed");
	}
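	public void process(WatchedEvent e) {
		LOG.info("Processing event: " + e.toString());
		// update the flags, otherwise main() would wait for a connection forever
		if (e.getType() == Event.EventType.None) {
			switch (e.getState()) {
				case SyncConnected:
					connected = true;
					break;
				case Disconnected:
					connected = false;
					break;
				case Expired:
					expired = true;
					connected = false;
					break;
				default:
					break;
			}
		}
	}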
	boolean isConnected() {
		return connected;
	}
	boolean isExpired() {
		return expired;
	}
	public static void main(String[] args) throws IOException, InterruptedException {
		Client client = new Client(args[0]);
		client.startZK();
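		while (!client.isConnected()) {
			System.out.println("sleeping for connection...");
			Thread.sleep(100);
		}
		while (!client.isExpired()) {
			System.out.println("sleeping for expiration...");
			Thread.sleep(1000);
		}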
		client.stopZK();
	}
}

2

You should see some log info that is similar to the Barrier one.

[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.99.100:2181 sessionTimeout=15000 watcher=zookeeper.tutorial.Client@eed1f14
sleeping for connection...  
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.99.100/192.168.99.100:2181. Will not attempt to authenticate using SASL (unknown error)
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.99.100/192.168.99.100:2181, initiating session
sleeping for connection...  
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.99.100/192.168.99.100:2181, sessionid = 0x1509a25c530000d, negotiated timeout = 15000
[main-EventThread] INFO zookeeper.tutorial.Client - Processing event: WatchedEvent state:SyncConnected type:None path:null
sleeping for expiration...  

3

Create the "/clients" znode to hold all of the clients that will wait for the barrier.

public void createClientParent() {  
	zk.create("/clients",
			"clients-id".getBytes(),
			ZooDefs.Ids.OPEN_ACL_UNSAFE,
			CreateMode.PERSISTENT,
			createClientParentCallback,
			null);
}
AsyncCallback.StringCallback createClientParentCallback = new AsyncCallback.StringCallback() {  
	public void processResult(int rc, String path, Object ctx, String name) {
		switch (KeeperException.Code.get(rc)) {
			case CONNECTIONLOSS:
				createClientParent();
				break;
			case OK:
			case NODEEXISTS:
				break;
			default:
				LOG.error("there was an issue creating the client parent.");
				break;
		}
	}
};

We make another create asynchronous call to the ZooKeeper host to create the "/clients" znode. If there is a connection loss, then we will try again. If we are successful, then we will create the client. That is the next part.

4

Add the createClientParent() call to Client.main() .

public static void main(String[] args) throws IOException, InterruptedException {  
	Client client = new Client(args[0]);
	client.startZK();
	while (!client.isConnected()) {
		System.out.println("sleeping for connection...");
		Thread.sleep(100);
	}
	client.createClientParent(); // add the call to create the "/clients" znode
	while (!client.isExpired()) {
		System.out.println("sleeping for expiration...");
		Thread.sleep(10000);
	}
	client.stopZK();
}

5

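Add the client znode. Each client registers itself as an ephemeral child of "/clients". For createClient() to actually run, call it from the OK and NODEEXISTS cases of createClientParentCallback.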

public void createClient() {  
	int randomInt = new Random(System.currentTimeMillis()).nextInt();
	String hex = Integer.toHexString(randomInt);
	zk.create("/clients/" + hex,
			hex.getBytes(),
			ZooDefs.Ids.OPEN_ACL_UNSAFE,
			CreateMode.EPHEMERAL,
			clientCreateCallback,
			null);
}
AsyncCallback.StringCallback clientCreateCallback = new AsyncCallback.StringCallback() {  
	public void processResult(int rc, String path, Object ctx, String name) {
		switch (KeeperException.Code.get(rc)) {
			case CONNECTIONLOSS:
				createClient();
				break;
			case OK:
			case NODEEXISTS:
				LOG.info("added client to the barrier.");
				barrierExists();
				break;
			default:
				LOG.error("Something went wrong when adding client to barrier.",
						KeeperException.create(KeeperException.Code.get(rc), path));
		}
	}
};

6

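If you run Client.main() now, you should see that the new client was created. (createClient() calls barrierExists(), which is added in Step 6, so the class only compiles once that method exists.)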

[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.99.100:2181 sessionTimeout=15000 watcher=zookeeper.tutorial.Client@4c873330
sleeping for connection...  
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.99.100/192.168.99.100:2181. Will not attempt to authenticate using SASL (unknown error)
sleeping for connection...  
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.99.100/192.168.99.100:2181, initiating session
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.99.100/192.168.99.100:2181, sessionid = 0x1509a25c5300010, negotiated timeout = 15000
[main-EventThread] INFO zookeeper.tutorial.Client - Processing event: WatchedEvent state:SyncConnected type:None path:null
sleeping for expiration...  
[main-EventThread] INFO zookeeper.tutorial.Client - added client to the barrier.

If you list the znodes from zkCli.sh ( ls / ), you will see a new /clients znode. If you list the children of that znode ( ls /clients ), you will see your new client.
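
One caveat with the client name used above: a Random seeded with the current time in milliseconds can produce the same value in two clients that start within the same millisecond, and because NODEEXISTS is treated as success, both would then share one znode. A possible alternative, sketched below with JDK classes only, is to draw the id from SecureRandom. ClientIds, newId and pathFor are hypothetical helper names; ZooKeeper's CreateMode.EPHEMERAL_SEQUENTIAL, which has the server append a counter to the name, would be another way to get distinct names.

```java
import java.security.SecureRandom;

class ClientIds {
    private static final SecureRandom RANDOM = new SecureRandom();

    // 16 lowercase hex characters built from 64 random bits.
    static String newId() {
        return String.format("%016x", RANDOM.nextLong());
    }

    // Path of the client's znode under the "/clients" parent.
    static String pathFor(String id) {
        return "/clients/" + id;
    }

    public static void main(String[] args) {
        String id = newId();
        System.out.println(pathFor(id));
    }
}
```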

Step 6 - Client check for "/barrier" znode

We have a barrier set up and we have a client. Next, the client needs to watch for the barrier to be removed.

1

Add a watcher for the "/barrier" znode to the client.

Watcher barrierExistsWatcher = new Watcher() {  
	public void process(WatchedEvent e) {
		if (e.getType() == Event.EventType.NodeDeleted) {
			// take some action
			LOG.info("barrier removed - client taking action"); // log the action
			createClient(); // create a new client to wait for the next barrier
		}
	}
};

We are making things simple by logging the event that the barrier is removed, and then adding a new client.

2

Add a callback object to handle a possible loss of the connection to the ZooKeeper host.

AsyncCallback.StatCallback barrierExistsCallback = new AsyncCallback.StatCallback() {  
	public void processResult(int rc, String path, Object ctx, Stat stat) {
		switch (KeeperException.Code.get(rc)) {
			case CONNECTIONLOSS:
				barrierExists();
				break;
		}
	}
};

3

Create the "exists" watcher on the ZooKeeper host.

public void barrierExists() {  
    zk.exists("/barrier",
	    barrierExistsWatcher,
	    barrierExistsCallback,
	    null);
}

4

If you run Client.main() while Barrier.main() is running you should see logs like the following.

[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.99.100:2181 sessionTimeout=15000 watcher=zookeeper.tutorial.Client@18e8568
sleeping for connection...  
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.99.100/192.168.99.100:2181. Will not attempt to authenticate using SASL (unknown error)
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.99.100/192.168.99.100:2181, initiating session
sleeping for connection...  
[main-SendThread(192.168.99.100:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.99.100/192.168.99.100:2181, sessionid = 0x1509a25c5300022, negotiated timeout = 15000
[main-EventThread] INFO zookeeper.tutorial.Client - Processing event: WatchedEvent state:SyncConnected type:None path:null
sleeping for expiration...  
[main-EventThread] INFO zookeeper.tutorial.Client - added client to the barrier.
[main-EventThread] INFO zookeeper.tutorial.Client - barrier removed - client taking action
sleeping for expiration...  

Final

With these two classes you can run one Barrier.main() and as many Client.main() instances as you want.

From zkCli.sh:

ls /             # see all of the znodes registered
ls /clients      # see all of the clients that are waiting for the barrier to be removed
delete /barrier  # remove the barrier and watch the logs for each client to see the clients take action and then reregister a new client




