你好,游客 登錄
背景:
閱讀新聞

2019最新六合图库资料:Hadoop Map/Reduce教程

[日期:2009-04-07] 來源:Hadoop網站  作者: [字體: ]

目的

六合图库118万众图库 www.xorsm.icu 這篇教程從用戶的角度出發,全面地介紹了Hadoop Map/Reduce框架的各個方面。

先決條件

請先確認Hadoop被正確安裝、配置和正常運行中。更多信息見:

概述

Hadoop Map/Reduce是一個使用簡易的軟件框架,基于它寫出來的應用程序能夠運行在由上千個商用機器組成的大型集群上,并以一種可靠容錯的方式并行處理上T級別的數據集。

一個Map/Reduce 作業(job) 通?;嵐咽淙氳氖菁蟹治舾啥懶⒌氖菘?,由 map任務(task)以完全并行的方式處理它們??蚣芑岫詍ap的輸出先進行排序, 然后把結果輸入給reduce任務。通常作業的輸入和輸出都會被存儲在文件系統中。 整個框架負責任務的調度和監控,以及重新執行已經失敗的任務。

通常,Map/Reduce框架和分布式文件系統是運行在一組相同的節點上的,也就是說,計算節點和存儲節點通常在一起。這種配置允許框架在那些已經存好數據的節點上高效地調度任務,這可以使整個集群的網絡帶寬被非常高效地利用。

Map/Reduce框架由一個單獨的master JobTracker 和每個集群節點一個slave TaskTracker共同組成。master負責調度構成一個作業的所有任務,這些任務分布在不同的slave上,master監控它們的執行,重新執行已經失敗的任務。而slave僅負責執行由master指派的任務。

應用程序至少應該指明輸入/輸出的位置(路徑),并通過實現合適的接口或抽象類提供map和reduce函數。再加上其他作業的參數,就構成了作業配置(job configuration)。然后,Hadoop的 job client提交作業(jar包/可執行程序等)和配置信息給JobTracker,后者負責分發這些軟件和配置信息給slave、調度任務并監控它們的執行,同時提供狀態和診斷信息給job-client。

雖然Hadoop框架是用JavaTM實現的,但Map/Reduce應用程序則不一定要用 Java來寫 。

  • Hadoop Streaming是一種運行作業的實用工具,它允許用戶創建和運行任何可執行程序 (例如:Shell工具)來做為mapper和reducer。
  • Hadoop Pipes是一個與SWIG兼容的C++ API (沒有基于JNITM技術),它也可用于實現Map/Reduce應用程序。

輸入與輸出

Map/Reduce框架運轉在<key, value> 鍵值對上,也就是說, 框架把作業的輸入看為是一組<key, value> 鍵值對,同樣也產出一組 <key, value> 鍵值對做為作業的輸出,這兩組鍵值對的類型可能不同。

框架需要對keyvalue的類(classes)進行序列化操作, 因此,這些類需要實現 Writable接口。 另外,為了方便框架執行排序操作,key類必須實現 WritableComparable接口。

一個Map/Reduce 作業的輸入和輸出類型如下所示:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

例子:WordCount v1.0

在深入細節之前,讓我們先看一個Map/Reduce的應用示例,以便對它們的工作方式有一個初步的認識。

WordCount是一個簡單的應用,它可以計算出指定數據集中每一個單詞出現的次數。

這個應用適用于 單機模式, 偽分布式模式完全分布式模式 三種Hadoop安裝方式。

源代碼

WordCount.java
1. package org.myorg;
2.
3. import java.io.IOException;
4. import java.util.*;
5.
6. import org.apache.hadoop.fs.Path;
7. import org.apache.hadoop.conf.*;
8. import org.apache.hadoop.io.*;
9. import org.apache.hadoop.mapred.*;
10. import org.apache.hadoop.util.*;
11.
12. public class WordCount {
13.
14.    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
15.      private final static IntWritable one = new IntWritable(1);
16.      private Text word = new Text();
17.
18.      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19.        String line = value.toString();
20.        StringTokenizer tokenizer = new StringTokenizer(line);
21.        while (tokenizer.hasMoreTokens()) {
22.          word.set(tokenizer.nextToken());
23.          output.collect(word, one);
24.        }
25.      }
26.    }
27.
28.    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
29.      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30.        int sum = 0;
31.        while (values.hasNext()) {
32.          sum += values.next().get();
33.        }
34.        output.collect(key, new IntWritable(sum));
35.      }
36.    }
37.
38.    public static void main(String[] args) throws Exception {
39.      JobConf conf = new JobConf(WordCount.class);
40.      conf.setJobName("wordcount");
41.
42.      conf.setOutputKeyClass(Text.class);
43.      conf.setOutputValueClass(IntWritable.class);
44.
45.      conf.setMapperClass(Map.class);
46.      conf.setCombinerClass(Reduce.class);
47.      conf.setReducerClass(Reduce.class);
48.
49.      conf.setInputFormat(TextInputFormat.class);
50.      conf.setOutputFormat(TextOutputFormat.class);
51.
52.      FileInputFormat.setInputPaths(conf, new Path(args[0]));
53.      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54.
55.      JobClient.runJob(conf);
57.    }
58. }
59.

用法

假設環境變量HADOOP_HOME對應安裝時的根目錄,HADOOP_VERSION對應Hadoop的當前安裝版本,編譯WordCount.java來創建jar包,可如下操作:

