AWS Products Used: Amazon EC2, Amazon S3
Language(s): Java
Date Published: 2007-07-18
By Tom White
Managing large datasets is hard; running computations on large datasets is even harder. Apache's Hadoop project aims to solve both problems by providing a framework for running large data processing applications on clusters of commodity hardware.
Apache Hadoop, a sub-project of the well-known Lucene text search library, provides several components for building distributed applications. For this article we shall focus on the MapReduce component, which offers a simple but powerful programming model: parallel programs are written by defining how the processing can be split into small fragments of work.
The MapReduce concept (and name) comes from Google; it is described in an excellent paper by Jeffrey Dean and Sanjay Ghemawat, which is well worth reading. Google's MapReduce implementation, while used extensively inside the company, is not available for general use. A goal of the Hadoop project is to provide an open source implementation of MapReduce that anyone can run on their own cluster, or on rented hardware such as an Amazon EC2 cluster. While the Hadoop implementation is similar to that described in the Dean and Ghemawat paper, it is worth noting that there are differences in design and nomenclature.
For our example we are going to write a program that takes web server access log files (as produced by an Apache Web Server, for example) and counts the number of hits in each minute slot over a week. We will analyze months of logs and plot the distribution in order to get a view of how traffic volumes tend to vary over the course of a week. The beauty of this approach is that the same program will scale to months or years of massive logs, simply by increasing the cluster size.
The best way to think about MapReduce programs is to think about the input and output of each phase: the Map phase and the Reduce phase. Each phase has key-value pairs as input, and key-value pairs as output. The types and number of records of the input and output may be different, although the Map output types must be the same as the Reduce input types. Let's see how to choose the types for our MapReduce program.
The input to the Map phase is the access log files, and we use an input format that gives us key-value pairs which are the character offset within the access log (which we ignore) and the corresponding line. Our Map function takes a log line, pulls out the timestamp field for when the server finished processing the request, converts it into a minute-in-week slot, then writes out a (<minute-in-week slot>, <1>) key-value pair. We are mapping each line in the access log to its minute-in-week slot.
The Reduce is given <minute-in-week slot> keys and an iterator over all the values for the key that were produced by the maps. So all we have to do is sum the values as we iterate over them, producing a final output of (<minute-in-week slot>, <total hits>) key-value pairs. Hadoop's MapReduce infrastructure is actually doing a lot behind the scenes to make this work, and to make it reliable in the face of hardware failure. It even sorts the output of the Reduce phase by key, so we have exactly what we need to plot our graph. (We won't use Hadoop to plot the graph for us, but more on that later.)
To illustrate the way this all works, take the following three-line access log:
192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] "GET / HTTP/1.1" 200 1722
192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] "GET /styles/layout.css HTTP/1.1" 200 2187
192.168.0.5 - - [22/Aug/2005:22:08:00 +0000] "GET /projects.html HTTP/1.1" 200 4024
Our Map takes each input line and produces the following output (note 22:07 on 22 August 2005 is the 1387th minute in the week, where the week starts on Monday):
<0, 192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] "GET / HTTP/1.1" 200 1722> -> <1387, 1>
<71, 192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] "GET /styles/layout.css HTTP/1.1" 200 2187> -> <1387, 1>
<159, 192.168.0.5 - - [22/Aug/2005:22:08:00 +0000] "GET /projects.html HTTP/1.1" 200 4024> -> <1388, 1>
Our Reduce then adds up the 1 values emitted by the Map, to produce totals:
<1387, (1, 1)> -> <1387, 2>
<1388, (1)> -> <1388, 1>
The Reduce output is written out as a tab-separated file. It says there were two requests in the 1387th minute-in-week slot, and one in the 1388th:
1387 2
1388 1
Let's translate the above ideas into Java code, starting with the skeleton of our program:
public class AccessLogFileAnalyzer {
//...
}
It contains our Map and Reduce inner classes and a main method to launch the job. Here's the Map class:
// Imports needed at the top of AccessLogFileAnalyzer.java:
import java.io.IOException;
import java.util.Iterator;
import java.util.regex.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.joda.time.*;
import org.joda.time.format.*;

public static class MapClass extends MapReduceBase implements Mapper {

  private final static LongWritable ONE = new LongWritable(1);

  // Matches the fields of an Apache access log line; group 4 is the timestamp.
  private static Pattern p = Pattern
      .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\"" +
          " ([^ ]*) ([^ ]*).*");

  // Parses timestamps of the form 22/Aug/2005:22:07:52 +0000 (Joda Time).
  private static DateTimeFormatter formatter = DateTimeFormat
      .forPattern("dd/MMM/yyyy:HH:mm:ss Z");

  private IntWritable minute = new IntWritable();

  public void map(WritableComparable key, Writable value,
      OutputCollector output, Reporter reporter) throws IOException {
    String line = ((Text) value).toString();
    Matcher matcher = p.matcher(line);
    if (matcher.matches()) {
      String timestamp = matcher.group(4);
      minute.set(getMinuteBucket(timestamp));
      output.collect(minute, ONE); // emit <minute-in-week slot, 1>
    }
  }

  private int getMinuteBucket(String timestamp) {
    DateTime dt = formatter.parseDateTime(timestamp);
    return dt.getMinuteOfDay() + (dt.getDayOfWeek() - 1)
        * DateTimeConstants.MINUTES_PER_DAY;
  }
}
The interesting work is done by the getMinuteBucket() method, which uses the Joda Time library to parse the timestamp and convert it into a minute-in-week slot: the minute of the day, plus a whole day's worth of minutes for each elapsed day of the week (Joda Time numbers Monday as day 1).
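As a quick sanity check, the bucket calculation can be exercised outside Hadoop. This is our own illustrative snippet, not part of the article's source; note that Joda Time's parseDateTime converts the parsed instant to the JVM's default time zone, so the exact slot printed depends on where you run it:

import org.joda.time.DateTime;
import org.joda.time.DateTimeConstants;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class MinuteBucketCheck {
    public static void main(String[] args) {
        DateTimeFormatter formatter =
            DateTimeFormat.forPattern("dd/MMM/yyyy:HH:mm:ss Z");
        DateTime dt = formatter.parseDateTime("22/Aug/2005:22:07:52 +0000");
        // Minutes into the day, plus whole days since Monday (day 1 in Joda Time).
        int bucket = dt.getMinuteOfDay()
            + (dt.getDayOfWeek() - 1) * DateTimeConstants.MINUTES_PER_DAY;
        System.out.println("minute-in-week slot: " + bucket);
    }
}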
The Reduce code is even simpler:
public static class ReduceClass extends MapReduceBase implements Reducer {

  public void reduce(WritableComparable key, Iterator values,
      OutputCollector output, Reporter reporter) throws IOException {
    int sum = 0;
    // Sum the counts emitted by the maps (and any combiner) for this key.
    while (values.hasNext()) {
      sum += ((LongWritable) values.next()).get();
    }
    output.collect(key, new LongWritable(sum));
  }
}
We simply iterate through the values we are passed, which are the same type as the Map output values (LongWritable), and sum them, emitting a single total for each minute-in-week key.
Hadoop actually comes with a library of stock maps and reducers, and in this case we could have used LongSumReducer (from the org.apache.hadoop.mapred.lib package), which does the same thing as our ReduceClass; it is always worth checking the library before writing your own.
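Had we gone that route, only the job configuration (shown in full below) would change; a sketch:

// import org.apache.hadoop.mapred.lib.LongSumReducer;
conf.setCombinerClass(LongSumReducer.class);
conf.setReducerClass(LongSumReducer.class);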
The final piece of code is the main() method that configures and launches the job:
public static void main(String[] args) throws IOException {
  if (args.length != 2) {
    System.err.println("Usage: AccessLogFileAnalyzer <input path> <output path>");
    System.exit(-1);
  }

  JobConf conf = new JobConf(AccessLogFileAnalyzer.class);
  conf.setInputPath(new Path(args[0]));
  conf.setOutputPath(new Path(args[1]));

  conf.setOutputKeyClass(IntWritable.class);
  conf.setOutputValueClass(LongWritable.class);

  conf.setMapperClass(MapClass.class);
  conf.setCombinerClass(ReduceClass.class);
  conf.setReducerClass(ReduceClass.class);

  conf.setNumReduceTasks(1);

  JobClient.runJob(conf);
}
After validating the command-line arguments, the program builds a JobConf object, which holds the full job configuration, and sets the input and output paths and the Reduce output key and value types (IntWritable and LongWritable).
Having set the input and output configuration parameters, we specify the Map and Reduce classes to use. We also set the Combiner class. A Combiner is just a Reduce task that runs in the same process as the Map task, after the Map task has finished. The reason this is often a good idea is that it greatly reduces the amount of data that has to be transferred across the network to the Reduce tasks: in our toy example, the map output <1387, 1>, <1387, 1>, <1388, 1> is collapsed to <1387, 2>, <1388, 1> before it ever leaves the map node.
The last job configuration setting is the number of reduce tasks. Selecting good values for the numbers of maps and reduces is something of an art, but for this application a single reduce is the natural choice: the final output is small, and one reducer yields a single, sorted output file that is ready to plot. (The number of maps is determined by the framework from the size of the input.)
The final line of the program does all the work of submitting the job and waiting for it to finish.
Before letting our new program loose on a large cluster of machines, it is a good idea to check that it works on a small amount of test data.
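Once the job jar is built (the packaging step is shown later in this article), a local run looks something like this; the input path here is illustrative:

bin/hadoop jar aws-job.jar input/sample-logs output
cat output/part-00000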
The output lists each minute-in-week slot alongside its hit count:
0 6
1 15
2 30
3 22
4 20
5 25
...
10075 6
10076 10
10077 11
10078 4
10079 4
This looks good. So let's try analyzing a big dataset.
So far we have only run our program on a small dataset, on a single machine. The next step is to run it over months of logs on a cluster of EC2 instances.
We have several options for how to get the log data to the cluster.
Note that files stored on S3 are not subject to the usual 5GB limitation, since Hadoop provides a special S3 filesystem that breaks files into blocks, so we can store arbitrarily large files on S3.
For this article we'll store the logs on S3 and copy them onto the cluster when we need them. And while a full discussion of the pros and cons of each option is beyond the scope of this article, it's worth mentioning that S3 scores well as a persistent store for the log files (it can double as a backup), and it is easy enough to copy them from the web server after they have been rotated at the end of the day. Transfers between S3 and EC2 are free, so we can launch our EC2 Hadoop cluster, transfer the files to it, run our job, then shut down the cluster. This means we only pay for the compute time we actually use.
Hadoop comes with tools to move files between different filesystems. To copy files from a locally mounted filesystem you would install Hadoop locally, then run a command like:
bin/hadoop fs -put /path/to/source /path/to/target
The target path is actually a URI. To copy files to S3 we use the s3 URI scheme:
bin/hadoop fs -put /path/to/source s3://<ID>:<SECRET>@<BUCKET>/path/to/target
where <ID> and <SECRET> are your AWS access key ID and secret access key, and <BUCKET> is the name of the S3 bucket to store the files in.
How do we get our MapReduce application to the cluster? Simple: we package it in a jar along with its dependencies. It's a bit like a WAR file, except dependent jars go in a lib subdirectory rather than in WEB-INF/lib. Here's the relevant Ant task for our application:
<jar jarfile="${build.dir}/aws-job.jar">
  <fileset dir="${build.classes}"/>
  <fileset dir="${basedir}" includes="lib/" excludes="**/hadoop*.jar"/>
  <manifest>
    <attribute name="Main-Class"
               value="org/tiling/hadoop_aws_article/AccessLogFileAnalyzer"/>
  </manifest>
</jar>
Hadoop is packaged as a public EC2 image (an AMI) so it is easy for us to get up and running with a cluster. If the version of Hadoop you want to run is not available - for example, if you want to run a patched version - then it is easy to build your own.
Here we'll just use a stock Hadoop AMI. We can find which versions of Hadoop are available as AMIs by using the Amazon EC2 tools. (Version 0.13.0 was used for this article.)
ec2-describe-images -a | grep hadoop-ec2-images
While it is possible to use the EC2 tools to launch Hadoop instances, Hadoop comes with a set of scripts that make the job of launching a cluster much easier. The scripts come with the standard Hadoop distribution, so start by downloading and unpacking the latest version of Hadoop on your local workstation. (The latest nightly build is recommended since it fixes a bug in the scripts that at the time of writing is not in an official released version). Then edit the EC2 configuration in src/contrib/ec2/bin/hadoop-ec2-env.sh to specify your Amazon Web Service settings, the Hadoop version to run on the cluster (which does not have to match the version of the distribution we unpacked on our workstation), the hostname for the master, and the size of the cluster.
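The settings look something like the sketch below; the variable names are taken from the contrib/ec2 scripts of this era and may differ between versions, and all values are placeholders:

AWS_ACCESS_KEY_ID=<your AWS access key ID>
AWS_SECRET_ACCESS_KEY=<your AWS secret access key>
HADOOP_VERSION=0.13.0
MASTER_HOST=hadoop.example.com   # the master's hostname
NO_INSTANCES=20                  # the size of the cluster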
The hostname you select for the master should be one that you control, since you will need to point it at the master's IP address once the cluster starts (a dynamic DNS service is a convenient way to do this), and the rest of the cluster uses it to find the master.
How big should the cluster be? It depends on the number of maps and reduces, but in most cases make it as big as you can. By default EC2 users are limited to 20 instances, so this is a natural starting point.
With the configuration out of the way (see the Running Hadoop on EC2 wiki page if you need more pointers), we're ready to go. Here's what to type:
bin/hadoop-ec2 run
This command launches a Hadoop cluster of the configured size and then logs you into the master node. (It's also possible to run these steps one at a time, using the scripts' separate commands.)
The next step is to copy our data from S3 into the cluster's HDFS filesystem. From the master node:
cd /usr/local/hadoop-<version>
bin/hadoop fs -mkdir logs
bin/hadoop distcp s3://<ID>:<SECRET>@<BUCKET>/path/to/logs logs
We also need to copy our job jar from our workstation by running (from our workstation):
. bin/hadoop-ec2-env.sh
scp $SSH_OPTS /path/to/aws-job.jar root@$MASTER_HOST:
We are ready to run the job at last. From the master node:
bin/hadoop jar ~/aws-job.jar logs out
The command line output will periodically report progress. However, it is worth using the web interface to monitor the job, since it gives more information and allows you to drill down into the logs of the tasks running on the various nodes, which can be invaluable in the event of failures. The interface is available at http://<MASTER_HOST>:50030/, the standard port for the JobTracker's web UI.
The final summary output is shown in the table below. The system maintains counts of input and output records and bytes at each stage, which makes for a handy sanity check: note that there are 10,080 Reduce output records, exactly the number of minute-in-week slots (7 × 24 × 60).
It took about 5 minutes to transfer the data from S3 onto the cluster. As the counters show, the job went on to process over 449 million log lines, about 105GB of input.
Counter (Map-Reduce Framework) | Map | Reduce | Total
---|---|---|---
Map input records | 449,662,417 | 0 | 449,662,417
Map output records | 449,661,579 | 0 | 449,661,579
Map input bytes | 105,793,389,172 | 0 | 105,793,389,172
Map output bytes | 5,395,938,948 | 0 | 5,395,938,948
Combine input records | 449,661,579 | 0 | 449,661,579
Combine output records | 60,730 | 0 | 60,730
Reduce input groups | 0 | 10,080 | 10,080
Reduce input records | 0 | 60,730 | 60,730
Reduce output records | 0 | 10,080 | 10,080
The output from the job is very small, so we can easily copy it to our workstation then run a simple R script to produce a graph of traffic over the week.
# Plot hits per minute-in-week slot from the job's output file.
png("web_hits_over_week.png")
data <- read.table("part-00000")
plot(data, axes=FALSE, ann=FALSE, type="p", pch=".")
# Gray baseline along y = 0, and a vertical gray line at each day
# boundary (1440 minutes per day).
lines(c(1440*7, 0), c(0, 0), col="gray")
for (i in 0:7) {
  lines(c(1440, 1440)*i, c(0, max(data)), col="gray")
}
dev.off()
We've run our job and we're happy with the results (which we've safely copied from the cluster), so we can shut down the cluster. From our workstation:
bin/hadoop-ec2 terminate-cluster
MapReduce is a great programming model for large-scale data processing, and Hadoop puts it within everyone's reach: combined with Amazon EC2 and S3, you can rent a cluster by the hour, analyze as much data as you need, and shut everything down when you're done.
Tom White is a committer on the Apache Hadoop project. He is the primary author of Hadoop's Amazon EC2 and S3 integration.