
Tech Talk: Chris Douglas (Yahoo!) — “Next Generation Hadoop MapReduce”

Next Generation Hadoop MapReduce
Chris Douglas (Yahoo!)
Monday, March 7, 2011

ABSTRACT

The Apache Hadoop MapReduce framework has hit a scalability limit around 4,000 machines. We are developing the next generation of Apache Hadoop MapReduce, which factors the framework into a generic resource scheduler and a per-job, user-defined component that manages the application execution. Since downtime is more expensive at scale, high availability is built in from the beginning, as are security and multi-tenancy to support many users on the larger clusters. The new architecture will also increase innovation, agility and hardware utilization. For more information, see the Yahoo developer blog.

BIOGRAPHY

Chris Douglas has been a member of the Yahoo! Hadoop team since 2007. He has worked on teams focused on HDFS, performance and utilization, and MapReduce. [He is probably better known as the user login "chrisdo", the one that shows up in the "compiled by" line of Apache Hadoop 0.20.2.]


Tech Talk: Matei Zaharia (UC Berkeley) — “Spark: A Framework for Iterative and Interactive Cluster Computing”

Spark: A Framework for Iterative and Interactive Cluster Computing
Matei Zaharia (UC Berkeley)
Tuesday, February 8, 2011

ABSTRACT

Although the MapReduce programming model has been highly successful, it is not suitable for all applications. We present Spark, a framework optimized for one such type of application: iterative jobs where a dataset is reused across multiple parallel operations, as is common in many machine learning and graph algorithms. Spark provides a functional programming model similar to MapReduce, but also lets users store datasets in memory between iterations, leading to up to 10x better performance than Hadoop. Spark also makes programming jobs easy by integrating into the Scala programming language. Finally, the ability of Spark to load a dataset into memory and query it repeatedly makes it especially suitable for interactive analysis of big datasets. We have modified the Scala interpreter to make it possible to use Spark interactively, providing a much more responsive experience than Hive and Pig (sub-second latency as opposed to tens of seconds for Hadoop).

BIOGRAPHY

Matei Zaharia is a fourth year graduate student at UC Berkeley. He works with professors Scott Shenker and Ion Stoica on topics in cloud computing, operating systems and networking. He is also a committer on the Apache Hadoop project. He got his undergraduate degree at the University of Waterloo in Canada.


Tech Talk: Leo Neumeyer & Anish Nair (Yahoo!) — “S4: Distributed Stream Computing Platform”

S4: Distributed Stream Computing Platform
Leo Neumeyer & Anish Nair (Yahoo!)
Tuesday, January 25, 2011

ABSTRACT

S4 is a general-purpose, distributed, scalable, partially fault-tolerant, pluggable platform that allows programmers to easily develop applications for processing continuous, unbounded streams of data. Keyed data events are routed with affinity to Processing Elements (PEs), which consume the events, update state and, optionally, produce event streams of their own. The architecture resembles the Actors model, providing semantics of encapsulation and location transparency, thus allowing applications to be massively concurrent while exposing a simple programming interface to application developers. In this talk, we outline the S4 architecture in detail and describe various applications, including real-life deployments. Our design is primarily driven by large-scale applications for data mining and machine learning in a production environment. We show that the S4 design is surprisingly flexible and lends itself to running on large clusters built with commodity hardware.

BIOGRAPHY

Leo Neumeyer studied electrical and computer engineering in Argentina and Canada. In 1992 he joined the Speech Technology and Research lab at SRI International (formerly Stanford Research Institute), where he helped build one of the most advanced speech recognition systems, which was commercialized by its spin-off company, Nuance Communications. Leo did research in signal processing, speech recognition, and language learning technologies. In 1999, he co-founded Mindstech International, a startup that developed technology to teach spoken English in Asia over the Internet. In 2006 he joined Yahoo! Labs, where he led the search advertising optimization sciences group. More recently he championed S4, an open source distributed stream computing software platform that was developed to model user feedback in real time to improve search revenue and user experience. He has published over 24 technical papers and 8 patents.

Anish Nair is an applied scientist at Yahoo! Labs, working mainly on prediction and optimization problems in search monetization. His areas of interest and prior experience are natural language processing, information retrieval, speech recognition, personalization, and of late, stream computing. He has published work in various areas: computational linguistics, distributed systems, psychometrics and cognitive science. Prior to Yahoo!, he developed algorithms for automatically evaluating people’s spoken ability, at Ordinate Corporation (now part of Pearson) and worked on various computational linguistics problems while a graduate student at USC. Anish’s current focus is envisioning and developing applications for S4, the stream computing platform.


Build a distributed realtime tweet search system in no time. Part 2/2

Chirper is all about realtime indexing, so we wanted to highlight that on the frontend as much as possible: the search box performs an instant search as you type in the terms, and the page shows the number of tweets as they are indexed live on the system.
It was important to keep the frontend as simple as possible. Keeping the frontend thin and simple is not hard, but it requires the discipline not to add complex logic and operations, delegating all the complexity and scaling challenges to the backend and middleware services.

Ideas and decisions:

We want to ship the frontend with the backend code as one package with no extra setup and minimal code (no “nonsense XML” servlet configuration).
We chose Scalatra. Scalatra is to Scala what Sinatra is to Ruby: it lets you define resources with very little code, for example:
get("/salute") {
"Hello World"
}

It also offers powerful templating features through Scalate, which can render SSP (JSP-like), Scaml (HAML-like), Mustache and other template formats.

Realtime updating on the page will require some Ajax and DOM manipulation.
Dealing with browser inconsistencies is a pain; jQuery makes it a breeze, offers powerful Ajax functions, and is also used as the glue for other libraries like backbone.js and mustache.js.

JavaScript code tends to get big and messy over time, so we wanted something that makes the frontend a good starting point and lets the code scale over time.
Backbone.js is a minimal MVC implementation for JavaScript apps; of all the options out there, Backbone seems to be the cleanest, don't-try-to-do-everything option. While it is still a bit of overkill for this frontend, it offers a good starting point for a larger project. It also enforces writing code in a specific style, which makes it easy to know where all the pieces are put together.

The backbone layer holds a model (Tweet), a collection of tweets (TweetList), and two views, one for the visual representation of a Tweet model (TweetView), and one for the left sidebar that contains the search box and list of results (AppView).

Take a look at the commented code in app.js and index.ssp to learn in more detail how everything works.

Some UI niceties like good-looking buttons, realtime updating of timestamps, and animation.
A nice touch is to refresh the time shown with each tweet; for that we used a jQuery plugin called EasyDate. For the buttons and layout we used CSS3 properties that degrade gracefully in older browsers (and IE). For the images we used a single sprited PNG file.

And that’s it! It’s a very simple page; to build it fast, we leveraged lots of great open-source software that is out there. Check out the code and give it a try locally. It’s really simple.

Build a distributed realtime tweet search system in no time. Part 1/2

Last Friday was an InDay (a day at LinkedIn when you can take a break from your day-to-day work and build something cool). As part of the SNA team, we have been building some really cool distributed systems, from storage to messaging to search, so we thought it’d be cool to use this InDay to put it all together and see what we could come up with.

A challenge we gave ourselves was to see how quickly we could build a scalable tweet search system. Some requirements:

  • The entire stack must be scalable: by adding commodity hardware, it should, in theory, be able to handle all tweets with real internet traffic.
  • The parts of the system should be able to scale independently, e.g. there is no reason the messaging system needs to scale the same way as the storage system.
  • Fault tolerance: the system should be able to tolerate system failures as well as data corruption.
  • Only 1 day to build it.

We scribbled together the following architecture:

Every component here is an open-source project that we have deployed in our production environment, each with its own scaling characteristics.

Some design considerations and implementation details:

TweetStreamer is a Twitter streaming client, written in Scala, that uses the Twitter API. We chose it because it is lightweight, simple and, frankly, does exactly what we wanted. It can easily be configured to handle different types of Twitter feeds.

Now that we have a data feed, one obvious thing to do is to store it, and Voldemort seemed to be the obvious choice: key = tweet id, value = tweet status (JSON string).

We also send the feed to a Kafka topic to be consumed by downstream services, e.g. Sensei (our search system). You can find the code here.

The Sensei/search integration is more work (code here). We need to set up the following pieces:

  • How to parse the query.
  • How to consume indexing events, i.e. extract from a tweet's JSON string the pieces of data we want to index. The search node consumes from Kafka and builds an indexable object from each JSON string. In this case, we are only interested in 3 pieces of data: the unique tweet id, the actual tweet text and the time (so we can sort on it); see (here).
  • An index data retention policy, as we accumulate more and more tweets. For this case, we configured index retention to be 7 days, with the index rolling forward nightly. Here is a wiki on how this works under the hood.
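The 7-day retention with a nightly roll can be pictured with a short sketch. It assumes, purely for illustration, that the index is partitioned into one segment per day and identified by date; the RetentionSketch object and its methods are hypothetical and are not Sensei's actual retention mechanism.

```scala
import java.time.LocalDate

// Hypothetical sketch: one index segment per day, identified by its date.
// This is NOT Sensei's on-disk layout or API, only an illustration of the policy.
object RetentionSketch {
  val RetentionDays = 7

  // Segments that survive the nightly roll on `today`: today plus the
  // previous RetentionDays - 1 days (7 segments in total).
  def segmentsToKeep(segments: Seq[LocalDate], today: LocalDate): Seq[LocalDate] = {
    val oldestKept = today.minusDays(RetentionDays - 1)
    segments.filter(d => !d.isBefore(oldestKept) && !d.isAfter(today))
  }

  // Segments the nightly job would delete, in their original order.
  def segmentsToDrop(segments: Seq[LocalDate], today: LocalDate): Seq[LocalDate] =
    segments.diff(segmentsToKeep(segments, today))
}
```

Because each night only drops whole segments that fall out of the window, the roll is cheap compared with deleting individual tweets from a single large index.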

The result set contains an array of search hits, each containing the tweet id, relevance score and time. For each tweet id, we then get the original tweet status object from Voldemort and build the result JSON object. This is done in a Scalatra servlet: see code here.
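A minimal sketch of that assembly step. The SearchHit type, the buildResultJson function and the output field names are hypothetical; a plain lookup function stands in for the Voldemort client, and the JSON is built by string interpolation only to keep the example dependency-free.

```scala
// Hypothetical sketch of the hit-to-result assembly; not Chirper's actual servlet code.
object ResultAssembly {
  // One search hit as described above: tweet id, relevance score and time.
  final case class SearchHit(tweetId: Long, score: Float, time: Long)

  // For each hit (in ranked order), fetch the stored status JSON via `store`
  // (a stand-in for a Voldemort get) and wrap it with the hit's metadata.
  // Hits whose status is missing from the store are skipped.
  def buildResultJson(hits: Seq[SearchHit], store: Long => Option[String]): String = {
    val entries = hits.flatMap { hit =>
      store(hit.tweetId).map { statusJson =>
        s"""{"id":${hit.tweetId},"score":${hit.score},"time":${hit.time},"status":$statusJson}"""
      }
    }
    entries.mkString("[", ",", "]")
  }
}
```

Keeping the search index limited to id, text and time, and fetching the full status from the store at query time, is what lets the two systems follow different retention policies.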

This pretty much describes the system. Let’s look at some of the design decisions we made:

  • One design alternative was to make the process that writes to Voldemort a Kafka consumer as well. Although this would be cleaner, we would risk a data race in which search returns hits for tweets that have not yet been added to Voldemort. By making sure each tweet is added to Voldemort first, we can rely on Voldemort as the authoritative storage for our tweets.
  • You may have already realized that Kafka is acting as a proxy for the Twitter stream, and that we could have streamed tweets directly into the search system, bypassing the Kafka layer. What we would be missing is the ability to play back tweet events from a specific checkpoint. One really nice feature of Kafka is that you can keep a consumption point and have data replayed from it. This makes reindexing possible in cases such as data corruption, schema changes, etc. Furthermore, to scale search, we would have a growing number of search nodes consuming from the same Kafka stream; because Kafka is designed so that adding consumers does not affect the throughput of the system, this really helps in scaling the entire system.
  • Another important design decision was using Voldemort for storage. An alternative would be to store tweets in the search index instead, e.g. as Lucene stored fields. The benefits of this approach would be stronger consistency between search and storage, and the stored data would follow the retention policy defined by the search system. However, apart from the fact that Lucene stored fields are nowhere near as efficient as a Voldemort cluster (an implementation issue), there are more convincing reasons:
    • First, the consistency benefit of having search and storage together is negligible. If we hold to our assumption that tweets are append-only and we always write to Voldemort first, we really wouldn’t have consistency issues. Meanwhile, having data storage reside on the search system would disproportionately increase contention for IO bandwidth and OS cache; as data volume increases, search performance can be negatively impacted.
    • The point about retention is rather valid. Our decision ultimately came down to two points: 1) Voldemort’s growth factor is very different, e.g. adding new records to the system is much cheaper, so it is feasible to have a much longer data retention policy (see the retention setting for Voldemort stores); 2) having a cluster for tweet storage allows us to integrate with other systems, if desired, for analytics, display, etc.
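The two ordering and replay decisions above can be illustrated with in-memory stand-ins. TweetStore and TweetLog are hypothetical classes that only mimic the relevant behavior of Voldemort and a Kafka topic; they are not the real client APIs.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins for Voldemort and a Kafka topic; not their real APIs.
object PipelineSketch {
  // Key/value store standing in for Voldemort: tweet id -> status JSON.
  final class TweetStore {
    private val data = mutable.Map.empty[Long, String]
    def put(id: Long, json: String): Unit = data(id) = json
    def get(id: Long): Option[String] = data.get(id)
  }

  // Append-only log standing in for a Kafka topic. Reads never remove entries,
  // so any number of consumers can read independently, each from its own offset,
  // and a consumer can rewind to a saved checkpoint and replay (e.g. to reindex).
  final class TweetLog {
    private val entries = mutable.ArrayBuffer.empty[(Long, String)]
    def append(id: Long, json: String): Unit = entries += ((id, json))
    def readFrom(offset: Int): List[(Long, String)] = entries.drop(offset).toList
    def endOffset: Int = entries.size
  }

  // Write to the authoritative store first, then publish. Any consumer (such as
  // the search indexer) that sees an event can therefore already find it in the store.
  def ingest(store: TweetStore, log: TweetLog, id: Long, json: String): Unit = {
    store.put(id, json)
    log.append(id, json)
  }
}
```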

We put what we built up on GitHub (Chirper Project). As you can see, the amount of code is minimal, and yes, we were able to build it in one day.

Here is a screenshot:

[Image: Chirper screenshot]

As you can see, the UI is rather fancy. Read on for part 2 of this post, where Alejandro details the UI portion of this application.

Azkaban Flow UI

One of the most important aspects of a production offline processing system is a workflow scheduler. A workflow scheduler allows you to string together a group of processes to run in an order that respects the dependencies between the jobs (e.g. one process’s output is another’s input). At LinkedIn we run a number of very large workflows made up of Hadoop jobs that do large data processing tasks. When assembled together, one of these workflows might take on a very complex problem like predicting People You May Know, generating collaborative filtering matches for all the items on our social graph, computing data about users and their skills to populate our new Skill Pages feature, or generating beautiful graph layouts for visualizing members’ professional networks.
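The core of dependency scheduling is a topological ordering of the job graph. The sketch below is a wave-by-wave variant of Kahn's algorithm, assuming a flow is given as a map from each job to the jobs it depends on; each wave contains only jobs whose dependencies have all finished, so jobs in the same wave could run in parallel. The FlowOrder object and the job names are hypothetical; this is not Azkaban's scheduler code.

```scala
import scala.annotation.tailrec

// Hypothetical sketch of dependency ordering; not Azkaban's implementation.
object FlowOrder {
  // deps(job) = the jobs whose output `job` consumes, which must run first.
  // Returns the run order, or None if the dependencies contain a cycle.
  def runOrder(deps: Map[String, Set[String]]): Option[List[String]] = {
    val jobs: Set[String] = deps.keySet ++ deps.values.flatten
    val depsOf: String => Set[String] = j => deps.getOrElse(j, Set.empty[String])

    // Repeatedly schedule every job whose dependencies have all been scheduled.
    @tailrec
    def loop(done: Set[String], orderRev: List[String]): Option[List[String]] =
      if (done.size == jobs.size) Some(orderRev.reverse)
      else {
        val ready = jobs.filter(j => !done(j) && depsOf(j).subsetOf(done)).toList.sorted
        if (ready.isEmpty) None // every remaining job waits on another: a cycle
        else loop(done ++ ready, ready.reverse ++ orderRev)
      }

    loop(Set.empty, Nil)
  }
}
```

Detecting the cycle up front matters in practice: a flow with a circular dependency can never make progress, so it is better rejected than left hanging.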

To support this kind of production offline processing we built Azkaban, a workflow scheduler. In addition to dependency scheduling, Azkaban handles various other tasks required to maintain production jobs, such as maintaining historical logs for each job run, graphing runtime trends for jobs, locking resources, and sending email alerts for job failures or successes. Overall, we think this kind of functionality is as important for batch programming as a good MVC framework is for web programming.


As our workflows grew in complexity, we found it difficult to keep track of the high-level shape of the flow. We provided a sort of expandable JavaScript tree view of the job workflow graph, but it did little to visualize the sequence of jobs that would run, or to give a quick view of which jobs had completed. To support these very large graphs, we decided to build a more sophisticated view that would give an easy visual picture of complex jobs.


Disabling a single job in a flow by right-clicking.

In the latest version of Azkaban, you may have noticed a cool new tool for viewing these flows. This view directly renders a real directed acyclic graph representation of your workflow in a web-friendly manner. You can enable/disable jobs by right-clicking nodes in the graph. Progress is displayed by graying out completed nodes, so the progression of execution can be viewed on the graph itself. For large flows, you can zoom in to get more details on a sub-portion of the graph, or zoom out to get a more structural view.

The two main challenges were, first, figuring out a way to draw graphs that was both browser-friendly and interactive, and second, laying out job graphs in an attractive, intuitive manner.

Drawing Graphs in a Browser

We knew we wanted the workflow graph to be a real part of the UI, not just an image: we wanted to be able to pan, zoom, and select and manipulate nodes. These are the options we considered for implementing an interactive graph visualization:

  • HTML5 Canvas is now supported in more and more browsers. Unfortunately, it works with very low-level primitive (basic shape) drawing. Changing the look of a node or arrow would’ve been difficult, and we would need to constantly redraw the canvas as the view changes. The primitives also have no DOM support, which makes node selection harder, nor can individual drawn elements easily be styled with CSS. It’s powerful, but can also be convoluted (this is coming from an OpenGL enthusiast). Essentially, we’d have to implement all the basic functionality required for a graph view from scratch.
  • Adobe Flash was considered briefly. It’s well tested and available in every browser except on mobile devices. However, it’s not exactly the most open-source-friendly technology. It also has limitations when it comes to communication between Flash and the HTML page. I also prefer to work with JavaScript rather than ActionScript.
  • SVG is what we ended up choosing. It provides many of the basic shapes we wanted: rectangles, text, and lines with the ability to apply arrowheads. With its implied scene-graph structure, it was very easy to implement panning and zooming. Each individual primitive can be manipulated through the DOM and styled with CSS, and primitives support many of the events that affect regular HTML documents. SVG is also supported out of the box in all major browsers that aren’t named IE (IE 9 has built-in support; earlier versions of IE need SVG plugins).

XHTML is required to embed SVG directly into a web page. When it is embedded, compatible browsers recognize the <svg> tag.

An Azkaban node starts out with a <g> element, which denotes a group. SVG provides a sort of hierarchical layout called a scene graph, which simplifies applying transformations to an entire group. This makes zoom and pan features easy to build: apply a translation offset to move the graph up, down, left, or right, or likewise apply a scale factor to zoom in and out.
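The translate-and-scale idea can be shown independently of the browser. The viewer described here manipulates the live SVG DOM in the browser; the Scala below only computes the transform attribute that such a viewer would set on the top-level <g> group, and the Viewport type and its methods are hypothetical.

```scala
// Hypothetical sketch: pan and zoom as one transform on the top-level <g> group.
object SvgViewport {
  final case class Viewport(offsetX: Double, offsetY: Double, scale: Double) {
    // Panning shifts the translation offset; the scale is unchanged.
    def pan(dx: Double, dy: Double): Viewport =
      copy(offsetX = offsetX + dx, offsetY = offsetY + dy)

    // Zooming multiplies the scale factor.
    def zoom(factor: Double): Viewport = copy(scale = scale * factor)

    // With "translate(...) scale(...)", each child point is scaled first and then
    // translated, so the whole group moves and resizes as one unit.
    def transform: String = s"translate($offsetX,$offsetY) scale($scale)"
  }

  // Wrap already-rendered child elements in a transformed group.
  def group(view: Viewport, children: String): String =
    s"""<g transform="${view.transform}">$children</g>"""
}
```

Since only one attribute changes per pan or zoom, the nodes and edges inside the group never need to be redrawn, which is the advantage over Canvas noted above.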

Making Graphs Pretty

None of these drawing options would matter unless we could display the graph in an informative manner. Bad graph layouts may hinder understanding of the graph rather than help it. We employed a simple layered graph technique based on Sugiyama’s algorithm. We found that even without additional enhancements, the layouts produced by the basic algorithm were acceptable. For the curious, a good overview of other graph layout algorithms can be found here.

[Figure: graph after the layout algorithm]

[Figure: graph before layout]

Here is how the actual layout works: first we define a top level set, which contains only nodes without any parents. Then each remaining node in the graph is assigned a level based on its distance from the top level.

Dummy nodes are then inserted wherever an edge connects nodes that are more than one level apart. For instance, if a node at level 1 is connected to a node at level 3, a dummy node is inserted at level 2 between these two nodes. These dummy nodes help align the graph during the layout phase.
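The two layering steps can be sketched as follows, assuming "distance from the top level" means the longest path from any parentless node. That is a common choice in Sugiyama-style layouts because it makes every edge point strictly downward, but the post does not say which distance Azkaban uses. Node ids are Ints, dummy nodes get fresh ids above the largest real id, and the Layering object is hypothetical.

```scala
import scala.collection.mutable

// Hypothetical sketch of level assignment and dummy-node insertion; not Azkaban's code.
object Layering {
  final case class Layered(level: Map[Int, Int], edges: List[(Int, Int)])

  // Level 1 is the top-level set (nodes without parents); every other node sits one
  // level below its deepest parent (longest-path layering). Assumes an acyclic graph.
  def assignLevels(nodes: Set[Int], edges: List[(Int, Int)]): Map[Int, Int] = {
    val parents: Map[Int, List[Int]] =
      edges.groupBy(_._2).map { case (child, es) => child -> es.map(_._1) }
    val memo = mutable.Map.empty[Int, Int]
    def depth(n: Int): Int = memo.get(n) match {
      case Some(d) => d
      case None =>
        val d = parents.getOrElse(n, Nil) match {
          case Nil => 1
          case ps  => ps.map(depth).max + 1
        }
        memo(n) = d
        d
    }
    nodes.map(n => n -> depth(n)).toMap
  }

  // Replace each edge spanning more than one level with a chain through dummy nodes,
  // one per intermediate level, so every edge connects adjacent levels.
  def addDummyNodes(level: Map[Int, Int], edges: List[(Int, Int)]): Layered = {
    var nextId = if (level.isEmpty) 0 else level.keys.max + 1
    var levels = level
    val newEdges = edges.flatMap { case (from, to) =>
      val chain = (level(from) + 1 until level(to)).map { l =>
        val dummy = nextId
        nextId += 1
        levels = levels.updated(dummy, l)
        dummy
      }
      val path = (from +: chain) :+ to
      path.zip(path.tail)
    }
    Layered(levels, newEdges)
  }
}
```

For the example in the text, an edge from a level-1 node to a level-3 node becomes two edges through one dummy node at level 2.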
Next, an “uncrossing” algorithm is run on the graph to minimize the number of edge crossings. Level i is first fixed, and level i+1 is free to be repositioned. For each node in the free level, the average position of its neighbours in the fixed level (the so-called barycenter) is used to position the node. Afterwards, level i+1 is fixed, and level i+2 is freed. This process is repeated until the last level is laid out.

There are many optimizations that could be made to this algorithm for better results. The number of dummy nodes that need to be added can reduce performance greatly. Several approaches, such as using the network simplex method to reduce edge lengths, can help reduce these additional nodes.

We also found that if multiple nodes share the same barycenter value, the resulting ambiguous positioning could produce additional edge crossings. However, we discovered that the naïve approach produced good enough results for most Azkaban graphs. Here’s the layout algorithm we used:

for each node n in G: AssignLevel(n)
AddDummyNodesToGraph(G)
for each level i = 2 ... MaxNumLevels:
  for each node in level i:
    CalculateBaryCenter(node, level i-1)
  reorder the nodes of level i by barycenter
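A sketch of the top-down barycenter pass from the pseudocode, assuming layering and dummy insertion are already done: levels arrive as ordered lists of node ids (level 1 first), and edges connect only adjacent levels. Nodes with no neighbour in the fixed level keep their current index as their barycenter, a tie-handling assumption the post does not specify. The Barycenter object and the crossings helper (used only to check the result) are hypothetical.

```scala
// Hypothetical sketch of the barycenter "uncrossing" sweep; not Azkaban's code.
object Barycenter {
  // One top-down sweep: fix level i, reorder level i+1 by the mean position of each
  // node's neighbours in level i, then move down one level.
  def sweep(levels: List[List[String]], edges: Set[(String, String)]): List[List[String]] =
    levels match {
      case Nil => Nil
      case first :: rest =>
        val laidOutRev = rest.foldLeft(List(first)) { (fixedRev, free) =>
          val fixed = fixedRev.head // the most recently fixed level
          val pos = fixed.zipWithIndex.toMap
          def barycenter(node: String, current: Int): Double = {
            val ps = fixed.filter(u => edges((u, node))).map(pos)
            if (ps.isEmpty) current.toDouble else ps.sum.toDouble / ps.size
          }
          val withBary = free.zipWithIndex.map { case (n, i) => (n, barycenter(n, i)) }
          // sortWith is stable: equal barycenters keep their previous relative order.
          val reordered = withBary.sortWith(_._2 < _._2).map(_._1)
          reordered :: fixedRev
        }
        laidOutRev.reverse
    }

  // Count pairs of edges between two adjacent levels that cross each other.
  def crossings(upper: List[String], lower: List[String], edges: Set[(String, String)]): Int = {
    val up = upper.zipWithIndex.toMap
    val low = lower.zipWithIndex.toMap
    val spans = edges.toVector.collect {
      case (u, l) if up.contains(u) && low.contains(l) => (up(u), low(l))
    }
    val pairs = for {
      i <- spans.indices
      j <- (i + 1) until spans.size
      if (spans(i)._1 - spans(j)._1) * (spans(i)._2 - spans(j)._2) < 0
    } yield 1
    pairs.sum
  }
}
```

For example, with levels (a, b) over (y, x) and edges a→x and b→y, the two edges cross once; one sweep reorders the lower level to (x, y) and the crossing disappears.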

To check out the new functionality and see graphs of your own workflows, download the new version of Azkaban here.


Tech Talk: Kevin Murphy (UBC) — “Probabilistic models for density estimation, structural discovery and semi-supervised learning from video”

Probabilistic models for density estimation, structural discovery and semi-supervised learning from video
Kevin Murphy (University of British Columbia)
Monday, January 10, 2011

ABSTRACT

In this talk, I give an overview of three different projects which my group is working on, all of which involve probabilistic modeling of one form or another.

The first project is concerned with creating efficient density models for data of mixed type (categorical, real, ordinal, etc.), in the presence of missing data. Such data is particularly common in social science surveys, but arises in many other areas of data analysis, too. Our approach is based on mixtures of factor analysers, extended to handle exponential family response variables. Although this model is not new, we propose a new variational bound for the multinomial likelihood function, which allows us to efficiently fit the model to real and categorical data using variational EM. We show that this outperforms previous approaches (in terms of accuracy per unit of CPU time) based on MAP estimation, and a Bayesian approach using Hamiltonian MCMC. (For details, see “Variational bounds for mixed-data factor analysis”, M. Khan, B. Marlin, G. Bouchard, K. Murphy, NIPS 2010.)

The second project is also concerned with density modeling, but uses sparse graphical models rather than low-rank decompositions. The advantage of such models is that they are often more interpretable. We have made several contributions in this area, but in this talk I will focus on one in which we extended the graphical lasso algorithm for learning the structure of Gaussian graphical models to handle unknown grouping of the variables. The technique is based on a new variational bound to the group Laplace distribution. This can be used in a variational EM algorithm, where the E step estimates the group assignments, and the M step uses a standard convex solver using the known groups. (For details, see “Group sparse priors for covariance estimation”, B. Marlin, M. Schmidt, K. Murphy, UAI 2009.) I will also discuss some work we are currently doing to extend this approach to learn sparse _latent_ GGMs from categorical data, using the variational bound on the multinomial likelihood mentioned above.

Finally, I will briefly present some work in progress in which we use a graphical model to align play-by-play text data from sports videos (basketball, ice hockey, etc) to detections of players, which are tracked over time. This allows us to learn the identity of the players without any explicit supervision.

BIOGRAPHY

Kevin Murphy is an associate professor at the University of British Columbia in Vancouver, Canada, in the departments of computer science and statistics, which he joined in 2004. He holds a Canada Research Chair in machine learning/computational statistics. He is currently (2010) on sabbatical in the Bay Area. Prior to coming to UBC, Kevin did a postdoc at MIT, his PhD at UC Berkeley, his MSc at U. Pennsylvania, and his BSc at U. Cambridge. Kevin is best known for his work in the area of Bayesian networks/graphical models. He is interested in model selection in the N << D regime (when the number of data samples is much less than the number of variables), in semi-supervised learning (particularly with applications to computer vision), and in data fusion (particularly with applications to bioinformatics).


Optimizing TCP Sockets Across Data Centers

Recently, I had a real opportunity to work on machines across different data centers (DCs). The task is simple: we’d like to replicate data stored in Kafka, a messaging system developed at LinkedIn, from one DC to another. We measured the transfer throughput, and it was extremely low. Even though there is a 1Gb link between the two DCs, we could only replicate data at a rate of less than a few Mb/s. We immediately suspected that this could be due to the high network latency between the DCs. In TCP, the window size controls how many bytes a socket sends before it has to wait for an acknowledgment from the receiver. When there is high latency between the client and the server, the window size needs to be large enough to amortize the overhead of each acknowledgment. The round-trip latency between the two DCs is about 80ms, which allows us to have as much as 1Gb/s * 0.08s / 8 = 10MB in flight. To make better use of the bandwidth, we set the TCP window size to 1MB instead of the original 64KB. However, the throughput didn’t improve at all.
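The same arithmetic, spelled out. It reads the link as 1 Gb/s and uses decimal units; since at most one window can be in flight per round trip, window/RTT is an upper bound on throughput that ignores protocol overhead and packet loss.

```latex
\begin{aligned}
\text{bandwidth-delay product} &= \frac{1\ \text{Gb/s} \times 0.08\ \text{s}}{8\ \text{bits/byte}} = \frac{80\ \text{Mb}}{8\ \text{bits/byte}} = 10\ \text{MB} \\
\text{throughput} &\le \frac{\text{window}}{\text{RTT}}: \quad
\frac{64\ \text{KB}}{0.08\ \text{s}} \approx 0.8\ \text{MB/s} \approx 6.5\ \text{Mb/s},
\qquad
\frac{1\ \text{MB}}{0.08\ \text{s}} \approx 12.5\ \text{MB/s} \approx 100\ \text{Mb/s}
\end{aligned}
```

So a 64KB window caps a single connection at a few Mb/s on this link, while a 1MB window, if it actually takes effect, raises the ceiling by more than a factor of 10.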

After some investigation, we realized that the problem was how we set the window size. The TCP protocol originally allowed a maximum window size of only 64KB. Later, the need for larger windows over high-latency networks became clear. However, by then it was too late to change the TCP header, where the window size is specified. As a compromise, TCP was extended to support a window scaling factor, which allows the window size to be interpreted as up to 2^14 times bigger. The window scaling factor is not carried in the header of every segment. Instead, it’s negotiated during the initial handshake when a TCP connection is established. Therefore, to set the TCP window size to a value larger than 64KB, one has to set it before the socket connection is established; otherwise, it is too late. The socket code in Kafka originally looked like the following:

channel = SocketChannel.open()
channel.connect(address)
channel.socket.setReceiveBufferSize(bufferSize)

Since setReceiveBufferSize() is called after the socket connection has been made, it has no effect on the socket window size. We then switched the order of the last two statements, and our replication throughput immediately increased by more than a factor of 10. Finally, we also added a call to channel.socket.getReceiveBufferSize() to verify the window size actually accepted by TCP.
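A sketch of the fix in Scala, with the statements in the corrected order and the read-back check described above. The ReplicaSocket object, its parameters and the stderr warning are illustrative assumptions rather than Kafka's actual code. shiftFor reproduces the window-scale arithmetic (16-bit window field, shift capped at 14) and is not how any particular OS chooses its shift.

```scala
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

// Hypothetical helper; not Kafka's actual code.
object ReplicaSocket {
  // Open a channel with a large receive buffer. The buffer is set BEFORE connect(),
  // so that a window scale large enough for it can be negotiated in the handshake.
  def open(address: InetSocketAddress, bufferSize: Int): SocketChannel = {
    val channel = SocketChannel.open()
    channel.socket().setReceiveBufferSize(bufferSize) // must precede connect()
    channel.connect(address)
    // Read back what the OS actually granted; it may cap (or, on Linux, double) the request.
    val granted = channel.socket().getReceiveBufferSize()
    if (granted < bufferSize)
      System.err.println(s"requested a $bufferSize-byte receive buffer but got $granted")
    channel
  }

  // Window-scaling arithmetic: the 16-bit window field holds at most 65535,
  // and the negotiated shift count is capped at 14.
  val MaxUnscaledWindow: Long = 65535L
  val MaxShift: Int = 14

  // Smallest shift whose scaled window covers `windowBytes` (capped at MaxShift).
  def shiftFor(windowBytes: Long): Int =
    (0 to MaxShift).find(s => (MaxUnscaledWindow << s) >= windowBytes).getOrElse(MaxShift)
}
```

For example, the 10MB bandwidth-delay product above needs a shift of 8, since 65535 × 2^7 is about 8.4MB and 65535 × 2^8 is about 16.8MB.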

Tech Talk: Benjamin Hindman (Berkeley) — “Mesos: Efficiently Sharing the Datacenter”

Mesos: Efficiently Sharing the Datacenter
Benjamin Hindman (Berkeley)
Monday, November 8, 2010

ABSTRACT

More and more applications are being developed to run primarily in datacenters. Prominent examples include web applications (e.g., using Ruby on Rails) and analytics (e.g., using Hadoop) in addition to the numerous services (e.g., memcached) that support front ends and back ends alike. Most organizations decide how to allocate resources to each of these applications in an ad hoc and manual fashion, and for simplicity, often at the granularity of machines. The resulting allocations typically yield horrible system-wide utilization, and often fail to enable applications (and thus the organization) to benefit from elastic computing. In this talk I’ll present the design and implementation of a system we have created called Mesos. Mesos aims to provide basic primitives and an application programming interface that engineers can use to develop their datacenter applications. Mesos allocates resources to applications programmatically, and applications can adjust their resource consumption as their needs change. In addition to describing Mesos in this talk, I’ll provide some results of using the system, including a discussion of some preliminary deployments.

BIOGRAPHY

Benjamin Hindman is a 4th year graduate student at UC Berkeley primarily working on enabling efficient sharing of resources in parallel and distributed systems. He is currently spending a lot of time working with Twitter to deploy Mesos. In addition to building large parallel and distributed systems, his work includes language support for making programming such systems easier.


Tech Talk: Nathan Marz — “Clojure at BackType”

Clojure at BackType
Nathan Marz (BackType)
Tuesday, October 26, 2010

ABSTRACT

Clojure has led to a significant reduction in complexity in BackType’s systems. BackType uses Clojure all over the backend, from processing data on Hadoop to a custom database to realtime workers. In this talk Nathan will give a crash course on Clojure and using it to build data-driven systems.

BIOGRAPHY

Nathan Marz is the Lead Engineer at BackType, a marketing intelligence company that collects and analyzes terabytes of data in real time from Twitter, Facebook, social news sites, and millions of blogs. He is the author of Cascalog, an open source tool for processing data on Hadoop using Clojure. Nathan is a frequent speaker at events about Big Data, including Cloud Connect, the Hadoop Summit, Hadoop Day, and various user groups in the Bay Area. Nathan writes a blog at <http://nathanmarz.com>.