$ mkdir wordcount_classes
$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d wordcount_classes WordCount.java
$ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .

假設:

  • /usr/joe/wordcount/input - 是HDFS中的輸入路徑
  • /usr/joe/wordcount/output - 是HDFS中的輸出路徑

用示例文本文件做為輸入:

$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World Bye World

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop

運行應用程序:

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

輸出是:

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2

應用程序能夠使用-files選項來指定一個由逗號分隔的路徑列表,這些路徑是task的當前工作目錄。使用選項-libjars可以向map和reduce的classpath中添加jar包。使用-archives選項程序可以傳遞檔案文件做為參數,這些檔案文件會被解壓并且在task的當前工作目錄下會創建一個指向解壓生成的目錄的符號鏈接(以壓縮包的名字命名)。 有關命令行選項的更多細節請參考 Commands manual。

使用-libjars-files運行wordcount例子:
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt -libjars mylib.jar input output

解釋

WordCount應用程序非常直截了當。

Mapper(14-26行)中的map方法(18-25行)通過指定的 TextInputFormat(49行)一次處理一行。然后,它通過StringTokenizer 以空格為分隔符將一行切分為若干tokens,之后,輸出< <word>, 1> 形式的鍵值對。

對于示例中的第一個輸入,map輸出是:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

第二個輸入,map輸出是:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>

關于組成一個指定作業的map數目的確定,以及如何以更精細的方式去控制這些map,我們將在教程的后續部分學習到更多的內容。

WordCount還指定了一個combiner (46行)。因此,每次map運行之后,會對輸出按照key進行排序,然后把輸出傳遞給本地的combiner(按照作業的配置與Reducer一樣),進行本地聚合。

第一個map的輸出是:
< Bye, 1>
< Hello, 1>
< World, 2>

第二個map的輸出是:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>

Reducer(28-36行)中的reduce方法(29-35行) 僅是將每個key(本例中就是單詞)出現的次數求和。

因此這個作業的輸出就是:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>

代碼中的run方法中指定了作業的幾個方面, 例如:通過命令行傳遞過來的輸入/輸出路徑、key/value的類型、輸入/輸出的格式等等JobConf中的配置信息。隨后程序調用了JobClient.runJob(55行)來提交作業并且監控它的執行。

我們將在本教程的后續部分學習更多的關于JobConf, JobClient, Tool和其他接口及類(class)。

Map/Reduce - 用戶界面

這部分文檔為用戶將會面臨的Map/Reduce框架中的各個環節提供了適當的細節。這應該會幫助用戶更細粒度地去實現、配置和調優作業。然而,請注意每個類/接口的javadoc文檔提供最全面的文檔;本文只是想起到指南的作用。

我們會先看看MapperReducer接口。應用程序通?;嵬ü峁?SPAN class=codefrag>map和reduce方法來實現它們。

然后,我們會討論其他的核心接口,其中包括: JobConf,JobClient,Partitioner, OutputCollector,Reporter, InputFormat,OutputFormat等等。

最后,我們將通過討論框架中一些有用的功能點(例如:DistributedCache, IsolationRunner等等)來收尾。

核心功能描述

應用程序通?;嵬ü峁?SPAN class=codefrag>map和reduce來實現 MapperReducer接口,它們組成作業的核心。

Mapper

Mapper將輸入鍵值對(key/value pair)映射到一組中間格式的鍵值對集合。

Map是一類將輸入記錄集轉換為中間格式記錄集的獨立任務。 這種轉換的中間格式記錄集不需要與輸入記錄集的類型一致。一個給定的輸入鍵值對可以映射成0個或多個輸出鍵值對。

Hadoop Map/Reduce框架為每一個InputSplit產生一個map任務,而每個InputSplit是由該作業的InputFormat產生的。

概括地說,對Mapper的實現者需要重寫 JobConfigurable.configure(JobConf)方法,這個方法需要傳遞一個JobConf參數,目的是完成Mapper的初始化工作。然后,框架為這個任務的InputSplit中每個鍵值對調用一次 map(WritableComparable, Writable, OutputCollector, Reporter)操作。應用程序可以通過重寫Closeable.close()方法來執行相應的清理工作。

輸出鍵值對不需要與輸入鍵值對的類型一致。一個給定的輸入鍵值對可以映射成0個或多個輸出鍵值對。通過調用 OutputCollector.collect(WritableComparable,Writable)可以收集輸出的鍵值對。

應用程序可以使用Reporter報告進度,設定應用級別的狀態消息,更新Counters(計數器),或者僅是表明自己運行正常。

框架隨后會把與一個特定key關聯的所有中間過程的值(value)分成組,然后把它們傳給Reducer以產出最終的結果。用戶可以通過 JobConf.setOutputKeyComparatorClass(Class)來指定具體負責分組的 Comparator。

Mapper的輸出被排序后,就被劃分給每個Reducer。分塊的總數目和一個作業的reduce任務的數目是一樣的。用戶可以通過實現自定義的 Partitioner來控制哪個key被分配給哪個 Reducer。

用戶可選擇通過 JobConf.setCombinerClass(Class)指定一個combiner,它負責對中間過程的輸出進行本地的聚集,這會有助于降低從MapperReducer數據傳輸量。

這些被排好序的中間過程的輸出結果保存的格式是(key-len, key, value-len, value),應用程序可以通過JobConf控制對這些中間結果是否進行壓縮以及怎么壓縮,使用哪種 CompressionCodec。

