CCAH Exam Notes

Disclaimer: This is a formatted version of notes at Link

Hadoop Distributed Filesystem (HDFS)

Goals for HDFS

HDFS Design

Daemons that make up HDFS

NameNode

Secondary NameNode

Datanode

Process of reading a file

Process of writing a file

NameNode High Availability

NameNode Federation

Clients

Commands

MapReduce

MapReduce v1

High Level overview of a MapReduce v1

MapReduce v2 (YARN)

Planning a cluster

Distribution and software version

Hardware

Master nodes (JobTracker, NameNode, SecondaryNameNode)

NameNode considerations

Worker/datanodes

Cluster sizing

OS selection, prep, layout

Hostnames

Users, Groups, Privs

Tuning and conf changes

Network usage and design

Traditional Tree Topology

Spine Fabric

Install and Config

Install

Config

core-site.xml important properties

hdfs-site.xml important properties

mapred-site.xml important properties

Logging

Monitoring cluster

Quick HTTP Ports Reference

Quotas and Job Schedulers

Quota

Schedulers

FIFO (First In First Out) Scheduler

Fair Scheduler

Capacity Scheduler (CS)

Monitoring and Managing

Ecosystem

Apache Hive

Pig

Zookeeper

Sqoop

Other Important Projects

Hadoop Distributed Filesystem (HDFS)

Built to support high throughput, streaming reads and writes of extremely large files.

NAS and SAN systems offer centralized, low-latency access to either a block device or a filesystem on the order of terabytes in size. They do not scale to meet the need of thousands of machines pulling hundreds of gigabytes of content at the same time.

Goals for HDFS

HDFS Design

Daemons that make up HDFS

There are three daemons that make up a standard HDFS cluster.

NameNode

# Check whether the NameNode is in safe mode

hadoop dfsadmin -safemode get

Secondary NameNode

Datanode

Process of reading a file

Reading a file in HDFS called /foo/bar.txt (a minimal client sketch follows the steps below).

  1. The client uses a Hadoop client program to make the request.
  2. The client program reads the cluster config file on the local machine, which tells it where the NameNode is located. This has to be configured ahead of time.
  3. The client contacts the NameNode and requests the file it would like to read.
  4. Client identity is checked, either by username or by a strong authentication mechanism such as Kerberos.
  5. Once the client is validated, the request is checked against the owner and permissions of the file.
  6. If the file exists and the user has access to it, the NameNode responds with the first block id and provides a list of datanodes on which a copy of the block can be found, sorted by their distance to the client (reader).
  7. The client now contacts the most appropriate datanode directly and reads the block data it needs. This process repeats until all blocks in the file have been read or the client closes the file stream.
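
A minimal read sketch in Java using the org.apache.hadoop.fs.FileSystem API (the class name is illustrative, and it assumes a core-site.xml on the classpath that points at the NameNode). The client code never touches blocks or datanodes directly; the library performs the NameNode lookup and datanode reads described above.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadFoo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();          // loads core-site.xml (fs.default.name)
        FileSystem fs = FileSystem.get(conf);              // asks the NameNode for metadata
        try (FSDataInputStream in = fs.open(new Path("/foo/bar.txt"))) {
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            String line;
            while ((line = reader.readLine()) != null) {   // block-by-block datanode reads happen under the hood
                System.out.println(line);
            }
        }
    }
}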

Process of writing a file

Writing a new file to HDFS called /foo/babar.txt (a minimal client sketch follows the steps below).

  1. The client uses a Hadoop client program to make the request.
  2. The client program reads the cluster config file on the local machine, which tells it where the NameNode is located. This has to be configured ahead of time.
  3. A request is sent to the NameNode to create the file metadata.
  4. Client identity is checked, either by username or by an authentication mechanism such as Kerberos.
  5. If the user has the necessary permissions, the metadata entry for the new file is made. However, it initially has no associated blocks.
  6. The NameNode responds to the client, indicating the open request was successful and that it may now begin writing data.
  7. The client starts breaking the file up into pieces (packets, not TCP packets), queues them in memory and starts a data stream from this queue.
  8. The client contacts the NameNode requesting a set of datanodes to which replicas of the next block should be written.
  9. The NameNode responds, and the client's data packets are then streamed to the first datanode, which writes the data to disk and forwards it to the next datanode, which writes to its disk, and so on. This is called a replication pipeline. Each datanode in the replication pipeline acknowledges each packet as it is successfully written.
  10. The client application maintains a list of packets for which acknowledgments have not yet been received; when it receives a response, it knows the data has been written to all nodes in the pipeline. This process of writing packets to the pipeline continues until the block size is reached, at which point the client goes back to the NameNode for the next set of datanodes to write to.
  11. Eventually, the client indicates it is finished sending data by closing the stream, which flushes any remaining packets out to disk and updates the NameNode to indicate the file is now complete.
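
A minimal write sketch in Java using the same FileSystem API (again illustrative, assuming a configured client). fs.create() corresponds to the "create file metadata" step at the NameNode; the output stream handles the packet queue and replication pipeline; close() flushes the last packets and finalizes the file with the NameNode.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteBabar {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        try (FSDataOutputStream out = fs.create(new Path("/foo/babar.txt"))) {
            out.write("hello hdfs\n".getBytes("UTF-8"));   // queued as packets and streamed to the pipeline
        }   // close() flushes remaining packets and tells the NameNode the file is complete
    }
}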

NameNode High Availability

NameNode Federation

Clients

Commands

# Display basic usage information

hadoop fs  

# List files in a dir. Uses the fs.default.name value in the core-site.xml file if the full URL syntax is not used.

hadoop fs -ls /user/dude

or

hadoop fs -ls hdfs://NameNode.blah.com:8020/home/dude

# Upload a file with -put or -copyFromLocal, which copies the file from the local filesystem

hadoop fs -put /etc/resolv.conf /user/dude/

# Download a file from HDFS using -get or -copyToLocal.

hadoop fs -get /user/dude/resolv.conf ./

# Set a replication factor for a file, or for a directory of files with the -R flag

hadoop fs -setrep 5 -R /user/dude/rep5/

# Run fsck on the files we set the replication factor on and check that it looks correct

hadoop fsck /user/dude/rep5 -files -blocks -locations

MapReduce

MapReduce v1

High Level overview of a MapReduce v1

  1. User creates a job and submits it to the jobtracker process.
  2. A map task is created that runs the user-supplied map function on each record. The map function takes a key-value pair as input and produces zero or more intermediate key-value pairs. Each intermediate key is assigned to a partition (one partition per reducer), for example:

partition 1, dog => poodle  

partition 2, cat => bengal

partition 3, bird => hawk

partition 1, dog => pug

partition 3, bird => duck  

  3. The next step is the shuffle and sort. This is performed by the reducers (reduce tasks). Each reducer is assigned one of the partitions on which it should work. This is a flurry of network copies between each reducer in the cluster so it can get the partition (intermediate key-value) data it was assigned to work on (see the partitioner sketch after this list).
  4. After the partition data has been copied, the reducer can start performing a merge sort of the data. A merge sort takes a number of sorted items and merges them together to form a fully sorted list. Each reducer produces a separate output file, usually in HDFS. Each reducer output file is usually named part-nnnnn, where nnnnn is the number of the reduce task within the job. The output format of the file is specified by the author of the MapReduce job.
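
Which partition an intermediate key lands in is decided by the job's partitioner; the stock behaviour is essentially a hash of the key modulo the number of reduce tasks. A sketch of that idea is below (an illustration, not the shipped Hadoop source). Because the partition depends only on the key, every record for "dog" in the example above ends up at the same reducer.

public class SimpleHashPartitioner<K, V> {
    // Returns the partition (0 .. numReduceTasks-1) for an intermediate key-value pair.
    public int getPartition(K key, V value, int numReduceTasks) {
        // mask the sign bit so the result is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}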

MapReduce v2 (YARN)

Planning a cluster

Distribution and software version

Hardware

Master nodes (JobTracker, NameNode, SecondaryNameNode)

NameNode considerations

Worker/datanodes

Cluster sizing

Sample growth plan

Item                          Value      Notes
----------------------------  ---------  ----------------------------------------------------
Average daily ingest rate     2 TB
Replication factor            3
Daily raw consumption         6 TB       Ingest x replication
Node raw storage              24 TB      12 x 2 TB SATA II HDD
MapReduce temp space reserve  25%        For intermediate MapReduce data
Node-usable raw storage       18 TB      Node raw storage - MapReduce reserve
1 year (flat growth)          122 nodes  Ingest x replication x 365 / node-usable raw storage
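
The arithmetic behind the table, as a throwaway Java sketch (the numbers are the assumptions from the rows above; the class name is illustrative):

public class ClusterSizing {
    public static void main(String[] args) {
        double dailyIngestTB = 2.0;              // average daily ingest rate
        int    replication   = 3;
        double nodeRawTB     = 12 * 2.0;         // 12 x 2 TB SATA II HDD = 24 TB
        double mapredReserve = 0.25;             // 25% reserved for intermediate MapReduce data

        double dailyRawTB   = dailyIngestTB * replication;       // 6 TB/day
        double nodeUsableTB = nodeRawTB * (1 - mapredReserve);   // 18 TB
        long   nodesYearOne = (long) Math.ceil(dailyRawTB * 365 / nodeUsableTB);
        System.out.println(nodesYearOne + " nodes after 1 year of flat growth");   // 122
    }
}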

OS selection, prep, layout

Hostnames

Users, Groups, Privs

Tuning and conf changes

# /etc/security/limits.conf - raise the open file limit for the Hadoop service accounts

hdfs - nofile 32768

mapred - nofile 32768

hbase - nofile 32768

# sysctl settings (e.g. /etc/sysctl.conf) - kernel VM tuning for Hadoop workers

vm.swappiness=0

vm.overcommit_memory=1

Network usage and design

Traditional Tree Topology

Spine Fabric

Install and Config

Install

Config

core-site.xml important properties

hdfs-site.xml important properties

mapred-site.xml important properties

Logging

Monitoring cluster

- URL of the NameNode status page http://NameNode.blah.com:50070/dfshealth.jsp

  - Html page looks like:

  NameNode localhost.localdomain:8020 (active)

  Started: Sat Sep 13 4:4:44 EST 2013

  Version: 2.0.0-cdh4.3.0, 234929d9s9839s98sd7f9d7s9g

  Compiled: Mon May 3 3:3:33 by blah from Unknown

  Upgrades: There are no upgrades in progress

  Cluster ID: cluster11

  Blockpool ID: BP-23434343333-127.0.0.1-34322343ll3434

  [Browse the filesystem]

  [NameNode Logs]

  --------------------------------------------------------------

  Cluster Summary

  Security is OFF

  124 files and directories, 91 blocks = 215 total.

  Heap memory used 41.31 MB is 50% of Committed Heap Memory 81.06 MB. Max Heap Memory is 171.94 MB.

  Non Heap Memory used 47.55 MB is 70% of Committed Non Heap Memory 67.05 MB. Max Non Heap Memory is 130MB

  Configured Capacity : 11.07 GB

  DFS Used            : 58.96 MB

  Non DFS Used        : 2.95 GB

  DFS Remaining       : 8.06 GB

  DFS Used%           : 0.52%

  DFS Remaining%      : 72.82%

  Block Pool Used     : 58.96 MB

  Block Pool Used%    : 0.52%

  DataNodes Usages    : Min %    Median %    Max %    stdev %
                        0.52%    0.52%       0.52%    0.00%

  [Live Nodes]                      : 1 (Decommissioned: 0)

  [Dead Nodes]                      : 0 (Decommissioned: 0)

  [Decommissioning Nodes]           : 0

  Number of under-replicated blocks : 91

  NameNode Journal Status:

  Current transaction ID: 894

  ---------------------------------   --------------------------------------------

  Journal Manager                     State

  --------------------------------    ---------------------------------------------

  FileJournalManager(root=/dfs/nn)       EditLogFileOutputStream(/dfs/nn/current/edits_inprogress_00000000844)

  NameNode Storage:

  -------------------  -----            -----------

  Storage Directory    Type             State

  ------------------   ----             -------------

  /dfs/nn              IMAGE_AND_EDITS  Active

- URL of the aggregate cluster view that shows all NameNodes that make up the cluster.

  This is the federated NameNode cluster view.

  http://NameNode.blah.com:50070/dfsclusterhealth.jsp

  - Html page looks like:

    Cluster 'cluster11'

   

    Cluster Summary

    Total files and directories 124

    Configured Capacity           : 11.07GB

    DFS Used                      : 58.96 MB

    Non DFS Used                  : 2.95GB

    DFS Remaining                 : 8.06GB

    DFS Used%                     : 0.52%

    DFS Remaining%                : 72.82%

   -----------------------------------------

   NameNodes

   Number of NameNodes : 1

   NameNode               Blockpool  Files and    Blocks  Missing  Live Datanode     Dead Datanode
                          Used       Directories          Blocks   (Decommissioned)  (Decommissioned)
   ---------------------  ---------  -----------  ------  -------  ----------------  ----------------
   localhost.localdomain  58.96 MB   124          91      0        1 (0)             0 (0)

- URL of the jobtracker view http://jobtracker.blah.com:50030/jobtracker.jsp

  - Html page looks like:

  localhost Hadoop Map/Reduce Administration

  State: RUNNING

  Started: Sat Sep 14 12:22:33 PDT 2013

  Version: 2.0.0-mr1-cdh4.3.0, Unknown

  Compiled: Mon May 14 12:22:33 by blah from Unknown

  Identifier: 202023003020

  Cluster Summary (Heap Size is 81.06MB/171.94 MB)

 

  Running  Running  Total        Nodes  Occupied  Occupied  Reserved  Map       Reduce    Avg.        Blacklisted  Excluded
  Map      Reduce   Submissions         Map       Reduce    Reduce    Task      Task      Tasks/Node  Nodes        Nodes
  Tasks    Tasks                        Slots     Slots     Slots     Capacity  Capacity
  -------  -------  -----------  -----  --------  --------  --------  --------  --------  ----------  -----------  --------
  0        0        0            1      0         0         0         0         0         6.00        0            0

Scheduling Information

Queue Name   State    Scheduling Information

-----------  ------   ----------------------

default      running    N/A

Filter (Jobid, Priority, User, Name) ______________

Running Jobs

------------

None

Retired Jobs

-------------

None

Local Logs

[Log] directory, [Job Tracker History]

- sudo -u hdfs hadoop dfsadmin -report

Quick HTTP Ports Reference

HDFS                    Port     Config Parameter
----------------------  -------  --------------------------------
NameNode                50070    dfs.http.address
Datanodes               50075    dfs.datanode.http.address
Secondary NameNode      50090    dfs.secondary.http.address
Backup/Checkpoint node  50105    dfs.backup.http.address

MapReduce               Port     Config Parameter
----------------------  -------  --------------------------------
JobTracker              50030    mapred.job.tracker.http.address
TaskTrackers            50060    mapred.task.tracker.http.address

Quotas and Job Schedulers

Quota

# Set a space quota (limit on disk space consumed) on a directory

hadoop dfsadmin -setSpaceQuota size path

# Show the quotas and current usage for a directory

hadoop fs -count -q /user/blah

# Set a name quota (limit on the number of file and directory names) on a directory

hadoop dfsadmin -setQuota number path

Schedulers

FIFO (First In First Out) Scheduler

Fair Scheduler

Capacity Scheduler (CS)

Monitoring and Managing

Adding a datanode to HDFS:

  1. Add the host's IP address to the dfs.hosts file if using the host include functionality.
  2. Run hdfs dfsadmin -refreshNodes as the HDFS superuser.
  3. Update any rack awareness files.
  4. Start the datanode process on the datanode.
  5. Check hadoop dfsadmin -report or the web UI for the new node.
  6. Run the balancer to redistribute the blocks.

Adding a tasktracker (MapReduce worker):

  1. Follow the procedure for adding a datanode to HDFS.
  2. Run the balancer utility to distribute existing block data to the new datanode.
  3. Start the tasktracker process.
  4. Confirm that the jobtracker can communicate with the new tasktracker by checking the number of available tasktrackers in its web user interface.

Decommissioning a datanode:

  1. Add the IP of the node to the dfs.hosts.exclude file.
  2. Run hadoop dfsadmin -refreshNodes as the HDFS superuser.
  3. Monitor the NameNode web UI to make sure it starts the decommission process.
  4. Wait (this can take a very long time), watching the NameNode web UI until it says "decommissioned" next to the node's name.
  5. Stop the datanode process.
  6. Remove it from the HDFS include and exclude files as well as any rack topology database.
  7. Run hadoop dfsadmin -refreshNodes so the NameNode sees the removal.

# Check the overall health of HDFS

sudo -u hdfs hadoop fsck /

To decide whether a disk has failed, the datanode checks each of its configured storage directories (dfs.data.dir) to confirm that:

  1. The specified path is a directory.
  2. The directory exists.
  3. The directory is readable.
  4. The directory is writable.

Dealing with a failed disk on a worker node:

  1. Stop any Hadoop-related processes (optionally following the decommissioning process for the datanode).
  2. Replace any failed disks.
  3. Follow the process for adding the node back into the cluster.
  4. Run the Hadoop fsck utility to validate the health of HDFS. Over-replicated blocks are normal immediately after a node is reintroduced to the cluster; they are automatically corrected (excess replicas are deleted) over time.

Ecosystem

Apache Hive

export HIVE_INSTALL=/home/blah/hive-x.y.z-dev

export PATH=$PATH:$HIVE_INSTALL/bin

hive

hive>

Pig

tar xzf pig-x.y.z.tar.gz

export PIG_INSTALL=/home/tom/pig-x.y.z

export PATH=$PATH:$PIG_INSTALL/bin

pig -x mapreduce

Zookeeper

tar xzf zookeeper-x.y.z.tar.gz

export ZOOKEEPER_INSTALL=/home/blah/zookeeper-x.y.z

export PATH=$PATH:$ZOOKEEPER_INSTALL/bin

# Minimal conf/zoo.cfg for a standalone ZooKeeper server
tickTime=2000
dataDir=/Users/tom/zookeeper
clientPort=2181

zkServer.sh start

Sqoop

tar xvzf sqoop-x.y.z.tar.gz

# Set the SQOOP_HOME env var

export SQOOP_HOME=/home/blah/sqoop

$SQOOP_HOME/bin/sqoop

Then run it. The standard location is /usr/bin/.

Other Important Projects