ZooKeeper part 2: building highly available applications, Ceilometer central agent unleashed

Datetime:2016-08-23 00:34:19          Topic: ZooKeeper           Share

The Ceilometer project is in charge of collecting various measurements from the whole OpenStack infrastructure, including the bare metal and virtual level. For instance, we can see the number of virtual machines running or the number of storage volumes.

One of the Ceilometer components is the central agent. Basically it polls the other resources in order to get some measurements. As the name suggests, it’s central and thus it implies some obvious drawbacks in term of reliability and scalability.

In this article we will develop an application which mimic the central agent and then we will study how to improve it with ZooKeeper.

You can download all the samples from here:

$ git clone https://github.com/ylamgarchal/zksamples.git

The breakable architecture

Historically, the central agent looked like something like that:

The architecture was pretty simple, it’s only one agent which polls periodically the resources to get the measurements of the OpenStack infrastructure. A resource is an OpenStack component – for instance it could be a compute node – exposing an API used to retrieve the information of interest.

The two obvious drawbacks of this architecture are:

  • We have a single point of failure because if the central agent fails then we cannot retrieve the measurements anymore.
  • We have a bottleneck because it is working alone and if the amount of resources increase dramatically then the polling mechanism will slow down.

Let’s implement this behavior in Python, the code below mimic a central agent:

# -*- coding: utf-8 -*-
import time

# The number of resources to create.
N = 6

# The resources of the Openstack infrastructure referenced by their names.
os_resources = ["resource %s" % i for i in xrange(0, N)]

class CentralAgent(object):
    """Mimic the Ceilometer central agent."""

    def poll_resource(self, resource):
        """The function in charge to poll a resource and save the result somewhere."""
        print("Send poll request to '%s'" % resource)

    def start(self):
        """Main loop for sending periodically the poll requests."""
        while True:
            for resource in os_resources:
                self.poll_resource(resource)
        time.sleep(3)

if __name__ == '__main__':
    central_agent = CentralAgent()
    central_agent.start()

The code is composed of a set of resources identified by their names and a main loop which send periodically a poll request to each resources. For the sake of simplicity, the networking stuffs with the remote resources will be dropped so that to focus on the central agent improvement.

It’s worth noting that when the performance is not an issue then we can still make the central agent highly available – without anything to develop – by using a cluster manager like Pacemaker or KeepAlived . This tool will manage a cluster of machines and monitor the agent. When the agent or the machine fails then it’s detected and it will restart the agent on another machine.

This architecture result to an active-passive cluster because at one time there is only one central agent running. Here is the documentation to have such a setup with Ceilometer.

The improved architecture

The improved architecture should remove the two drawbacks, thus we want not only one agent running but several ones which cooperate together as a team, let’s call it the Central team :-).

In order to remove the single point of failure and the bottleneck, the Central team should be composed of several agents. Each agent is assigned to a set of resources to poll so that two agents polls two distinct set of resources.

Having an agent polling a unique set of resources is a requirement because in the case of Ceilometer we don’t want to retrieve and store the same result several times in the database.So far, so good, how the agents will cooperate ? Well, in order to implement the coordination we must answer two questions:

  • What happen if an agent leave (gracefully or from a crash) or join the Central team ?
  • How to make sure that an agent poll a unique set of resources ?

We want to have a dynamic Central team which reacts when a new member joins the team or when a member leaves the team.

More precisely, when a new member joins the team then a set of resources must be assigned to him so that each agent have the same number of resources to poll. It means that the other agents should “give” some resources to poll to the new one.

In return, when an agent leaves the team then the other should share its set of resources to poll. The idea here is that in any way the agents have the same amount of resources to poll.

Okay, it sounds cool but how an agent is notified that a member joined or left the team ? This is where ZooKeeper comes into play �� !

Dynamic central team membership with ZooKeeper

Thanks to ZooKeeper we will be able to detect a new member or a left member. The idea is to create a znode which represent the team, like “/central_team” and each agent will join the team by creating an ephemeral znode under that znode.

If you forgot what is an ephemeral znode then go read the part 1 of that article !

Having each agent creating an ephemeral znode is not sufficient. They must listen to the events of their parent znode “/central_team” so that when an agent join the team by creating its entry under “/central_team” ZooKeeper will notify the others.

When an agent leaves the team it just has to remove its znode, if the agent crashes then ZooKeeper will detect it (because we used an ephemeral znode ;-)) and remove its entry.

In all cases, when the number of znode under “/central_team” changes then the whole team is notified.

Let’s see how to implement it on top of our little central agent:

# -*- coding: utf-8 -*-
import functools
import time
from kazoo import client as kz_client
import uuid