需要多少個Map?

Map的數目通常是由輸入數據的大小決定的,一般就是所有輸入文件的總塊(block)數。

Map正常的并行規模大致是每個節點(node)大約10到100個map,對于CPU 消耗較小的map任務可以設到300個左右。由于每個任務初始化需要一定的時間,因此,比較合理的情況是map執行的時間至少超過1分鐘。

這樣,如果你輸入10TB的數據,每個塊(block)的大小是128MB,你將需要大約82,000個map來完成任務,除非使用 setNumMapTasks(int)(注意:這里僅僅是對框架進行了一個提示(hint),實際決定因素見這里)將這個數值設置得更高。

Reducer

Reducer將與一個key關聯的一組中間數值集歸約(reduce)為一個更小的數值集。

用戶可以通過 JobConf.setNumReduceTasks(int)設定一個作業中reduce任務的數目。

概括地說,對Reducer的實現者需要重寫 JobConfigurable.configure(JobConf)方法,這個方法需要傳遞一個JobConf參數,目的是完成Reducer的初始化工作。然后,框架為成組的輸入數據中的每個<key, (list of values)>對調用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。之后,應用程序可以通過重寫Closeable.close()來執行相應的清理工作。

Reducer有3個主要階段:shuffle、sort和reduce。

Shuffle

Reducer的輸入就是Mapper已經排好序的輸出。在這個階段,框架通過HTTP為每個Reducer獲得所有Mapper輸出中與之相關的分塊。

Sort

這個階段,框架將按照key的值對Reducer的輸入進行分組 (因為不同mapper的輸出中可能會有相同的key)。

Shuffle和Sort兩個階段是同時進行的;map的輸出也是一邊被取回一邊被合并的。

Secondary Sort

如果需要中間過程對key的分組規則和reduce前對key的分組規則不同,那么可以通過 JobConf.setOutputValueGroupingComparator(Class)來指定一個Comparator。再加上 JobConf.setOutputKeyComparatorClass(Class)可用于控制中間過程的key如何被分組,所以結合兩者可以實現按值的二次排序。

Reduce

在這個階段,框架為已分組的輸入數據中的每個 <key, (list of values)>對調用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。

Reduce任務的輸出通常是通過調用 OutputCollector.collect(WritableComparable, Writable)寫入 文件系統的。

應用程序可以使用Reporter報告進度,設定應用程序級別的狀態消息,更新Counters(計數器),或者僅是表明自己運行正常。

Reducer的輸出是沒有排序的。

需要多少個Reduce?

Reduce的數目建議是0.951.75乘以 (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum)。

用0.95,所有reduce可以在maps一完成時就立刻啟動,開始傳輸map的輸出結果。用1.75,速度快的節點可以在完成第一輪reduce任務后,可以開始第二輪,這樣可以得到比較好的負載均衡的效果。

增加reduce的數目會增加整個框架的開銷,但可以改善負載均衡,降低由于執行失敗帶來的負面影響。

上述比例因子比整體數目稍小一些是為了給框架中的推測性任務(speculative-tasks) 或失敗的任務預留一些reduce的資源。

無Reducer

如果沒有歸約要進行,那么設置reduce任務的數目為是合法的。

這種情況下,map任務的輸出會直接被寫入由 setOutputPath(Path)指定的輸出路徑??蚣茉詘閹切慈?SPAN class=codefrag>FileSystem之前沒有對它們進行排序。

Partitioner

Partitioner用于劃分鍵值空間(key space)。

Partitioner負責控制map輸出結果key的分割。Key(或者一個key子集)被用于產生分區,通常使用的是Hash函數。分區的數目與一個作業的reduce任務的數目是一樣的。因此,它控制將中間過程的key(也就是這條記錄)應該發送給m個reduce任務中的哪一個來進行reduce操作。

HashPartitioner是默認的 Partitioner。

Reporter

Reporter是用于Map/Reduce應用程序報告進度,設定應用級別的狀態消息, 更新Counters(計數器)的機制。

MapperReducer的實現可以利用Reporter 來報告進度,或者僅是表明自己運行正常。在那種應用程序需要花很長時間處理個別鍵值對的場景中,這種機制是很關鍵的,因為框架可能會以為這個任務超時了,從而將它強行殺死。另一個避免這種情況發生的方式是,將配置參數mapred.task.timeout設置為一個足夠高的值(或者干脆設置為零,則沒有超時限制了)。

應用程序可以用Reporter來更新Counter(計數器)。

OutputCollector

OutputCollector是一個Map/Reduce框架提供的用于收集 MapperReducer輸出數據的通用機制 (包括中間輸出結果和作業的輸出結果)。

Hadoop Map/Reduce框架附帶了一個包含許多實用型的mapper、reducer和partitioner 的類庫。

作業配置

JobConf代表一個Map/Reduce作業的配置。

JobConf是用戶向Hadoop框架描述一個Map/Reduce作業如何執行的主要接口??蚣芑嵐湊?SPAN class=codefrag>JobConf描述的信息忠實地去嘗試完成這個作業,然而:

  • 一些參數可能會被管理者標記為 final,這意味它們不能被更改。
  • 一些作業的參數可以被直截了當地進行設置(例如: setNumReduceTasks(int)),而另一些參數則與框架或者作業的其他參數之間微妙地相互影響,并且設置起來比較復雜(例如: setNumMapTasks(int))。

