
Distributed and Cloud Computing
K. Hwang, G. Fox and J. Dongarra
Chapter 6: Cloud Programming and Software Environments (Part 1)
Adapted from Kai Hwang, University of Southern California, with additions from Matei Zaharia, EECS, UC Berkeley
November 25, 2012


Copyright © 2012, Elsevier Inc. All rights reserved.


FEATURES OF CLOUD AND GRID PLATFORMS

  • Cloud Capabilities and Platform Features
  • Traditional Features Common to Grids and Clouds
  • Data Features and Databases
  • Programming and Runtime Support


Cloud Capabilities and Platform Features


Traditional Features Common to Grids and Clouds

  • Workflow
  • Data Transport
  • Security, Privacy, and Availability


Security, Privacy, and Availability

  • Use virtual clustering to achieve dynamic resource provisioning with minimum overhead cost.
  • Use stable and persistent data storage with fast queries for information retrieval.
  • Use special APIs for authenticating users and sending e-mail using commercial accounts.
  • Cloud resources are accessed with security protocols such as HTTPS and SSL.
  • Fine-grained access control is desired to protect data integrity and deter intruders or hackers.
  • Shared data sets are protected from malicious alteration, deletion, or copyright violations.
  • Features are included for availability enhancement and disaster recovery with live migration of VMs.
  • Use a reputation system to protect data centers. This system only authorizes trusted clients and stops pirates.


Data Features and Databases

  • Program Library
  • Blobs and Drives
  • DPFS (Data Parallel File System)
  • SQL and Relational Databases
  • Tables and NoSQL Nonrelational Databases
  • Queuing Services


Programming and Runtime Support

  • Worker and Web Roles
  • MapReduce
  • Cloud Programming Models
  • SaaS


Parallel Computing and Programming Environments

  • Partitioning
    • Computation partitioning
    • Data partitioning
  • Mapping
  • Synchronization
  • Communication
  • Scheduling


Parallel Computing and Programming Environments

  • MapReduce
  • Hadoop
  • Amazon Web Services


What is MapReduce?

  • Simple data-parallel programming model
  • For large-scale data processing
    • Exploits large set of commodity computers
    • Executes processing in a distributed manner
    • Offers high availability
  • Pioneered by Google
    • Processes 20 petabytes of data per day
  • Popularized by open-source Hadoop project
    • Used at Yahoo!, Facebook, Amazon, …


What is MapReduce used for?

  • At Google:
    • Index construction for Google Search
    • Article clustering for Google News
    • Statistical machine translation
  • At Yahoo!:
    • “Web map” powering Yahoo! Search
    • Spam detection for Yahoo! Mail
  • At Facebook:
    • Data mining
    • Ad optimization
    • Spam detection


Motivation: Large Scale Data Processing

  • Many tasks consist of processing lots of data to produce lots of other data
  • Want to use hundreds or thousands of CPUs ... but this needs to be easy!
  • MapReduce provides
    • User-defined functions
    • Automatic parallelization and distribution
    • Fault-tolerance
    • I/O scheduling
    • Status and monitoring


What is MapReduce used for?

  • In research:
    • Astronomical image analysis (Washington)
    • Bioinformatics (Maryland)
    • Analyzing Wikipedia conflicts (PARC)
    • Natural language processing (CMU)
    • Particle physics (Nebraska)
    • Ocean climate simulation (Washington)
    • <Your application here>


Distributed Grep

Figure: "Very big data" is split into pieces; grep runs on each split in parallel, producing per-split matches, which cat concatenates into "All matches".

grep is a command-line utility for searching plain-text data sets for lines matching a regular expression.

cat is a standard Unix utility that concatenates and lists files


Distributed Word Count

Figure: "Very big data" is split into pieces; count runs on each split in parallel, and the partial counts are merged into a single merged count.


Map+Reduce

  • Map:
    • Accepts an input key/value pair
    • Emits intermediate key/value pairs

  • Reduce:
    • Accepts an intermediate key and the list of values for that key
    • Emits output key/value pairs

Figure: "Very big data" flows through the MAP stage, a partitioning function, and the REDUCE stage to produce the result.


Architecture overview

Figure: the user submits a job to the job tracker on the master node; task trackers on slave nodes 1 through N manage the worker processes that run the tasks.


GFS: underlying storage system

  • Goal
    • global view
    • make huge files available in the face of node failures
  • Master Node (meta server)
    • Centralized; indexes all chunks held on the data servers
  • Chunk server (data server)
    • File is split into contiguous chunks, typically 16-64MB.
    • Each chunk replicated (usually 2x or 3x).
      • Try to keep replicas in different racks.


GFS architecture

Figure: a client contacts the GFS master for chunk locations, then reads and writes chunk data directly from chunkservers 1 through N; each chunk (C0, C1, C2, C3, C5) is replicated on several chunkservers.


Functions in the Model

  • Map
    • Process a key/value pair to generate intermediate key/value pairs
  • Reduce
    • Merge all intermediate values associated with the same key
  • Partition (a minimal sketch follows below)
    • By default: hash(key) mod R
    • Gives well-balanced partitions in practice
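A minimal sketch of the default partitioning rule in Python, assuming the built-in hash() stands in for whatever hash function a real MapReduce implementation uses:

# Route an intermediate key to one of R reduce partitions.
def partition(key, R):
    return hash(key) % R

# Every occurrence of the same key lands in the same partition,
# so a single reducer sees all values for that key.
R = 4
for key in ["cloud", "grid", "cloud", "map"]:
    print(key, "->", partition(key, R))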


Programming Concept

  • Map
    • Perform a function on individual values in a data set to create a new list of values
    • Example: square x = x * x; map square [1,2,3,4,5] returns [1,4,9,16,25]
  • Reduce
    • Combine values in a data set to create a new value
    • Example: sum adds each element to a running total; reduce sum [1,2,3,4,5] returns 15 (the sum of the elements). See the Python sketch below.
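A minimal sketch of these two examples in Python, using the built-in map() and functools.reduce(); this is ordinary in-process code, not a distributed MapReduce run:

from functools import reduce

def square(x):
    return x * x

squared = list(map(square, [1, 2, 3, 4, 5]))                 # [1, 4, 9, 16, 25]
total = reduce(lambda acc, x: acc + x, [1, 2, 3, 4, 5], 0)   # 15
print(squared, total)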


Fig. 6.5 Dataflow Implementation of MapReduce


A Simple Example

  • Counting words in a large set of documents

map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));

The map function emits each word w plus an associated count of occurrences (just a "1" is recorded in this pseudo-code).

The reduce function sums together all counts emitted for a particular word
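The pseudo-code translates almost directly into Python. The sketch below is a single-process stand-in, assuming the documents fit in memory; a real MapReduce run would spread the map and reduce calls over many workers.

from collections import defaultdict

def map_fn(doc_name, contents):
    # Emit (word, 1) for every word in the document.
    return [(word, 1) for word in contents.split()]

def reduce_fn(word, counts):
    # Sum all counts emitted for one word.
    return word, sum(counts)

docs = {"d1": "very big data", "d2": "big data big results"}

# Shuffle: group intermediate values by key, as the framework would.
groups = defaultdict(list)
for name, text in docs.items():
    for word, count in map_fn(name, text):
        groups[word].append(count)

print([reduce_fn(w, c) for w, c in groups.items()])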


A Word Counting Example on <Key, Count> Distribution


How Does it work?

  • Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits.
  • Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a hash function: hash(key) mod R.
    • R and the partitioning function are specified by the programmer.


MapReduce : Operation Steps

When the user program calls the MapReduce function, the following sequence of actions occurs :

1) The MapReduce library in the user program first splits the input files into M pieces, typically 16 to 64 megabytes (MB) per piece. It then starts up many copies of the program on a cluster of machines.

2) One of the copies of the program is the master. The rest are workers that are assigned work by the master.


MapReduce : Operation Steps

3) A worker that is assigned a map task:

    • reads the contents of the corresponding input split,
    • parses key/value pairs out of the input data and passes each pair to the user-defined Map function.

The intermediate key/value pairs produced by the Map function are buffered in memory.

4) The buffered pairs are written to local disk, partitioned into R regions by the partitioning function.

The locations of these buffered pairs on the local disk are passed back to the master, which forwards these locations to the reduce workers.


MapReduce : Operation Steps

5) When a reduce worker is notified by the master about these locations, it reads the buffered data from the local disks of the map workers.

When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together.

6) The reduce worker iterates over the sorted intermediate data and for each unique intermediate key, it passes the key and the corresponding set of intermediate values to the user’s Reduce function.

The output of the Reduce function is appended to a final output file.


MapReduce : Operation Steps

7) When all map tasks and reduce tasks have been completed, the master wakes up the user program.

At this point, the MapReduce call in the user program returns to the user code.

After successful completion, the output of the MapReduce execution is available in the R output files.


Logical Data Flow in 5 Processing Steps in MapReduce Process

(Key, Value) pairs are generated by the Map function over multiple available Map workers (VM instances). These pairs are then sorted and grouped based on key ordering. The different key groups are then processed by multiple Reduce workers in parallel. A minimal in-memory sketch of this flow follows.
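A minimal in-memory sketch of these five steps (split, map, partition, sort/group, reduce), assuming a toy word-count job; a real framework runs each step across many machines and writes R output files.

from itertools import groupby
from operator import itemgetter

def run_mapreduce(splits, map_fn, reduce_fn, R=2):
    # Map: turn each input split into intermediate (key, value) pairs.
    intermediate = [pair for split in splits for pair in map_fn(split)]
    # Partition: hash(key) mod R, as the default partitioner would.
    partitions = [[] for _ in range(R)]
    for key, value in intermediate:
        partitions[hash(key) % R].append((key, value))
    # Sort and group each partition, then reduce each key group.
    output = []
    for part in partitions:
        part.sort(key=itemgetter(0))
        for key, group in groupby(part, key=itemgetter(0)):
            output.append(reduce_fn(key, [v for _, v in group]))
    # The collected results stand in for the R output files.
    return output

splits = ["very big data", "big data big results"]
word_map = lambda text: [(w, 1) for w in text.split()]
word_reduce = lambda key, values: (key, sum(values))
print(run_mapreduce(splits, word_map, word_reduce))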


Locality issue

  • Master scheduling policy
    • Asks GFS for locations of replicas of input file blocks
    • Input is split into map tasks of typically 64 MB (== the GFS block size)
    • Map tasks are scheduled so that a GFS replica of the input block is on the same machine or the same rack
  • Effect
    • Thousands of machines read input at local disk speed
    • Without this, rack switches limit read rate


Fault Tolerance

  • Reactive way
    • Worker failure
      • Heartbeat: workers are periodically pinged by the master
        • NO response = failed worker
      • If the processor of a worker fails, the tasks of that worker are reassigned to another worker.

    • Master failure
      • Master writes periodic checkpoints
      • Another master can be started from the last checkpointed state
      • If the master ultimately dies, the job is aborted


Fault Tolerance

  • Proactive way (Redundant Execution)
    • The problem of stragglers (slow workers)
      • Other jobs consuming resources on machine
      • Bad disks with soft errors transfer data very slowly
      • Weird things: processor caches disabled (!!)

    • When the computation is almost done, reschedule the remaining in-progress tasks
    • Whenever either the primary or the backup execution finishes, mark the task as completed


Fault Tolerance

  • Input error: bad records
    • Map/Reduce functions sometimes fail for particular inputs
    • Best solution is to debug & fix, but not always possible
    • On a segmentation fault
      • Send UDP packet to master from signal handler
      • Include sequence number of record being processed
    • Skip bad records
      • If master sees two failures for same record, next worker is told to skip the record


Status monitor


Points that need to be emphasized

  • No reduce can begin until map is complete
  • Master must communicate locations of intermediate files
  • Tasks scheduled based on location of data
  • If map worker fails any time before reduce finishes, task must be completely rerun
  • MapReduce library does most of the hard work for us!


Other Examples

  • Distributed Grep:
    • Map function emits a line if it matches a supplied pattern.
    • Reduce function is an identity function that copies the supplied intermediate data to the output.
  • Count of URL accesses:
    • Map function processes logs of web page requests and outputs <URL, 1>,
    • Reduce function adds together all values for the same URL, emitting <URL, total count> pairs.
  • Reverse Web-Link graph; e.g., all URLs with reference to http://dblab.usc.edu:
    • Map function outputs <tgt, src> for each link to a tgt in a page named src,
    • Reduce concatenates the list of all src URLS associated with a given tgt URL and emits the pair: <tgt, list(src)>.
  • Inverted Index; e.g., all URLs with 585 as a word:
    • Map function parses each document, emitting a sequence of <word, doc_ID>,
    • Reduce accepts all pairs for a given word, sorts the corresponding doc_IDs and emits a <word, list(doc_ID)> pair.
    • The set of all output pairs forms a simple inverted index (a minimal sketch follows this list).
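A minimal in-memory sketch of the inverted-index example, assuming documents small enough to process locally; map emits (word, doc_ID) pairs and reduce collects the sorted doc_IDs for each word.

from collections import defaultdict

def map_fn(doc_id, text):
    # Emit (word, doc_ID) for every word in the document.
    return [(word, doc_id) for word in text.split()]

def reduce_fn(word, doc_ids):
    # Collect the sorted, de-duplicated doc_IDs for this word.
    return word, sorted(set(doc_ids))

docs = {"d1": "cloud grid cloud", "d2": "grid computing"}

groups = defaultdict(list)
for doc_id, text in docs.items():
    for word, did in map_fn(doc_id, text):
        groups[word].append(did)

index = dict(reduce_fn(w, ids) for w, ids in groups.items())
print(index)   # {'cloud': ['d1'], 'grid': ['d1', 'd2'], 'computing': ['d2']}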


MapReduce Implementations

MapReduce has been implemented on several platforms:

  • Cluster: Google MapReduce, Apache Hadoop
  • Multicore CPU: Phoenix @ Stanford
  • GPU: Mars @ HKUST


Hadoop : software platform originally developed by Yahoo enabling users to write and run applications over vast distributed data.

Attractive Features in Hadoop :

  • Scalable: can easily scale to store and process petabytes of data in the Web space
  • Economical: an open-source MapReduce minimizes the overheads in task spawning and massive data communication
  • Efficient: processes data with a high degree of parallelism across a large number of commodity nodes
  • Reliable: automatically maintains multiple copies of data to facilitate redeployment of computing tasks on failures


Typical Hadoop Cluster

  • 40 nodes/rack, 1000-4000 nodes in cluster
  • 1 Gbps bandwidth within rack, 8 Gbps out of rack
  • Node specs (Yahoo terasort): 8 x 2 GHz cores, 8 GB RAM, 4 disks (= 4 TB?)

Aggregation switch

Rack switch

Image from http://wiki.apache.org/hadoop-data/attachments/HadoopPresentations/attachments/YahooHadoopIntro-apachecon-us-2008.pdf


Typical Hadoop Cluster

Image from http://wiki.apache.org/hadoop-data/attachments/HadoopPresentations/attachments/aw-apachecon-eu-2009.pdf


Challenges

Cheap nodes fail, especially if you have many

    • Mean time between failures for 1 node = 3 years
    • Mean time between failures for 1000 nodes = 1 day
    • Solution: Build fault-tolerance into system

Commodity network = low bandwidth

    • Solution: Push computation to the data

Programming distributed systems is hard

    • Solution: Data-parallel programming model: users write “map” & “reduce” functions, system distributes work and handles faults


Hadoop Components

  • Distributed file system (HDFS)
    • Single namespace for entire cluster
    • Replicates data 3x for fault-tolerance

  • MapReduce framework
    • Executes user jobs specified as “map” and “reduce” functions
    • Manages work distribution & fault-tolerance


Hadoop Distributed File System

  • Files split into 128MB blocks
  • Blocks replicated across several datanodes (usually 3)
  • Single namenode stores metadata (file names, block locations, etc)
  • Optimized for large files, sequential reads
  • Files are append-only

Figure: File1 is split into blocks 1-4; the namenode records the block locations, and each block is stored on three of the datanodes.


Secure Query Processing with Hadoop/MapReduce

  • Query Rewriting and Optimization Principles defined and implemented for two types of data
  • (i) Relational data: Secure query processing with HIVE
  • (ii) RDF Data: Secure query processing with SPARQL
  • Demonstrated with XACML Policies (content, temporal, association)
  • Joint demonstration with Kings College and U. of Insubria
    • First demo (2010): Each party submits their data and policies
    • Our cloud will manage the data and policies
    • Second demo (2011): Multiple clouds


Higher-level languages over Hadoop: Pig and Hive


Motivation

  • Many parallel algorithms can be expressed by a series of MapReduce jobs

  • But MapReduce is fairly low-level: must think about keys, values, partitioning, etc

  • Can we capture common “job building blocks”?


Pig

  • Started at Yahoo! Research
  • Runs about 30% of Yahoo!’s jobs
  • Features:
    • Expresses sequences of MapReduce jobs
    • Data model: nested “bags” of items
    • Provides relational (SQL) operators (JOIN, GROUP BY, etc)
    • Easy to plug in Java functions
    • Pig Pen development environment for Eclipse


An Example Problem

Suppose you have user data in one file, page view data in another, and you need to find the top 5 most visited pages by users aged 18-25.

Dataflow: Load Users and Load Pages; Filter by age; Join on name; Group on url; Count clicks; Order by clicks; Take top 5.

Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt


In MapReduce

Figure: the same job written directly as Java MapReduce code, far longer and harder to follow than the Pig Latin version that follows.

Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt


In Pig Latin

Users = load 'users' as (name, age);
Filtered = filter Users by age >= 18 and age <= 25;
Pages = load 'pages' as (user, url);
Joined = join Filtered by name, Pages by user;
Grouped = group Joined by url;
Summed = foreach Grouped generate group, count(Joined) as clicks;
Sorted = order Summed by clicks desc;
Top5 = limit Sorted 5;

store Top5 into 'top5sites';

Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt


Ease of Translation

Notice how naturally the components of the job translate into Pig Latin.

Load Users       ->  Users = load …
Load Pages       ->  Pages = load …
Filter by age    ->  Filtered = filter …
Join on name     ->  Joined = join …
Group on url     ->  Grouped = group …
Count clicks     ->  Summed = … count() …
Order by clicks  ->  Sorted = order …
Take top 5       ->  Top5 = limit …

Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt


Pig compiles this sequence of operations into three chained MapReduce jobs (Job 1, Job 2, and Job 3).


Hive

  • Developed at Facebook
  • Used for majority of Facebook jobs
  • “Relational database” built on Hadoop
    • Maintains list of table schemas
    • SQL-like query language (HQL)
    • Can call Hadoop Streaming scripts from HQL
    • Supports table partitioning, clustering, complex data types, some optimizations


Sample Hive Queries

  • Find top 5 pages visited by users aged 18-25:

SELECT p.url, COUNT(1) as clicks
FROM users u JOIN page_views p ON (u.name = p.user)
WHERE u.age >= 18 AND u.age <= 25
GROUP BY p.url
ORDER BY clicks
LIMIT 5;

  • Filter page views through a Python script:

SELECT TRANSFORM(p.user, p.date)
USING 'map_script.py'
AS dt, uid
CLUSTER BY dt
FROM page_views p;
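The slide names 'map_script.py' but does not show it; the script below is an illustrative assumption of what such a transform script could look like. Hive's TRANSFORM streams the selected columns (p.user, p.date) to the script's stdin as tab-separated lines and reads tab-separated (dt, uid) lines back from stdout.

#!/usr/bin/env python
# Hypothetical map_script.py for the TRANSFORM query above (illustrative).
import sys

for line in sys.stdin:
    user, date = line.rstrip("\n").split("\t")
    dt = date[:10]                # e.g. keep only the yyyy-mm-dd prefix
    print(dt + "\t" + user)       # emitted back to Hive as (dt, uid)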


Amazon Elastic MapReduce

  • Provides a web-based interface and command-line tools for running Hadoop jobs on Amazon EC2
  • Data stored in Amazon S3
  • Monitors job and shuts down machines after use
  • Small extra charge on top of EC2 pricing

  • If you want more control over how your Hadoop jobs run, you can launch a Hadoop cluster on EC2 manually using the scripts in src/contrib/ec2
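Jobs can also be submitted programmatically. Below is a hedged sketch using boto3, the current AWS SDK for Python (newer than this deck); the bucket names, script names, instance types, and release label are illustrative assumptions, not values from the slides.

import boto3

emr = boto3.client("emr", region_name="us-east-1")

response = emr.run_job_flow(
    Name="wordcount-example",                   # hypothetical job name
    ReleaseLabel="emr-6.15.0",                  # assumed EMR release
    Instances={
        "MasterInstanceType": "m5.xlarge",
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": False,   # shut down after the step
    },
    Steps=[{
        "Name": "wordcount",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["hadoop-streaming",
                     "-files", "s3://my-bucket/wc_mapper.py,s3://my-bucket/wc_reducer.py",
                     "-mapper", "python3 wc_mapper.py",
                     "-reducer", "python3 wc_reducer.py",
                     "-input", "s3://my-bucket/input/",
                     "-output", "s3://my-bucket/output/"],
        },
    }],
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)
print(response["JobFlowId"])                    # id of the launched job flow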


Elastic MapReduce Workflow


Conclusions

  • The MapReduce programming model hides the complexity of work distribution and fault tolerance

  • Principal design philosophies:
    • Make it scalable, so you can throw hardware at problems
    • Make it cheap, lowering hardware, programming and admin costs

  • MapReduce is not suitable for all problems, but when it works, it may save you quite a bit of time

  • Cloud computing makes it straightforward to start using Hadoop (or other parallel software) at scale