# The number of resources to create.
N = 6

# The resources of the Openstack infrastructure referenced by their names.
os_resources = ["resource %s" % i for i in xrange(0, N)]

class CentralAgent(object):
    """Mimic the improved Ceilometer central agent."""

    def __init__(self):
        self._my_client = kz_client.KazooClient(hosts='127.0.0.1:2181',
                                                timeout=5)
        self._my_client.add_listener(CentralAgent.my_listener)
        self._my_resources = []
        self._my_id = str(uuid.uuid4())
        print("Agent id: %s" % self._my_id)

    @staticmethod
    def my_listener(state):
        """Print a message when the client is connected to the ZK server."""
        if state == kz_client.KazooState.CONNECTED:
            print("Client connected !")

    def poll_resource(self, resource):
        """The function in charge to poll a resource and save the result somewhere."""
        print("Send poll request to '%s'" % resource)

    def _get_my_resources(self, children):
        return os_resources

    def _my_watcher(self, event):
        """Kazoo watcher for membership events."""
        if event.type == 'CHILD':
            my_watcher = functools.partial(self._my_watcher)
            children = self._my_client.get_children("/central_team", watch=my_watcher)
            print("Central team members: %s" % children)

    def _setup(self):
        """Ensure the central team group is created."""
        self._my_client.start(timeout=5)
        # Ensure that the "/central_team" znode is created.
        self._my_client.ensure_path("/central_team")

    def start(self):
        """Main loop for sending periodically the poll requests."""
        self._setup()
        self._my_client.create("/central_team/%s" % self._my_id, ephemeral=True)
        my_watcher = functools.partial(self._my_watcher)
        children = self._my_client.get_children("/central_team", watch=my_watcher)
        print("Central team members: %s" % children)
        self._my_resources = self._get_my_resources(children)
        print("My resources: %s" % self._my_resources)

        while True:
            for resource in self._my_resources:
                self.poll_resource(resource)
            time.sleep(3)

if __name__ == '__main__':
    central_agent = CentralAgent()
    central_agent.start()

The code is pretty straightforward, since we have a set of agents now we need to identify them so we added a unique identifier per agent.

The function _setup() is in charge of starting the Kazoo client and creating the “/central_team” znode. Before polling the resources each agent creates its ephemeral znode under “/central_team”.

Afterward, the agent retrieves the znodes under “/central_team” in order to get the current  members of the team, at the same time it sets a watcher (the method _my_watcher()) on “/central_team” in order to be notified when an event occur.

It is worth noting that when the watcher is executed, we must set the watcher again on “/central_team” because watchers are one time triggered by ZooKeeper. The best place to do it is in the watcher itself ;-).

Here is an example of the execution of two agents when agent 1 is run before agent 2:

$ python agent_step2.py
Agent id: fe439a7a-b371-4818-92e2-4fdb75cf9d02
Client connected !
Central team members: [u'fe439a7a-b371-4818-92e2-4fdb75cf9d02']
My resources: ['resource 0', 'resource 1', 'resource 2', 'resource 3', 'resource 4', 'resource 5']
Send poll request to 'resource 0'
Send poll request to 'resource 1'
Send poll request to 'resource 2'
Send poll request to 'resource 3'
Send poll request to 'resource 4'
Send poll request to 'resource 5'
Central team members: [u'ad86afbb-1720-4229-8e27-0811a9c70890', u'fe439a7a-b371-4818-92e2-4fdb75cf9d02']
$ python agent_step2.py
Agent id: ad86afbb-1720-4229-8e27-0811a9c70890
Client connected !
Central team members: [u'ad86afbb-1720-4229-8e27-0811a9c70890', u'fe439a7a-b371-4818-92e2-4fdb75cf9d02']
My resources: ['resource 0', 'resource 1', 'resource 2', 'resource 3', 'resource 4', 'resource 5']
Send poll request to 'resource 0'
Send poll request to 'resource 1'
Send poll request to 'resource 2'
Send poll request to 'resource 3'
Send poll request to 'resource 4'
Send poll request to 'resource 5'

We can see that agent 1 is polling the whole set of resources and then it received a notification when agent 2 joined the team. I suggest you to do some tests with joined agents and left agents to see how it works.

Let’s recap where we are there, so we have a team of agents that are notified when a member join or leave the team. We can see that the agent 2 polls the whole set of resources which is problematic because we want each agent to poll a distinct set of resources. This is the last issue we need to fix

!

Dynamic resources partitioning

There is two possible solutions to assign a unique set of resources to each agents:

  • Thanks to ZooKeeper we can elect a special agent from the team to be the leader and then it will be in charge in assigning resources to the others.
  • Or we can use a consistent hashing algorithm on the agent side…