通常,JobConf會指明Mapper、Combiner(如果有的話)、 Partitioner、Reducer、InputFormatOutputFormat的具體實現。JobConf還能指定一組輸入文件 (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及輸出文件應該寫在哪兒 (setOutputPath(Path))。

JobConf可選擇地對作業設置一些高級選項,例如:設置Comparator; 放到DistributedCache上的文件;中間結果或者作業輸出結果是否需要壓縮以及怎么壓縮; 利用用戶提供的腳本(setMapDebugScript(String)/setReduceDebugScript(String)) 進行調試;作業是否允許預防性(speculative)任務的執行 (setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean)) ;每個任務最大的嘗試次數 (setMaxMapAttempts(int)/setMaxReduceAttempts(int)) ;一個作業能容忍的任務失敗的百分比 (setMaxMapTaskFailuresPercent(int)/setMaxReduceTaskFailuresPercent(int)) ;等等。

當然,用戶能使用 set(String, String)/get(String, String) 來設置或者取得應用程序需要的任意參數。然而,DistributedCache的使用是面向大規模只讀數據的。

任務的執行和環境

TaskTracker是在一個單獨的jvm上以子進程的形式執行 Mapper/Reducer任務(Task)的。

子任務會繼承父TaskTracker的環境。用戶可以通過JobConf中的 mapred.child.java.opts配置參數來設定子jvm上的附加選項,例如: 通過-Djava.library.path=<> 將一個非標準路徑設為運行時的鏈接用以搜索共享庫,等等。如果mapred.child.java.opts包含一個符號@[email protected], 它會被替換成map/reduce的taskid的值。

下面是一個包含多個參數和替換的例子,其中包括:記錄jvm GC日志; JVM JMX代理程序以無密碼的方式啟動,這樣它就能連接到jconsole上,從而可以查看子進程的內存和線程,得到線程的dump;還把子jvm的最大堆尺寸設置為512MB, 并為子jvm的java.library.path添加了一個附加路徑。

<property>
  <name>mapred.child.java.opts</name>
  <value>
     -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@[email protected]
     -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

用戶或管理員也可以使用mapred.child.ulimit設定運行的子任務的最大虛擬內存。mapred.child.ulimit的值以(KB)為單位,并且必須大于或等于-Xmx參數傳給JavaVM的值,否則VM會無法啟動。

注意:mapred.child.java.opts只用于設置task tracker啟動的子任務。為守護進程設置內存選項請查看 cluster_setup.html

${mapred.local.dir}/taskTracker/是task tracker的本地目錄, 用于創建本地緩存和job。它可以指定多個目錄(跨越多個磁盤),文件會半隨機的保存到本地路徑下的某個目錄。當job啟動時,task tracker根據配置文檔創建本地job目錄,目錄結構如以下所示:

  • ${mapred.local.dir}/taskTracker/archive/ :分布式緩存。這個目錄保存本地的分布式緩存。因此本地分布式緩存是在所有task和job間共享的。
  • ${mapred.local.dir}/taskTracker/jobcache/$jobid/ : 本地job目錄。
    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job指定的共享目錄。各個任務可以使用這個空間做為暫存空間,用于它們之間共享文件。這個目錄通過job.local.dir 參數暴露給用戶。這個路徑可以通過API JobConf.getJobLocalDir()來訪問。它也可以被做為系統屬性獲得。因此,用戶(比如運行streaming)可以調用System.getProperty("job.local.dir")獲得該目錄。
    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/: 存放jar包的路徑,用于存放作業的jar文件和展開的jar。job.jar是應用程序的jar文件,它會被自動分發到各臺機器,在task啟動前會被自動展開。使用api JobConf.getJar() 函數可以得到job.jar的位置。使用JobConf.getJar().getParent()可以訪問存放展開的jar包的目錄。
    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml: 一個job.xml文件,本地的通用的作業配置文件。
    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid: 每個任務有一個目錄task-id,它里面有如下的目錄結構:
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml: 一個job.xml文件,本地化的任務作業配置文件。任務本地化是指為該task設定特定的屬性值。這些值會在下面具體說明。
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output 一個存放中間過程的輸出文件的目錄。它保存了由framwork產生的臨時map reduce數據,比如map的輸出文件等。
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work: task的當前工作目錄。
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp: task的臨時目錄。(用戶可以設定屬性mapred.child.tmp 來為map和reduce task設定臨時目錄。缺省值是./tmp。如果這個值不是絕對路徑, 它會把task的工作路徑加到該路徑前面作為task的臨時文件路徑。如果這個值是絕對路徑則直接使用這個值。 如果指定的目錄不存在,會自動創建該目錄。之后,按照選項 -Djava.io.tmpdir='臨時文件的絕對路徑'執行java子任務。 pipes和streaming的臨時文件路徑是通過環境變量TMPDIR='the absolute path of the tmp dir'設定的)。 如果mapred.child.tmp./tmp值,這個目錄會被創建。

下面的屬性是為每個task執行時使用的本地參數,它們保存在本地化的任務作業配置文件里:

名稱 類型 描述
mapred.job.id String job id
mapred.jar String job目錄下job.jar的位置
job.local.dir String job指定的共享存儲空間
mapred.tip.id String task id
mapred.task.id String task嘗試id
mapred.task.is.map boolean 是否是map task
mapred.task.partition int task在job中的id
map.input.file String map讀取的文件名
map.input.start long map輸入的數據塊的起始位置偏移
map.input.length long map輸入的數據塊的字節數
mapred.work.output.dir String task臨時輸出目錄

