
Distributed Applications with Norbert

Distributed Applications at LinkedIn

Here at LinkedIn we need to build a variety of services that can scale horizontally.  Two of the most common reasons to scale horizontally are that we need more memory or more CPUs (and sometimes both) than fit in a single machine.  The solution is often simple: one cluster of machines all running the same service to do the processing, and another cluster of memcached instances to store the data.  Or, instead of memcached, you shard your data across multiple database servers.  Add a basic round-robin load balancer and you’re done!

The problem with this approach is that your data and your processing now live in two different systems.  Every incoming request has to go to memcached or the database, retrieve the data, and only then perform the processing.  When retrieving the data takes too long, or when the data needs a complex transformation before processing can start, you have a problem.  To solve it, you decide to cache the data on the same machine that processes it.  You grab something like ehcache, or build your own solution, and now you have an in-process cache.

But now your round-robin load balancer doesn’t work.  If you send requests to arbitrary machines, your local caches won’t be used effectively.  Or, even worse, you’ll end up trying to cache all of the data on every instance.  What you really want is a load balancer that knows which instances of the service are available to serve traffic and which data each of them has locally.
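To make the contrast concrete, here is a minimal sketch (not Norbert code) of the two routing policies. Round robin ignores where data lives, while a key-affinity router always sends the same key to the same instance, so that instance’s local cache stays useful. The class names `RoundRobinRouter` and `KeyAffinityRouter` and the simple hash-mod scheme are illustrative assumptions.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: neither class is part of Norbert.
class RoundRobinRouter {
    private final List<String> instances;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinRouter(List<String> instances) { this.instances = List.copyOf(instances); }

    // Ignores the key entirely, so consecutive requests for the same key
    // usually land on different instances and miss their local caches.
    String route(String key) {
        return instances.get(Math.floorMod(next.getAndIncrement(), instances.size()));
    }
}

class KeyAffinityRouter {
    private final List<String> instances;

    KeyAffinityRouter(List<String> instances) { this.instances = List.copyOf(instances); }

    // The same key always maps to the same instance, so each instance only
    // needs to cache the slice of data whose keys hash to it.
    String route(String key) {
        return instances.get(Math.floorMod(key.hashCode(), instances.size()));
    }
}
```

Note that hash-mod routing reassigns most keys whenever the number of instances changes, and neither router knows which instances are actually up. Tracking that membership is the problem the rest of this article addresses.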

This is exactly the situation we ended up in with our search engine.  Our search index is simply too large for a single machine to execute a query fast enough.  So we split the index into multiple smaller indexes and run multiple searchers that execute queries against those sub-indexes.  To satisfy a request, a broker has every searcher execute the query in parallel and then combines the partial result sets into the final results.  To do its job, the broker needs to know all of the searchers and which sub-index each of them holds.  Keeping track of this information is commonly known as group management, and we wrote a generic library called Norbert that provides this functionality.
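The broker’s fan-out and merge is a scatter-gather pattern. The sketch below is a hedged illustration under stated assumptions: the `Searcher` interface, the `Hit` record, and merging by descending score with a top-k cut are hypothetical stand-ins, not LinkedIn’s search API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical result type: a document id plus its relevance score.
record Hit(String docId, double score) {}

// Hypothetical searcher: executes a query against one sub-index.
interface Searcher {
    List<Hit> search(String query);
}

class Broker implements AutoCloseable {
    private final List<Searcher> searchers;
    // One thread per searcher so all sub-queries can run at the same time.
    private final ExecutorService pool;

    Broker(List<Searcher> searchers) {
        this.searchers = List.copyOf(searchers);
        this.pool = Executors.newFixedThreadPool(Math.max(1, searchers.size()));
    }

    // Scatter: send the query to every searcher concurrently.
    // Gather: wait for all partial results, then merge them into one top-k list.
    List<Hit> search(String query, int k) {
        List<CompletableFuture<List<Hit>>> futures = new ArrayList<>();
        for (Searcher s : searchers) {
            futures.add(CompletableFuture.supplyAsync(() -> s.search(query), pool));
        }
        List<Hit> merged = new ArrayList<>();
        for (CompletableFuture<List<Hit>> f : futures) {
            merged.addAll(f.join());
        }
        merged.sort(Comparator.comparingDouble(Hit::score).reversed());
        return List.copyOf(merged.subList(0, Math.min(k, merged.size())));
    }

    @Override
    public void close() { pool.shutdown(); }
}
```

The hard-coded `searchers` list is exactly what group management replaces: in a real deployment the broker would learn the set of searchers, and which sub-index each one holds, from the cluster rather than from its constructor.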

Group Management with Norbert

Group management can mean different things to different people, so for our purposes we’ll say that such a system, at a minimum:

  1. Maintains a record of all the servers in the group
  2. Allows servers to come and go dynamically
  3. Notifies clients and other group members when a server comes or goes
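As a rough illustration of those three properties (not of Norbert’s implementation), an in-memory group needs little more than a set of members and a list of callbacks fired on every change. `Group` and its methods are hypothetical names; Norbert itself keeps this state in ZooKeeper so that it is shared across machines.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory group: tracks members and notifies listeners.
class Group {
    // 1. The record of all servers currently in the group.
    private final Set<String> members = new TreeSet<>();
    private final List<Consumer<Set<String>>> listeners = new CopyOnWriteArrayList<>();

    // Listeners receive an immutable snapshot of the membership after each change.
    void addListener(Consumer<Set<String>> listener) { listeners.add(listener); }

    // 2. Servers may join and leave at any time.
    synchronized void join(String server) {
        if (members.add(server)) notifyListeners();
    }

    synchronized void leave(String server) {
        if (members.remove(server)) notifyListeners();
    }

    // 3. Every interested party hears about every membership change.
    private void notifyListeners() {
        Set<String> snapshot = Set.copyOf(members);
        for (Consumer<Set<String>> l : listeners) l.accept(snapshot);
    }
}
```

Joining twice or leaving a group you are not in is a no-op here, so listeners only hear about real changes.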

Several applications in the SNA team here at LinkedIn needed group management, so we extracted that functionality from our horizontally distributed social graph engine, generalized it, and packaged it up as our open source Norbert project.  In the rest of this article I’m going to describe how Norbert defines a group (or cluster) and how to use the ClusterClient interface to add group management to your own application.

Centrally Managed Cluster Metadata

First, you can optionally specify partition ids.  Partition ids can be used to create “virtual” partitions: partitions that don’t exist on their own but are co-located with other partitions.  Some consistent hashing strategies use virtual partitions to balance data or load more evenly across a system; however, you do not need to define partition ids if they are not useful for your application.  Whether or not you use partition ids, the next higher entity in a Norbert cluster is a Node.  A Node is defined by three pieces of data:

  1. An arbitrary integer id.  Norbert doesn’t place any specific meaning on this id, so you can allocate ids to Nodes in whatever way makes the most sense for your application or deployment strategy.
  2. A string called “url.”  Norbert’s group management features don’t place any specific meaning on this string but, as its name suggests, it’s meant to be used to locate and communicate with Nodes over a network.  Its format will likely be dictated by the technology you use for client/server communication.
  3. The partition ids assigned to the Node, if you are using partition ids.

Finally, you specify a name for the set of Nodes that make up your cluster.  Norbert stores all of the metadata for the Nodes in a given cluster persistently and centrally in ZooKeeper.
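To make the shape of that metadata concrete, here is a sketch in plain Java. It is not Norbert’s `Node` class: the `NodeInfo` and `ClusterMetadata` records and the `partitionIndex` helper are hypothetical, and the url strings used with them are only examples.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical mirror of the three pieces of Node metadata described above.
record NodeInfo(int id, String url, Set<Integer> partitionIds) {}

// A named set of Nodes: the unit whose metadata Norbert stores in ZooKeeper.
record ClusterMetadata(String name, List<NodeInfo> nodes) {

    // Builds a partition id -> Nodes index so a request can be routed to
    // whichever Nodes host the partition that owns its data.
    Map<Integer, List<NodeInfo>> partitionIndex() {
        Map<Integer, List<NodeInfo>> index = new HashMap<>();
        for (NodeInfo node : nodes) {
            for (int p : node.partitionIds()) {
                index.computeIfAbsent(p, k -> new ArrayList<>()).add(node);
            }
        }
        return index;
    }
}
```

For example, a cluster named `search` whose three Nodes each carry two of the partition ids 0 to 5 yields a six-entry index; a client would hash a request key to one of those six ids and look up the Node to contact. Adding a fourth Node then means reassigning a few partition ids rather than rehashing every key.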

Node Availability

If the Node metadata is stored persistently in ZooKeeper, how do you know which subset of the Nodes is currently available to service requests?  A second tree of ephemeral znodes in ZooKeeper tracks the availability of each individual Node in the cluster.  Because an ephemeral znode is removed automatically when the ZooKeeper session that created it ends, a Node that crashes or loses its connection drops out of this tree without any manual cleanup.  Rather than being centrally managed by a system administrator, the availability data is handled directly by your software: once your application has finished starting up, you tell Norbert that it’s ready to handle requests.  So why not have the application specify both the Node metadata and the Node availability in a single ephemeral znode?  The reason is to prevent a rogue service on your network from causing problems.  Perhaps you are in the middle of setting up a new system and accidentally start your application.  You likely don’t want it to start receiving traffic yet, and this mechanism prevents that from happening.  A Node isn’t considered online and ready to handle traffic until both its metadata is defined and its availability znode is created.
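The online rule above amounts to a set intersection. The sketch below assumes hypothetical inputs: a map of Node id to url standing in for the persistent metadata tree, and a set of ids that have announced availability standing in for the ephemeral znodes. It shows only the logic the rule describes, not how Norbert reads ZooKeeper.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper that combines the two trees described above.
final class Availability {
    private Availability() {}

    // A Node is online only if its metadata is defined AND the running
    // application has marked it available. An id that announces itself
    // without metadata is ignored, and so is a defined Node whose process
    // is not running.
    static Map<Integer, String> onlineNodes(Map<Integer, String> metadata, Set<Integer> available) {
        Map<Integer, String> online = new TreeMap<>();
        for (Map.Entry<Integer, String> e : metadata.entrySet()) {
            if (available.contains(e.getKey())) online.put(e.getKey(), e.getValue());
        }
        return online;
    }
}
```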

The ClusterClient API

Adding group management to your application with Norbert requires interacting with a single interface called ClusterClient.  There are currently two implementations of ClusterClient: an in-memory-only client for testing and a client that uses ZooKeeper.  Once you’ve instantiated your ClusterClient, you will find APIs to create nodes, delete nodes, mark a particular node available or unavailable, and so on.  But there is one specific API that I’d like to focus on:

ClusterListenerKey addListener(ClusterListener listener);

The ClusterListener interface is defined as:

import java.util.Set;

// Node is Norbert's cluster Node type.
public interface ClusterListener {
    /**
     * Handle the case that you are now connected to the cluster.
     *
     * @param nodes the current set of available Nodes stored in the cluster metadata
     */
    void handleClusterConnected(Set<Node> nodes);

    /**
     * Handle the case that the cluster topology has changed.
     *
     * @param nodes the current set of available Nodes stored in the cluster metadata
     */
    void handleClusterNodesChanged(Set<Node> nodes);

    /**
     * Handle the case that the cluster is now disconnected.
     */
    void handleClusterDisconnected();

    /**
     * Handle the case that the cluster is now shut down.
     */
    void handleClusterShutdown();
}

Norbert notifies every registered ClusterListener whenever the group membership changes, whenever you connect to or disconnect from ZooKeeper, and when the ClusterClient is shut down.  And that is all you really need to do to add group management to your application with Norbert: implement a ClusterListener and have it update your application’s routing tables whenever the membership changes.
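Here is a sketch of what such a listener might look like. To keep it self-contained, it declares a local copy of the `ClusterListener` interface with a stand-in `Node` record (id and url only); in a real application you would implement Norbert’s own interface and use its `Node` type instead. `RoutingTableListener` and its `route` method are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for Norbert's Node type (assumption: only id and url are needed here).
record Node(int id, String url) {}

// Local copy of the interface shown above, so this sketch compiles on its own.
interface ClusterListener {
    void handleClusterConnected(Set<Node> nodes);
    void handleClusterNodesChanged(Set<Node> nodes);
    void handleClusterDisconnected();
    void handleClusterShutdown();
}

// Keeps an immutable routing table that is swapped atomically on every
// callback, so request threads never see a half-updated table.
class RoutingTableListener implements ClusterListener {
    private final AtomicReference<List<Node>> table = new AtomicReference<>(List.of());

    private void rebuild(Set<Node> nodes) {
        // Sort by id so every client builds the same table from the same set.
        table.set(nodes.stream().sorted(Comparator.comparingInt(Node::id)).toList());
    }

    @Override public void handleClusterConnected(Set<Node> nodes) { rebuild(nodes); }
    @Override public void handleClusterNodesChanged(Set<Node> nodes) { rebuild(nodes); }

    // Policy choice for this sketch: stop routing when the view can't be trusted.
    @Override public void handleClusterDisconnected() { table.set(List.of()); }
    @Override public void handleClusterShutdown() { table.set(List.of()); }

    // Picks a Node for a request key; an empty table means no Node is available.
    Node route(Object key) {
        List<Node> nodes = table.get();
        if (nodes.isEmpty()) throw new IllegalStateException("no available nodes");
        return nodes.get(Math.floorMod(key.hashCode(), nodes.size()));
    }
}
```

Clearing the table on disconnect is one possible policy; an application might instead keep routing with the last known view until its ZooKeeper session is re-established, accepting some risk of sending traffic to a Node that has since gone away.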