HADOOP: SETUP MAVEN PROJECT FOR MAPREDUCE IN 5MN
http://hadoopi.wordpress.com/2013/05/25/setup-maven-project-for-hadoop-in-5mn/
"People You May Know" Friendship Recommendation with Hadoop
http://importantfish.com/people-you-may-know-friendship-recommendation-with-hadoop/
There are two types of nodes that control the job execution process: a jobtracker and
a number of tasktrackers. The jobtracker coordinates all the jobs run on the system by
scheduling tasks to run on tasktrackers. Tasktrackers run tasks and send progress
reports to the jobtracker, which keeps a record of the overall progress of each job. If a
task fails, the jobtracker can reschedule it on a different tasktracker.
Hadoop divides the input to a MapReduce job into fixed-size pieces called input
splits, or just splits. Hadoop creates one map task for each split, which runs the user-defined
map function for each record in the split.
Hadoop does its best to run the map task on a node where the input data resides in
HDFS. This is called the data locality optimization because it doesn’t use valuable cluster
bandwidth.
Sometimes, however, all three nodes hosting the HDFS block replicas
for a map task’s input split are running other map tasks, so the job scheduler will look
for a free map slot on a node in the same rack as one of the blocks. Very occasionally
even this is not possible, so an off-rack node is used, which results in an inter-rack
network transfer.
This is why the optimal split size is the same as the block size: it is the
largest size of input that can be guaranteed to be stored on a single node. If the split
spanned two blocks, it would be unlikely that any HDFS node stored both blocks, so
some of the split would have to be transferred across the network to the node running the task.
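The split size is computed by FileInputFormat from three values: the minimum split size, the maximum split size, and the block size. A sketch of the calculation; with the default settings the block size wins, which is why a split normally equals a block:

// split size = max(minimumSize, min(maximumSize, blockSize))
// by default minimumSize < blockSize < maximumSize, so the split size is the block size
long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}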
Map tasks write their output to the local disk, not to HDFS.
The output of the reduce is normally stored in HDFS for reliability.
For each HDFS block of the reduce output, the first replica is stored on the local node, with
other replicas being stored on off-rack nodes.
Combiner Functions
Hadoop allows the user to specify a combiner function to be run on the map output, and the combiner function's output forms the input to the reduce function. Because the combiner function
is an optimization, Hadoop does not provide a guarantee of how many times it
will call it for a particular map output record, if at all. In other words, calling the
combiner function zero, one, or many times should produce the same output from the
reducer.
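For the max-temperature example the reducer can double as the combiner, since taking a maximum is commutative and associative. A minimal sketch of wiring it up (the class names MaxTemperatureMapper/MaxTemperatureReducer are the book's example classes, assumed here):

Job job = new Job(conf, "Max temperature");
job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);  // combiner runs on the map output
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);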
Hadoop Streaming
Hadoop provides an API to MapReduce that allows you to write your map and reduce
functions in languages other than Java. Hadoop Streaming uses Unix standard streams
as the interface between Hadoop and your program, so you can use any language that
can read standard input and write to standard output to write your MapReduce
program.
#!/usr/bin/env ruby
# max_temperature_map.rb: emit (year, temperature) for valid readings
STDIN.each_line do |line|
  val = line
  year, temp, q = val[15,4], val[87,5], val[92,1]
  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end
#!/usr/bin/env ruby
# max_temperature_reduce.rb: for each year (key), print the maximum temperature
last_key, max_val = nil, -1000000
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key, max_val = key, [max_val, val.to_i].max
  end
end
puts "#{last_key}\t#{max_val}" if last_key
In contrast to
the Java API, where you are provided an iterator over each key group, in Streaming you
have to find key group boundaries in your program.
cat input/ncdc/sample.txt | ch02/src/main/ruby/max_temperature_map.rb | \
sort | ch02/src/main/ruby/max_temperature_reduce.rb
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.rb \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb
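When running against a cluster rather than locally, the scripts can be shipped with the -file option and a combiner specified with -combiner; a sketch, reusing the reducer as the combiner:

hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -input input/ncdc/sample.txt \
  -output output \
  -mapper ch02/src/main/ruby/max_temperature_map.rb \
  -combiner ch02/src/main/ruby/max_temperature_reduce.rb \
  -reducer ch02/src/main/ruby/max_temperature_reduce.rb \
  -file ch02/src/main/ruby/max_temperature_map.rb \
  -file ch02/src/main/ruby/max_temperature_reduce.rb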
Hadoop Pipes
Hadoop Pipes is the name of the C++ interface to Hadoop MapReduce. Unlike Streaming,
which uses standard input and output to communicate with the map and reduce
code, Pipes uses sockets as the channel over which the tasktracker communicates with
the process running the C++ map or reduce function. JNI is not used.
Avro Data Types and Schemas
Avro defines a small number of primitive data types, which can be used to build
application-specific data structures by writing schemas.
{
  "type": "array",
  "items": "long"
}
{
  "type": "map",
  "values": "string"
}
{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings.",
  "fields": [
    {"name": "left", "type": "string"},
    {"name": "right", "type": "string"}
  ]
}
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(getClass().getResourceAsStream("StringPair.avsc"));
GenericRecord datum = new GenericData.Record(schema);
datum.put("left", "L");
datum.put("right", "R");
// serialize the record to a byte stream
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(datum, encoder);
encoder.flush();
out.close();
// read it back with the same (writer's) schema
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord result = reader.read(null, decoder);
Using avro-maven-plugin
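A minimal sketch of the plugin configuration that generates the StringPair class from StringPair.avsc; the schema goal and plugin coordinates are the plugin's own, while the directory layout here is an assumption for a standard Maven project:

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>${avro.version}</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <!-- assumed layout: .avsc files under src/main/resources,
             generated sources under target/generated-sources/avro -->
        <sourceDirectory>${project.basedir}/src/main/resources</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>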
// build an instance of the Avro-generated StringPair class
// (setters assume a recent Avro code generator; older generated classes expose public fields)
StringPair datum = new StringPair();
datum.setLeft("L");
datum.setRight("R");
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<StringPair> writer =
    new SpecificDatumWriter<StringPair>(StringPair.class);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(datum, encoder);
encoder.flush();
DatumReader<StringPair> reader =
    new SpecificDatumReader<StringPair>(StringPair.class);
Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
StringPair result = reader.read(null, decoder);
Avro Datafiles
Avro's object container file format is for storing sequences of Avro objects. It is very
similar in design to Hadoop's sequence files, but Avro datafiles are designed to be portable across languages.
File file = new File("data.avro");
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter =
    new DataFileWriter<GenericRecord>(writer);
dataFileWriter.create(schema, file);
dataFileWriter.append(datum);
dataFileWriter.close();

DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DataFileReader<GenericRecord> dataFileReader =
    new DataFileReader<GenericRecord>(file, reader);
GenericRecord record = null;
while (dataFileReader.hasNext()) {
  record = dataFileReader.next(record);
  // process record
}
Schema Resolution
{"name": "description", "type": "string", "default": "}
Another common use of a different reader’s schema is to drop fields in a record, an
operation called projection. This is useful when you have records with a large number
of fields and you want to read only some of them.
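For example, a reader's schema for StringPair that projects away the left field and keeps only right (following the record schema above):

{
  "type": "record",
  "name": "StringPair",
  "doc": "The right field of a pair of strings.",
  "fields": [
    {"name": "right", "type": "string"}
  ]
}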
Another useful technique for evolving Avro schemas is the use of name aliases. Aliases
allow you to use different names in the schema used to read the Avro data than in the
schema originally used to write the data.
{"name": "first", "type": "string", "aliases": ["left"]},
Speculative Execution
The MapReduce model is to break jobs into tasks and run the tasks in parallel to make
the overall job execution time smaller than it would be if the tasks ran sequentially. This makes a job sensitive to slow-running tasks, so Hadoop tries to detect when a task is running slower than expected and launches another, equivalent task as a backup; this is termed speculative execution. The output of whichever copy finishes first is used, and the remaining duplicate is killed.
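Speculative execution is enabled by default and can be turned off per job; a sketch using the old-API JobConf setters (MyJob is a placeholder driver class):

JobConf conf = new JobConf(MyJob.class);   // MyJob is a placeholder
conf.setMapSpeculativeExecution(false);    // mapred.map.tasks.speculative.execution
conf.setReduceSpeculativeExecution(false); // mapred.reduce.tasks.speculative.execution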
Parallel Copying with distcp
distcp is a tool for copying large amounts of data to and from Hadoop filesystems in parallel.
hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar
By default, distcp will skip files that already exist in the destination, but they can be
overwritten by supplying the -overwrite option. You can also update only the files that
have changed using the -update option.
hadoop distcp -update hdfs://namenode1/foo hdfs://namenode2/bar/foo
To copy between clusters running different versions of HDFS, use the webhdfs (or hftp) filesystem, which goes over HTTP via the namenode's web port, set by the dfs.http.address property (default 50070).
hadoop distcp webhdfs://namenode1:50070/foo webhdfs://namenode2:50070/bar
Hadoop balancer tool
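The balancer runs as a background daemon that moves blocks from overutilized datanodes to underutilized ones until each datanode is within a threshold (10% by default) of the cluster's average utilization. It is started with:

% start-balancer.sh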
Hadoop Archives: HAR files
Hadoop Archives, or HAR files, are a file archiving facility that packs files into HDFS
blocks more efficiently, thereby reducing namenode memory usage while still allowing
transparent access to files.
hadoop archive -archiveName files.har /my/files /my
hadoop fs -ls /my/files.har
hadoop fs -lsr har:///my/files.har
hadoop fs -lsr har:///my/files.har/my/files/dir
% hadoop fs -lsr har://hdfs-localhost:8020/my/files.har/my/files/dir
To delete a HAR file, you need to use the recursive form of delete because from the
underlying filesystem’s point of view, the HAR file is a directory:
% hadoop fs -rmr /my/files.har
HDFS is a filesystem designed for storing very large files with streaming data access
patterns, running on clusters of commodity hardware.
Files in HDFS may be written to by a single writer. Writes are always made at the
end of the file. There is no support for multiple writers or for modifications at
arbitrary offsets in the file.
There are tools to perform filesystem maintenance, such as df and fsck, that operate on
the filesystem block level.
hadoop fsck / -files -blocks
Namenodes and Datanodes
The namenode manages the
filesystem namespace. It maintains the filesystem tree and the metadata for all the files
and directories in the tree. This information is stored persistently on the local disk in
the form of two files: the namespace image and the edit log. The namenode also knows
the datanodes on which all the blocks for a given file are located; however, it does
not store block locations persistently, because this information is reconstructed from
datanodes when the system starts.
HDFS Federation
HDFS Federation, introduced in the 2.x release series, allows a cluster to scale by adding
namenodes, each of which manages a portion of the filesystem namespace.
HDFS High-Availability
There is a pair of namenodes in an active-standby
configuration. In the event of the failure of the active namenode, the standby
takes over its duties to continue servicing client requests without a significant interruption.
• The namenodes must use highly available shared storage to share the edit log.
• Datanodes must send block reports to both namenodes because the block mappings are stored in a namenode's memory, and not on disk.
• Clients must be configured to handle namenode failover, using a mechanism that is transparent to users.
Failover and fencing
Failover controllers are pluggable, but the first
implementation uses ZooKeeper to ensure that only one namenode is active.
The fs.default.name property, set to hdfs://localhost/, selects HDFS as the default filesystem; the default HDFS (namenode RPC) port is 8020.
The dfs.replication property sets the number of replicas for each HDFS block (the default is 3; in pseudo-distributed mode it is usually set to 1, since there is only one datanode).
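A sketch of how these two properties are typically set for pseudo-distributed mode (one property each in core-site.xml and hdfs-site.xml; a single replica because there is only one datanode):

<!-- core-site.xml -->
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost/</value>
</property>

<!-- hdfs-site.xml -->
<property>
  <name>dfs.replication</name>
  <value>1</value>
</property>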
Basic Filesystem Operations
hadoop fs -copyFromLocal input/docs/quangle.txt hdfs://localhost/user/tom/quangle.txt
hadoop fs -copyToLocal quangle.txt quangle.copy.txt
md5 input/docs/quangle.txt quangle.copy.txt
directories are treated as metadata and stored by the namenode, not the datanodes.
Hadoop Filesystems
Filesystem | URI scheme | Java implementation (all under org.apache.hadoop) | Description
Local | file | fs.LocalFileSystem | A filesystem for a locally connected disk with client-side checksums. Use RawLocalFileSystem for a local filesystem with no checksums.
HDFS | hdfs | hdfs.DistributedFileSystem | Hadoop's distributed filesystem.
HFTP | hftp | hdfs.HftpFileSystem | A filesystem providing read-only access to HDFS over HTTP. Often used with distcp to copy data between HDFS clusters running different versions.
HSFTP | hsftp | hdfs.HsftpFileSystem | A filesystem providing read-only access to HDFS over HTTPS.
WebHDFS | webhdfs | hdfs.web.WebHdfsFileSystem | A filesystem providing secure read-write access to HDFS over HTTP. WebHDFS is intended as a replacement for HFTP and HSFTP.
HAR | har | fs.HarFileSystem | A filesystem layered on another filesystem for archiving files. Hadoop Archives are typically used for archiving files in HDFS to reduce the namenode's memory usage.
FTP | ftp | fs.ftp.FTPFileSystem | A filesystem backed by an FTP server.
S3 (native) | s3n | fs.s3native.NativeS3FileSystem | A filesystem backed by Amazon S3.
S3 (block-based) | s3 | fs.s3.S3FileSystem | A filesystem backed by Amazon S3, which stores files in blocks (much like HDFS) to overcome S3's 5 GB file size limit.
Distributed RAID | hdfs | hdfs.DistributedRaidFileSystem | A "RAID" version of HDFS designed for archival storage. For each file in HDFS, a (smaller) parity file is created, which allows the HDFS replication to be reduced from three to two, reducing disk usage by 25% to 30% while keeping the probability of data loss the same. Distributed RAID requires that you run a RaidNode daemon on the cluster.
View | viewfs | viewfs.ViewFileSystem | A client-side mount table for other Hadoop filesystems. Commonly used to create mount points for federated namenodes.
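All of these filesystems are reached through the same FileSystem abstraction, so code written against it works with any URI scheme in the table. A minimal sketch of cat-ing a file through the API (essentially the book's FileSystemCat example):

import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileSystemCat {
  public static void main(String[] args) throws Exception {
    String uri = args[0];                        // e.g. hdfs://localhost/user/tom/quangle.txt
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    InputStream in = null;
    try {
      in = fs.open(new Path(uri));               // works for file:, hdfs:, webhdfs:, har:, ...
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}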
The hadoop fs command has a -text option to display sequence files in textual form.
It looks at a file’s magic number so that it can attempt to detect the type of the file and
appropriately convert it to text.
hadoop fs -text numbers.seq | head
Sorting and merging SequenceFiles
The most powerful way to sort (and also merge) one or more sequence files is to use MapReduce.
The SequenceFile format
A sequence file consists of a header followed by one or more records
The first three bytes of a sequence file are the bytes SEQ, which acts as a magic number,
followed by a single byte representing the version number. The header contains other
fields, including the names of the key and value classes, compression details, user-defined
metadata, and the sync marker. Recall that the sync marker is used to allow
a reader to synchronize to a record boundary from any position in the file. Each file has
a randomly generated sync marker, whose value is stored in the header. Sync markers
appear between records in the sequence file. They are designed to incur less than a 1%
storage overhead, so they don’t necessarily appear between every pair of records.
Block compression compresses multiple records at once; it is therefore more compact
than and should generally be preferred over record compression because it has the
opportunity to take advantage of similarities between records.
Records
are added to a block until it reaches a minimum size in bytes, defined by the
io.seqfile.compress.blocksize property; the default is 1 million bytes. A sync marker
is written before the start of every block. The format of a block is a field indicating the
number of records in the block, followed by four compressed fields: the key lengths,
the keys, the value lengths, and the values.
The temporary outputs of maps are stored using SequenceFile.
The SequenceFile class provides Writer, Reader, and Sorter classes for writing, reading, and sorting, respectively.
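A minimal sketch of writing a sequence file with SequenceFile.Writer, following the book's IntWritable/Text pattern; uri (the output path) and DATA (an array of sample strings) are assumed:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null;
try {
  writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
  for (int i = 0; i < 100; i++) {
    key.set(100 - i);                  // keys descend, values cycle through DATA
    value.set(DATA[i % DATA.length]);
    writer.append(key, value);
  }
} finally {
  IOUtils.closeStream(writer);
}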
MapFile
A MapFile is a sorted SequenceFile with an index to permit lookups by key. MapFile can
be thought of as a persistent form of java.util.Map.
// keys (IntWritable) must be appended in increasing order; values are Text
MapFile.Writer writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value.getClass());
for (int i = 0; i < 1024; i++) {
  key.set(i + 1);
  value.set(DATA[i % DATA.length]);
  writer.append(key, value);
}
writer.close();
If we look at the MapFile, we see it’s actually a directory containing two files called
data and index.
The data file contains all of the entries.
The index file contains a fraction of the keys and contains a mapping from the key to
that key’s offset in the data file.
by default only every 128th key is included in the index,
although you can change this value either by setting the io.map.index.interval
property or by calling the setIndexInterval() method on the MapFile.Writer instance.
A reason to increase the index interval would be to decrease the amount of memory
that the MapFile needs to store the index. Conversely, you might decrease the interval
to improve the time for random selection (since fewer records need to be skipped on
average) at the expense of memory usage.
reader.get(new IntWritable(496), value);
For this operation, the MapFile.Reader reads the index file into memory (this is cached
so that subsequent random access calls will use the same in-memory index). The reader
then performs a binary search on the in-memory index to find the key in the index that
is less than or equal to the search key.
Next, the reader seeks to this offset
in the data file and reads entries until the key is greater than or equal to the search key
(496). In this case, a match is found and the value is read from the data file.
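A sketch of the corresponding lookup, assuming the MapFile written above at uri:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
MapFile.Reader reader = new MapFile.Reader(fs, uri, conf);
Text value = new Text();
Writable entry = reader.get(new IntWritable(496), value); // returns null if the key is absent
if (entry != null) {
  System.out.println(value);   // the value stored for key 496
}
reader.close();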
MapFile variants
• SetFile is a specialization of MapFile for storing a set of Writable keys. The keys must be added in sorted order.
• ArrayFile is a MapFile where the key is an integer representing the index of the element in the array and the value is a Writable value.
• BloomMapFile is a MapFile that offers a fast version of the get() method, especially for sparsely populated files. The implementation uses a dynamic bloom filter for testing whether a given key is in the map. The test is very fast because it is in-memory, but it has a nonzero probability of false positives, in which case the regular get() method is called.