task的標準輸出和錯誤輸出流會被讀到TaskTracker中,并且記錄到 ${HADOOP_LOG_DIR}/userlogs

DistributedCache 可用于map或reduce task中分發jar包和本地庫。子jvm總是把 當前工作目錄 加到 java.library.pathLD_LIBRARY_PATH。 因此,可以通過 System.loadLibrarySystem.load裝載緩存的庫。有關使用分布式緩存加載共享庫的細節請參考 native_libraries.html

作業的提交與監控

JobClient是用戶提交的作業與JobTracker交互的主要接口。

JobClient 提供提交作業,追蹤進程,訪問子任務的日志記錄,獲得Map/Reduce集群狀態信息等功能。

作業提交過程包括:

  1. 檢查作業輸入輸出樣式細節
  2. 為作業計算InputSplit值。
  3. 如果需要的話,為作業的DistributedCache建立必須的統計信息。
  4. 拷貝作業的jar包和配置文件到FileSystem上的Map/Reduce系統目錄下。
  5. 提交作業到JobTracker并且監控它的狀態。

作業的歷史文件記錄到指定目錄的"_logs/history/"子目錄下。這個指定目錄由hadoop.job.history.user.location設定,默認是作業輸出的目錄。因此默認情況下,文件會存放在mapred.output.dir/_logs/history目錄下。用戶可以設置hadoop.job.history.user.locationnone來停止日志記錄。

用戶使用下面的命令可以看到在指定目錄下的歷史日志記錄的摘要。
$ bin/hadoop job -history output-dir
這個命令會打印出作業的細節,以及失敗的和被殺死的任務細節。
要查看有關作業的更多細節例如成功的任務、每個任務嘗試的次數(task attempt)等,可以使用下面的命令
$ bin/hadoop job -history all output-dir

用戶可以使用 OutputLogFilter 從輸出目錄列表中篩選日志文件。

一般情況,用戶利用JobConf創建應用程序并配置作業屬性, 然后用 JobClient 提交作業并監視它的進程。

作業的控制

有時候,用一個單獨的Map/Reduce作業并不能完成一個復雜的任務,用戶也許要鏈接多個Map/Reduce作業才行。這是容易實現的,因為作業通常輸出到分布式文件系統上的,所以可以把這個作業的輸出作為下一個作業的輸入實現串聯。

然而,這也意味著,確保每一作業完成(成功或失敗)的責任就直接落在了客戶身上。在這種情況下,可以用的控制作業的選項有:

作業的輸入

InputFormat 為Map/Reduce作業描述輸入的細節規范。

Map/Reduce框架根據作業的InputFormat來:

  1. 檢查作業輸入的有效性。
  2. 把輸入文件切分成多個邏輯InputSplit實例, 并把每一實例分別分發給一個 Mapper。
  3. 提供RecordReader的實現,這個RecordReader從邏輯InputSplit中獲得輸入記錄, 這些記錄將由Mapper處理。

基于文件的InputFormat實現(通常是 FileInputFormat的子類) 默認行為是按照輸入文件的字節大小,把輸入數據切分成邏輯分塊(logical InputSplit )。 其中輸入文件所在的FileSystem的數據塊尺寸是分塊大小的上限。下限可以設置mapred.min.split.size 的值。

考慮到邊界情況,對于很多應用程序來說,很明顯按照文件大小進行邏輯分割是不能滿足需求的。 在這種情況下,應用程序需要實現一個RecordReader來處理記錄的邊界并為每個任務提供一個邏輯分塊的面向記錄的視圖。

TextInputFormat 是默認的InputFormat。

如果一個作業的InputformatTextInputFormat, 并且框架檢測到輸入文件的后綴是.gz.lzo,就會使用對應的CompressionCodec自動解壓縮這些文件。 但是需要注意,上述帶后綴的壓縮文件不會被切分,并且整個壓縮文件會分給一個mapper來處理。

InputSplit

InputSplit 是一個單獨的Mapper要處理的數據塊。

一般的InputSplit 是字節樣式輸入,然后由RecordReader處理并轉化成記錄樣式。

FileSplit 是默認的InputSplit。 它把 map.input.file 設定為輸入文件的路徑,輸入文件是邏輯分塊文件。

RecordReader

RecordReaderInputSlit讀入<key, value>對。

一般的,RecordReader 把由InputSplit 提供的字節樣式的輸入文件,轉化成由Mapper處理的記錄樣式的文件。 因此RecordReader負責處理記錄的邊界情況和把數據表示成keys/values對形式。

作業的輸出

OutputFormat 描述Map/Reduce作業的輸出樣式。

Map/Reduce框架根據作業的OutputFormat來:

  1. 檢驗作業的輸出,例如檢查輸出路徑是否已經存在。
  2. 提供一個RecordWriter的實現,用來輸出作業結果。 輸出文件保存在FileSystem上。

TextOutputFormat是默認的 OutputFormat。

任務的Side-Effect File

在一些應用程序中,子任務需要產生一些side-file,這些文件與作業實際輸出結果的文件不同。

