http://wiki.apache.org/hadoop/WordCount
Sorting
Storing temperatures as Text objects doesn’t work for sorting purposes, because signed integers don’t sort lexicographically. Instead, we are going to store the data using sequence files whose IntWritable keys represent the temperature (and sort correctly) and whose Text values are the lines of data.
The preprocessing step is a map-only job that also filters the input to remove records that don’t have a valid temperature reading. Each map creates a single block-compressed sequence file as output.
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
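To make the preprocessing step concrete, here is a minimal sketch of such a mapper. The input layout (temperature as the second tab-separated field, with 9999 marking a missing reading) is an assumption for illustration, not the original parsing code:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch of a map-only preprocessing mapper. The record layout is assumed:
// temperature is the second tab-separated field, 9999 means "missing".
public class TemperaturePreprocessMapper
    extends Mapper<LongWritable, Text, IntWritable, Text> {

  private static final int MISSING = 9999;

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] fields = value.toString().split("\t");
    if (fields.length < 2) {
      return; // malformed record: filter it out
    }
    int airTemperature;
    try {
      airTemperature = Integer.parseInt(fields[1]);
    } catch (NumberFormatException e) {
      return; // not a valid temperature: filter it out
    }
    if (airTemperature != MISSING) {
      // IntWritable keys sort numerically, so the sequence file output sorts correctly
      context.write(new IntWritable(airTemperature), value);
    }
  }
}

Pair this with job.setNumReduceTasks(0) so the sequence files are written directly by the map tasks.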
MapFileOutputFormat provides a pair of convenience static methods for performing lookups against MapReduce output:
IntWritable key = new IntWritable(Integer.parseInt(args[1]));
Reader[] readers = MapFileOutputFormat.getReaders(path, getConf());
Partitioner<IntWritable, Text> partitioner = new HashPartitioner<IntWritable, Text>();
Text val = new Text();
Writable entry = MapFileOutputFormat.getEntry(readers, partitioner, key, val);
The getReaders() method opens a MapFile.Reader for each of the output files created
by the MapReduce job. The getEntry() method then uses the partitioner to choose the
reader for the key and finds the value for that key by calling Reader’s get() method.
We can also use the readers directly to get all the records for a given key. The array of
readers that is returned is ordered by partition, so that the reader for a given key may
be found using the same partitioner that was used in the MapReduce job:
Reader[] readers = MapFileOutputFormat.getReaders(path, getConf());
Partitioner<IntWritable, Text> partitioner = new HashPartitioner<IntWritable, Text>();
Text val = new Text();
Reader reader = readers[partitioner.getPartition(key, val, readers.length)];
Writable entry = reader.get(key, val);
IntWritable nextKey = new IntWritable();
do {
    parser.parse(val.toString());
} while (reader.next(nextKey, val) && key.equals(nextKey));
Total Sort: Keeping all output in sorted order
The job of the partitioner is to deterministically assign a reducer to each key. The default partitioner uses a hashing function to assign keys to reducers uniformly, which often works well at distributing work evenly across reducers.
If we have prior knowledge that the keys are approximately uniformly distributed, we can instead use a partitioner that assigns key ranges to each reducer and still be confident that the reducers’ loads are fairly balanced.
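As a sketch of what a hand-written range partitioner looks like for IntWritable temperature keys (the cut points below are invented for illustration, not taken from the original):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative range partitioner: splits the key space into fixed, hand-chosen
// ranges. The cut points are assumptions; in practice they come from sampling
// the data, which is what TotalOrderPartitioner automates.
public class TemperatureRangePartitioner extends Partitioner<IntWritable, Text> {
  // Upper bounds (exclusive) of the first numPartitions - 1 ranges
  private static final int[] CUT_POINTS = { -100, 0, 100, 200 };

  @Override
  public int getPartition(IntWritable key, Text value, int numPartitions) {
    int temperature = key.get();
    int partition = 0;
    while (partition < CUT_POINTS.length
        && partition < numPartitions - 1
        && temperature >= CUT_POINTS[partition]) {
      partition++;
    }
    return partition;
  }
}

With five reducers, each reducer then receives a contiguous key range, so concatenating the output files in partition order yields a globally sorted result.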
The naive answer to producing a single, globally sorted file is to use a single partition. But this is incredibly inefficient for large files, because one machine has to process all of the output.
The secret to doing this efficiently is to use a partitioner that respects the total order of the output; the catch is that you have to choose your partition sizes carefully to ensure that they are fairly even.
Hadoop comes with a selection of samplers. The InputSampler class defines a nested Sampler interface whose implementations return a sample of keys given an InputFormat and Job.
The writePartitionFile() static method on InputSampler creates a sequence file to store the keys that define the partitions. This sequence file is then used by TotalOrderPartitioner to create partitions for the sort job.
job.setPartitionerClass(TotalOrderPartitioner.class);
InputSampler.Sampler<IntWritable, Text> sampler =
    new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
InputSampler.writePartitionFile(job, sampler);

// Add to DistributedCache
Configuration conf = job.getConfiguration();
String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
DistributedCache.addCacheFile(partitionUri, conf);
DistributedCache.createSymlink(conf);
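On newer Hadoop releases the DistributedCache class is deprecated; an equivalent sketch using the Job API (same partitionUri as above) is:

// MRv2 equivalent: Job adds the cache file directly, and the "#..." fragment
// becomes the symlink name automatically
job.addCacheFile(partitionUri);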
We use a RandomSampler, which chooses keys with a uniform probability—here, 0.1.
There are also parameters for the maximum number of samples to take and the maximum
number of splits to sample (here, 10,000 and 10, respectively; these settings are
the defaults when InputSampler is run as an application), and the sampler stops when
the first of these limits is met. Samplers run on the client, making it important to limit
the number of splits that are downloaded so the sampler runs quickly.
SplitSampler, which samples only the first n records in a split, is not so good for sorted data, because it doesn’t select keys from throughout the split. IntervalSampler chooses keys at regular intervals through the split and is a better choice for sorted data. RandomSampler is a good general-purpose sampler.
Sorting using Hadoop – TotalOrderPartitioner
http://pipiper.wordpress.com/2013/05/02/sorting-using-hadoop-totalorderpartitioner/
job.setPartitionerClass(TotalOrderPartitioner.class);
job.setSortComparatorClass(SortKeyComparator.class);

// Write the partition file that TotalOrderPartitioner will read
Path inputDir = new Path(partitionLocation);
Path partitionFile = new Path(inputDir, "partitioning");
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);

int numSamples = numReduceTasks;
int maxSplits = numReduceTasks - 1;
InputSampler.Sampler sampler = new InputSampler.RandomSampler(pcnt, numSamples, maxSplits);
InputSampler.writePartitionFile(job, sampler);
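The SortKeyComparator used above isn’t reproduced in the post. A minimal sketch of what such a comparator could look like, assuming LongWritable keys sorted in descending order (both assumptions, not the blog’s actual code):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical sort comparator: orders LongWritable keys from largest to smallest.
// Key type and sort direction are assumptions for illustration.
public class SortKeyComparator extends WritableComparator {

  protected SortKeyComparator() {
    super(LongWritable.class, true); // true = create key instances for comparison
  }

  @Override
  @SuppressWarnings("rawtypes")
  public int compare(WritableComparable a, WritableComparable b) {
    LongWritable first = (LongWritable) a;
    LongWritable second = (LongWritable) b;
    // Reverse the natural ordering to sort descending
    return -1 * first.compareTo(second);
  }
}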
Sort users by last visit
http://my.safaribooksonline.com/book/software-engineering-and-development/patterns/9781449341954/4dot-data-organization-patterns/i_sect14_d1e4430_html
Secondary Sort
The MapReduce framework sorts the records by key before they reach the reducers.
For any particular key, however, the values are not sorted.
To get the values into a known order, we change our keys to be composite: a combination of year and temperature. We want the sort order for keys to be by year (ascending) and then by temperature (descending).
A partitioner ensures only that one reducer receives all the records for a year.
• Make the key a composite of the natural key and the natural value.
• The sort comparator should order by the composite key, that is, the natural key
and natural value.
• The partitioner and grouping comparator for the composite key should consider
only the natural key for partitioning and grouping.
// Mapper: emit the composite (year, temperature) key with a null value
context.write(new IntPair(parser.getYearInt(), parser.getAirTemperature()),
    NullWritable.get());

// FirstPartitioner.getPartition(): partition by the first (natural) key only
return Math.abs(key.getFirst() * 127) % numPartitions;

// Job configuration: plug in the partitioner and the two comparators
job.setPartitionerClass(FirstPartitioner.class);
job.setSortComparatorClass(KeyComparator.class);
job.setGroupingComparatorClass(GroupComparator.class);
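For reference, minimal sketches of the three classes wired in above, assuming an IntPair composite key with getFirst() and getSecond() accessors (getSecond() and the use of Integer.compare are assumptions, not the original code):

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Partition on the natural key (year) only, so all records for a year
// reach the same reducer regardless of temperature.
public class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
  @Override
  public int getPartition(IntPair key, NullWritable value, int numPartitions) {
    return Math.abs(key.getFirst() * 127) % numPartitions;
  }
}

// Sort comparator: order by year ascending, then temperature descending.
public class KeyComparator extends WritableComparator {
  protected KeyComparator() {
    super(IntPair.class, true);
  }
  @Override
  @SuppressWarnings("rawtypes")
  public int compare(WritableComparable w1, WritableComparable w2) {
    IntPair ip1 = (IntPair) w1;
    IntPair ip2 = (IntPair) w2;
    int cmp = Integer.compare(ip1.getFirst(), ip2.getFirst());
    if (cmp != 0) {
      return cmp;
    }
    return -Integer.compare(ip1.getSecond(), ip2.getSecond()); // reversed: descending
  }
}

// Grouping comparator: group reducer values by year only, ignoring temperature.
public class GroupComparator extends WritableComparator {
  protected GroupComparator() {
    super(IntPair.class, true);
  }
  @Override
  @SuppressWarnings("rawtypes")
  public int compare(WritableComparable w1, WritableComparable w2) {
    IntPair ip1 = (IntPair) w1;
    IntPair ip2 = (IntPair) w2;
    return Integer.compare(ip1.getFirst(), ip2.getFirst());
  }
}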
http://techbus.safaribooksonline.com/book/databases/9781608453429
Top Ten
This pattern utilizes both the mapper and the reducer. The mappers will find their local top K, then all of the individual top K sets will compete for the final top K in the reducer. Since the number of records coming out of the mappers is at most K and K is relatively small, we’ll only need one reducer.
class mapper:
    setup():
        initialize top ten sorted list
    map(key, record):
        insert record into top ten sorted list
        if length of list is greater than 10 then
            truncate list to a length of 10
    cleanup():
        for record in top ten sorted list:
            emit null, record

class reducer:
    setup():
        initialize top ten sorted list
    reduce(key, records):
        sort records
        truncate records to top 10
        for record in records:
            emit record
Pig
Pig will have issues performing this query in any sort of optimal way. The most straightforward pattern is to mirror the SQL query, but the ordering is expensive just to find a few records. This is a situation in which you’ll find major gains in using Java MapReduce instead of Pig.
B = ORDER A BY col4 DESC;
C = LIMIT B 10;
Mapper code
// Map from user reputation to the record, kept as a field on the mapper
private TreeMap<Integer, Text> repToRecordMap = new TreeMap<Integer, Text>();

// In map(): add this record to our map with the reputation as the key
// (reputation is parsed out of the input record)
repToRecordMap.put(Integer.parseInt(reputation), new Text(value));
// If we have more than ten records, remove the one with the lowest reputation.
// The TreeMap sorts its keys in ascending order, so the lowest reputation
// is the first key.
if (repToRecordMap.size() > 10) {
    repToRecordMap.remove(repToRecordMap.firstKey());
}

protected void cleanup(Context context) throws IOException, InterruptedException {
    // Output our (up to) ten records to the reducers with a null key
    for (Text t : repToRecordMap.values()) {
        context.write(NullWritable.get(), t);
    }
}
Reducer code
public void reduce(NullWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    for (Text value : values) {
        Map<String, String> parsed = transformXmlToMap(value.toString());
        // Re-add each candidate record, keyed by its reputation
        // (field name "Reputation" assumed from the mapper's parsing)
        repToRecordMap.put(Integer.parseInt(parsed.get("Reputation")), new Text(value));
        // If we have more than ten records, remove the one with the lowest reputation.
        // The TreeMap sorts its keys in ascending order, so the lowest reputation
        // is the first key.
        if (repToRecordMap.size() > 10) {
            repToRecordMap.remove(repToRecordMap.firstKey());
        }
    }
    for (Text t : repToRecordMap.descendingMap().values()) {
        // Output our ten records to the file system with a null key
        context.write(NullWritable.get(), t);
    }
}
Distinct
This pattern exploits MapReduce’s ability to group keys together to remove duplicates. The mapper transforms the data, and the reducer does little more than emit each key once. The combiner can always be used in this pattern and helps considerably if there are a large number of duplicates; because duplicate records are often located close to one another in a data set, a combiner will deduplicate many of them in the map phase.
Set the number of reducers relatively high, since the mappers will forward almost all their data to the reducers.
Pig
b = DISTINCT a;
Mapper code
The Mapper will get the user ID from each input record. This user ID will be output as the key with a null value.
context.write(outUserId, NullWritable.get());
Reducer code
context.write(key, NullWritable.get());
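Putting the two one-liners above into context, a compact sketch of the whole pattern (class names are illustrative; how the user ID is extracted from each record is assumed):

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative sketch of the Distinct pattern. Parsing is assumed:
// here the whole input line is treated as the user ID.
public class DistinctUserSketch {

  public static class DistinctUserMapper
      extends Mapper<Object, Text, Text, NullWritable> {
    private Text outUserId = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      outUserId.set(value.toString().trim()); // assumption: the record is the user ID
      context.write(outUserId, NullWritable.get());
    }
  }

  // Emits each distinct key exactly once
  public static class DistinctUserReducer
      extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
        throws IOException, InterruptedException {
      context.write(key, NullWritable.get());
    }
  }
}

Because the reducer ignores its values entirely, the same class can also be registered with job.setCombinerClass() to deduplicate on the map side.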