In this article we will implement the second solution because this is what has been done in Ceilometer, the first solution is left as an exercise for the reader �� !

Using a consistent hashing algorithm for assigning resources to the agents is an elegant solution because given the team member list each agent can independently retrieve its unique set of resources. Explaining consistent hashing is beyond of this article but you can take a look at this explanation .

The basic idea is to hash the id of each resources and the id of each agents. Depending on the hashes we can assign a resource to an agent.

Let’s see how it works in Python, here is the added lines:

# The resources of the Openstack infrastructure referenced by their names.
os_resources = ["resource %s" % i for i in xrange(0, N)]
os_resources_hash = {}

for resource in os_resources:
    md5_sum = hashlib.md5()
    md5_sum.update(resource)
    os_resources_hash[resource] = md5_sum.hexdigest()
…

class CentralAgent(object):
…
    @staticmethod
    def _get_ring(members):
        ring = {}
        for member in members:
            md5_sum = hashlib.md5()
            md5_sum.update(member)
            ring[md5_sum.hexdigest().encode()] = member
        return ring

    def _get_my_resources(self, children):
        ring = CentralAgent._get_ring(children)
        hash_members = ring.keys()
        hash_members.sort()

        my_resources = []
        for resource in os_resources:
            hash_resource = os_resources_hash[resource]
            member_index = bisect.bisect(hash_members, hash_resource) % len(hash_members)
            hash_member = hash_members[member_index]
            member = ring[hash_member]

            if member == self._my_id:
                my_resources.append(resource)
        return my_resources

    def _my_watcher(self, event):
    """Kazoo watcher for membership events."""
        if event.type == 'CHILD':
            my_watcher = functools.partial(self._my_watcher)
            children = self._my_client.get_children("/central_team", watch=my_watcher)
            print("Central team members: %s" % children)
            self._my_resources = self._get_my_resources(children)
            print("My resources: %s" % self._my_resources)

The most interesting part is the method _get_my_resources() which returns the resources that are assigned to the current agent. It’s a little bit tricky if you don’t know about consistent hashing but with some aspirins it will be clear �� !

Let’s see the how it runs for two agents and six resources:

$ python agent_step3.py
Agent id: 898c4bc5-d56f-4a47-a3fa-70499a669078
Client connected !
Central team members: [u'898c4bc5-d56f-4a47-a3fa-70499a669078']
My resources: ['resource 0', 'resource 1', 'resource 2', 'resource 3', 'resource 4', 'resource 5']
Send poll request to 'resource 0'
Send poll request to 'resource 1'
Send poll request to 'resource 2'
Send poll request to 'resource 3'
Send poll request to 'resource 4'
Send poll request to 'resource 5'
Central team members: [u'898c4bc5-d56f-4a47-a3fa-70499a669078', u'e929a9dd-fbbe-4b93-9e5d-ee46fa1a517c']
My resources: ['resource 1', 'resource 2']
Send poll request to 'resource 1'
Send poll request to 'resource 2'
$ python agent_step3.py
Agent id: e929a9dd-fbbe-4b93-9e5d-ee46fa1a517c
Client connected !
Central team members: [u'898c4bc5-d56f-4a47-a3fa-70499a669078', u'e929a9dd-fbbe-4b93-9e5d-ee46fa1a517c']
My resources: ['resource 0', 'resource 3', 'resource 4', 'resource 5']
Send poll request to 'resource 0'
Send poll request to 'resource 3'
Send poll request to 'resource 4'
Send poll request to 'resource 5'

We can see that the agent 1 is first assigned to the whole set of resources. When agent 2 join the team then we can see he automatically get a unique set of resources. You can do some tests with more resources and more agents to see what happens when an agent leave or join the team.

Thanks to the consistent hashing algorithm the resource partitions will be nearly fair between the agents.

Conclusion

If we sum up what has been done we can say that we leveraged ZooKeeper to establish group membership between the set of agents and then get the ability to react when an event happen. We also leveraged the consistent hashing algorithm in combination with ZooKeeper to partition the resources among the agents.

In this way the Ceilometer central agent moved from a weak architecture to a highly available and scalable one. As i said in the previous article, the real code of Ceilometer use the Tooz API but conceptually it acts in a similar manner as our little central agent (here is the real patch https://review.openstack.org/#/c/113549).

I hope you enjoyed this adventure with ZooKeeper ! As an exercise you can implement some real resources, use ZooKeeper to detect events (join, left or failure) so that the agents can adjust their set of assigned resources dynamically.





About List