在這種情況下,同一個Mapper或者Reducer的兩個實例(比如預防性任務)同時打開或者寫 FileSystem上的同一文件就會產生沖突。因此應用程序在寫文件的時候需要為每次任務嘗試(不僅僅是每次任務,每個任務可以嘗試執行很多次)選取一個獨一無二的文件名(使用attemptid,例如task_200709221812_0001_m_000000_0)。

為了避免沖突,Map/Reduce框架為每次嘗試執行任務都建立和維護一個特殊的 ${mapred.output.dir}/_temporary/_${taskid}子目錄,這個目錄位于本次嘗試執行任務輸出結果所在的FileSystem上,可以通過 ${mapred.work.output.dir}來訪問這個子目錄。 對于成功完成的任務嘗試,只有${mapred.output.dir}/_temporary/_${taskid}下的文件會移動${mapred.output.dir}。當然,框架會丟棄那些失敗的任務嘗試的子目錄。這種處理過程對于應用程序來說是完全透明的。

在任務執行期間,應用程序在寫文件時可以利用這個特性,比如 通過 FileOutputFormat.getWorkOutputPath()獲得${mapred.work.output.dir}目錄, 并在其下創建任意任務執行時所需的side-file,框架在任務嘗試成功時會馬上移動這些文件,因此不需要在程序內為每次任務嘗試選取一個獨一無二的名字。

注意:在每次任務嘗試執行期間,${mapred.work.output.dir} 的值實際上是 ${mapred.output.dir}/_temporary/_{$taskid},這個值是Map/Reduce框架創建的。 所以使用這個特性的方法是,在 FileOutputFormat.getWorkOutputPath() 路徑下創建side-file即可。

對于只使用map不使用reduce的作業,這個結論也成立。這種情況下,map的輸出結果直接生成到HDFS上。

RecordWriter

RecordWriter 生成<key, value> 對到輸出文件。

RecordWriter的實現把作業的輸出結果寫到 FileSystem。

其他有用的特性

Counters

Counters 是多個由Map/Reduce框架或者應用程序定義的全局計數器。 每一個Counter可以是任何一種 Enum類型。同一特定Enum類型的Counter可以匯集到一個組,其類型為Counters.Group。

應用程序可以定義任意(Enum類型)的Counters并且可以通過 map 或者 reduce方法中的 Reporter.incrCounter(Enum, long)或者 Reporter.incrCounter(String, String, long) 更新。之后框架會匯總這些全局counters。

DistributedCache

DistributedCache 可將具體應用相關的、大尺寸的、只讀的文件有效地分布放置。

DistributedCache 是Map/Reduce框架提供的功能,能夠緩存應用程序所需的文件 (包括文本,檔案文件,jar文件等)。

應用程序在JobConf中通過url(hdfs://)指定需要被緩存的文件。 DistributedCache假定由hdfs://格式url指定的文件已經在 FileSystem上了。

Map-Redcue框架在作業所有任務執行之前會把必要的文件拷貝到slave節點上。 它運行高效是因為每個作業的文件只拷貝一次并且為那些沒有文檔的slave節點緩存文檔。

DistributedCache 根據緩存文檔修改的時間戳進行追蹤。 在作業執行期間,當前應用程序或者外部程序不能修改緩存文件。

distributedCache可以分發簡單的只讀數據或文本文件,也可以分發復雜類型的文件例如歸檔文件和jar文件。歸檔文件(zip,tar,tgz和tar.gz文件)在slave節點上會被解檔(un-archived)。 這些文件可以設置執行權限。

用戶可以通過設置mapred.cache.{files|archives}來分發文件。 如果要分發多個文件,可以使用逗號分隔文件所在路徑。也可以利用API來設置該屬性: DistributedCache.addCacheFile(URI,conf)/ DistributedCache.addCacheArchive(URI,conf) and DistributedCache.setCacheFiles(URIs,conf)/ DistributedCache.setCacheArchives(URIs,conf) 其中URI的形式是 hdfs://host:port/absolute-path#link-name 在Streaming程序中,可以通過命令行選項 -cacheFile/-cacheArchive 分發文件。

用戶可以通過 DistributedCache.createSymlink(Configuration)方法讓DistributedCache當前工作目錄下創建到緩存文件的符號鏈接。 或者通過設置配置文件屬性mapred.create.symlinkyes。 分布式緩存會截取URI的片段作為鏈接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so, 則在task當前工作目錄會有名為lib.so的鏈接, 它會鏈接分布式緩存中的lib.so.1。

DistributedCache可在map/reduce任務中作為 一種基礎軟件分發機制使用。它可以被用于分發jar包和本地庫(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)DistributedCache.addFileToClassPath(Path, Configuration) API能夠被用于 緩存文件和jar包,并把它們加入子jvm的classpath。也可以通過設置配置文檔里的屬性 mapred.job.classpath.{files|archives}達到相同的效果?;捍嫖募捎糜詵址⒑妥霸乇鏡乜?。

Tool

Tool 接口支持處理常用的Hadoop命令行選項。

Tool 是Map/Reduce工具或應用的標準。應用程序應只處理其定制參數, 要把標準命令行選項通過 ToolRunner.run(Tool, String[]) 委托給 GenericOptionsParser處理。

Hadoop命令行的常用選項有:
-conf <configuration file>
-D <property=value>
-fs <local|namenode:port>
-jt <local|jobtracker:port>

IsolationRunner

IsolationRunner 是幫助調試Map/Reduce程序的工具。

使用IsolationRunner的方法是,首先設置 keep.failed.tasks.files屬性為true (同時參考keep.tasks.files.pattern)。

然后,登錄到任務運行失敗的節點上,進入 TaskTracker的本地路徑運行 IsolationRunner
$ cd <local path>/taskTracker/${taskid}/work
$ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml

IsolationRunner會把失敗的任務放在單獨的一個能夠調試的jvm上運行,并且采用和之前完全一樣的輸入數據。

Profiling

Profiling是一個工具,它使用內置的java profiler工具進行分析獲得(2-3個)map或reduce樣例運行分析報告。

用戶可以通過設置屬性mapred.task.profile指定系統是否采集profiler信息。 利用api JobConf.setProfileEnabled(boolean)可以修改屬性值。如果設為true, 則開啟profiling功能。profiler信息保存在用戶日志目錄下。缺省情況,profiling功能是關閉的。

如果用戶設定使用profiling功能,可以使用配置文檔里的屬性 mapred.task.profile.{maps|reduces} 設置要profile map/reduce task的范圍。設置該屬性值的api是 JobConf.setProfileTaskRange(boolean,String)。 范圍的缺省值是0-2。

用戶可以通過設定配置文檔里的屬性mapred.task.profile.params 來指定profiler配置參數。修改屬性要使用api JobConf.setProfileParams(String)。當運行task時,如果字符串包含%s。 它會被替換成profileing的輸出文件名。這些參數會在命令行里傳遞到子JVM中。缺省的profiling 參數是 -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s。

調試

Map/Reduce框架能夠運行用戶提供的用于調試的腳本程序。 當map/reduce任務失敗時,用戶可以通過運行腳本在任務日志(例如任務的標準輸出、標準錯誤、系統日志以及作業配置文件)上做后續處理工作。用戶提供的調試腳本程序的標準輸出和標準錯誤會輸出為診斷文件。如果需要的話這些輸出結果也可以打印在用戶界面上。

在接下來的章節,我們討論如何與作業一起提交調試腳本。為了提交調試腳本, 首先要把這個腳本分發出去,而且還要在配置文件里設置。

如何分發腳本文件:

用戶要用 DistributedCache 機制來分發鏈接腳本文件

如何提交腳本:

一個快速提交調試腳本的方法是分別為需要調試的map任務和reduce任務設置 "mapred.map.task.debug.script" 和 "mapred.reduce.task.debug.script" 屬性的值。這些屬性也可以通過 JobConf.setMapDebugScript(String) JobConf.setReduceDebugScript(String) API來設置。對于streaming, 可以分別為需要調試的map任務和reduce任務使用命令行選項-mapdebug 和 -reducedegug來提交調試腳本。

腳本的參數是任務的標準輸出、標準錯誤、系統日志以及作業配置文件。在運行map/reduce失敗的節點上運行調試命令是:
$script $stdout $stderr $syslog $jobconf

Pipes 程序根據第五個參數獲得c++程序名。 因此調試pipes程序的命令是
$script $stdout $stderr $syslog $jobconf $program

默認行為

對于pipes,默認的腳本會用gdb處理core dump, 打印 stack trace并且給出正在運行線程的信息。

JobControl

JobControl是一個工具,它封裝了一組Map/Reduce作業以及他們之間的依賴關系。

數據壓縮

Hadoop Map/Reduce框架為應用程序的寫入文件操作提供壓縮工具,這些工具可以為map輸出的中間數據和作業最終輸出數據(例如reduce的輸出)提供支持。它還附帶了一些 CompressionCodec的實現,比如實現了 zliblzo壓縮算法。 Hadoop同樣支持gzip文件格式。

考慮到性能問題(zlib)以及Java類庫的缺失(lzo)等因素,Hadoop也為上述壓縮解壓算法提供本地庫的實現。更多的細節請參考 這里。

中間輸出

應用程序可以通過 JobConf.setCompressMapOutput(boolean)api控制map輸出的中間結果,并且可以通過 JobConf.setMapOutputCompressorClass(Class)api指定 CompressionCodec。

作業輸出

應用程序可以通過 FileOutputFormat.setCompressOutput(JobConf, boolean) api控制輸出是否需要壓縮并且可以使用 FileOutputFormat.setOutputCompressorClass(JobConf, Class)api指定CompressionCodec。

如果作業輸出要保存成 SequenceFileOutputFormat格式,需要使用 SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType)api,來設定 SequenceFile.CompressionType (i.e. RECORD / BLOCK - 默認是RECORD)。

例子:WordCount v2.0

這里是一個更全面的WordCount例子,它使用了我們已經討論過的很多Map/Reduce框架提供的功能。

運行這個例子需要HDFS的某些功能,特別是 DistributedCache相關功能。因此這個例子只能運行在 偽分布式 或者 完全分布式模式的 Hadoop上。

源代碼

WordCount.java
1. package org.myorg;
2.
3. import java.io.*;
4. import java.util.*;
5.
6. import org.apache.hadoop.fs.Path;
7. import org.apache.hadoop.filecache.DistributedCache;
8. import org.apache.hadoop.conf.*;
9. import org.apache.hadoop.io.*;
10. import org.apache.hadoop.mapred.*;
11. import org.apache.hadoop.util.*;
12.
13. public class WordCount extends Configured implements Tool {
14.
15.    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
16.
17.      static enum Counters { INPUT_WORDS }
18.
19.      private final static IntWritable one = new IntWritable(1);
20.      private Text word = new Text();
21.
22.      private boolean caseSensitive = true;
23.      private Set<String> patternsToSkip = new HashSet<String>();
24.
25.      private long numRecords = 0;
26.      private String inputFile;
27.
28.      public void configure(JobConf job) {
29.        caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
30.        inputFile = job.get("map.input.file");
31.
32.        if (job.getBoolean("wordcount.skip.patterns", false)) {
33.          Path[] patternsFiles = new Path[0];
34.          try {
35.            patternsFiles = DistributedCache.getLocalCacheFiles(job);
36.          } catch (IOException ioe) {
37.            System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
38.          }
39.          for (Path patternsFile : patternsFiles) {
40.            parseSkipFile(patternsFile);
41.          }
42.        }
43.      }
44.
45.      private void parseSkipFile(Path patternsFile) {
46.        try {
47.          BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));
48.          String pattern = null;
49.          while ((pattern = fis.readLine()) != null) {
50.            patternsToSkip.add(pattern);
51.          }
52.        } catch (IOException ioe) {
53.          System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : " + StringUtils.stringifyException(ioe));
54.        }
55.      }
56.
57.      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
58.        String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
59.
60.        for (String pattern : patternsToSkip) {
61.          line = line.replaceAll(pattern, "");
62.        }
63.
64.        StringTokenizer tokenizer = new StringTokenizer(line);
65.        while (tokenizer.hasMoreTokens()) {
66.          word.set(tokenizer.nextToken());
67.          output.collect(word, one);
68.          reporter.incrCounter(Counters.INPUT_WORDS, 1);
69.        }
70.
71.        if ((++numRecords % 100) == 0) {
72.          reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile);
73.        }
74.      }
75.    }
76.
77.    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
78.      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
79.        int sum = 0;
80.        while (values.hasNext()) {
81.          sum += values.next().get();
82.        }
83.        output.collect(key, new IntWritable(sum));
84.      }
85.    }
86.
87.    public int run(String[] args) throws Exception {
88.      JobConf conf = new JobConf(getConf(), WordCount.class);
89.      conf.setJobName("wordcount");
90.
91.      conf.setOutputKeyClass(Text.class);
92.      conf.setOutputValueClass(IntWritable.class);
93.
94.      conf.setMapperClass(Map.class);
95.      conf.setCombinerClass(Reduce.class);
96.      conf.setReducerClass(Reduce.class);
97.
98.      conf.setInputFormat(TextInputFormat.class);
99.      conf.setOutputFormat(TextOutputFormat.class);
100.
101.      List<String> other_args = new ArrayList<String>();
102.      for (int i=0; i < args.length; ++i) {
103.        if ("-skip".equals(args[i])) {
104.          DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
105.          conf.setBoolean("wordcount.skip.patterns", true);
106.        } else {
107.          other_args.add(args[i]);
108.        }
109.      }
110.
111.      FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
112.      FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
113.
114.      JobClient.runJob(conf);
115.      return 0;
116.    }
117.
118.    public static void main(String[] args) throws Exception {
119.      int res = ToolRunner.run(new Configuration(), new WordCount(), args);
120.      System.exit(res);
121.    }
122. }
123.

運行樣例

輸入樣例:

$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World, Bye World!

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.

運行程序:

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

輸出:

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1

注意此時的輸入與第一個版本的不同,輸出的結果也有不同。

現在通過DistributedCache插入一個模式文件,文件中保存了要被忽略的單詞模式。

$ hadoop dfs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to

再運行一次,這次使用更多的選項:

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=true /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

應該得到這樣的輸出:

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1

再運行一次,這一次關閉大小寫敏感性(case-sensitivity):

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

輸出:

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2

程序要點

通過使用一些Map/Reduce框架提供的功能,WordCount的第二個版本在原始版本基礎上有了如下的改進:

  • 展示了應用程序如何在Mapper (和Reducer)中通過configure方法 修改配置參數(28-43行)。
  • 展示了作業如何使用DistributedCache 來分發只讀數據。 這里允許用戶指定單詞的模式,在計數時忽略那些符合模式的單詞(104行)。
  • 展示Tool接口和GenericOptionsParser處理Hadoop命令行選項的功能 (87-116, 119行)。
  • 展示了應用程序如何使用Counters(68行),如何通過傳遞給map(和reduce) 方法的Reporter實例來設置應用程序的狀態信息(72行)。

Java和JNI是Sun Microsystems, Inc.在美國和其它國家的注冊商標。

推薦 打印 | 錄入: | 閱讀:
相關新聞      
本文評論   
評論聲明
  • 尊重網上道德,遵守中華人民共和國的各項有關法律法規
  • 承擔一切因您的行為而直接或間接導致的民事或刑事法律責任
  • 本站管理人員有權保留或刪除其管轄留言中的任意內容
  • 本站有權在網站內轉載或引用您的評論
  • 參與本評論即表明您已經閱讀并接受上述條款
全天北京pk10冠军计划 pk拾有计划软件吗 斗地主二打一下载 球探比分网 时时彩源码 财神爷pk10计划安卓 七星彩专家预测号码 北京pk10赛车在线计划 北京pk10技巧图 黑龙江时时最快开奖 重庆时时赢面大赌法 极速时时网站是多少 四川时时怎么玩 全天pk10计划两期稳定版 pk10买9码杀一码好方法 快3三不同号投注技巧