このドキュメントでは、Hadoop MapReduce フレームワークのユーザー向け機能を包括的に説明し、チュートリアルとして機能します。
Hadoop がインストール、構成、および実行されていることを確認してください。詳細は
Hadoop MapReduce は、コモディティハードウェアの大規模クラスタ (数千ノード) で、信頼性が高くフォールトトレラントな方法で、大量のデータ (マルチテラバイトデータセット) を並列処理するアプリケーションを簡単に作成するためのソフトウェアフレームワークです。
MapReduce *ジョブ* は通常、入力データセットを独立したチャンクに分割し、*マップタスク* によって完全に並列に処理されます。フレームワークはマップの出力をソートし、*リデュースタスク* への入力とします。通常、ジョブの入力と出力はどちらもファイルシステムに保存されます。フレームワークは、タスクのスケジューリング、監視、および失敗したタスクの再実行を担当します。
通常、計算ノードとストレージノードは同じです。つまり、MapReduce フレームワークと Hadoop 分散ファイルシステム ( HDFS アーキテクチャガイド を参照) は同じノードセットで実行されています。この構成により、フレームワークはデータが既に存在するノードでタスクを効果的にスケジュールできるため、クラスタ全体の集約帯域幅が非常に高くなります。
MapReduce フレームワークは、単一のマスター `ResourceManager`、クラスタノードごとに 1 つのワーカー `NodeManager`、アプリケーションごとに `MRAppMaster` で構成されます ( YARN アーキテクチャガイド を参照)。
少なくとも、アプリケーションは入力/出力の場所を指定し、適切なインターフェース/抽象クラスの実装を介して *マップ* および *リデュース* 関数を提供します。これらと他のジョブパラメータは、*ジョブ構成* を構成します。
次に、Hadoop *ジョブクライアント* はジョブ (jar/実行可能ファイルなど) と構成を `ResourceManager` に送信します。`ResourceManager` は、ソフトウェア/構成をワーカーに配布し、タスクをスケジュールして監視し、ステータスと診断情報をジョブクライアントに提供する責任を負います。
Hadoop フレームワークは Java™ で実装されていますが、MapReduce アプリケーションは Java で記述する必要はありません。
Hadoop ストリーミング は、任意の実行可能ファイル (例: シェルユーティリティ) をマッパーまたはレデューサーとして使用してジョブを作成および実行できるユーティリティです。
Hadoop Pipes は、MapReduce アプリケーションを実装するための SWIG 互換 C++ API です (JNI™ ベースではありません)。
MapReduce フレームワークは `<key, value>` ペアのみを操作します。つまり、フレームワークはジョブへの入力を `<key, value>` ペアのセットとして表示し、ジョブの出力として `<key, value>` ペアのセットを生成します。型の異なる可能性があります。
`key` および `value` クラスはフレームワークによってシリアル化可能である必要があるため、Writable インターフェースを実装する必要があります。さらに、`key` クラスは、フレームワークによるソートを容易にするために、WritableComparable インターフェースを実装する必要があります。
MapReduce ジョブの入力と出力の型
(入力) `<k1, v1> ->` **map** `-> <k2, v2> ->` **combine** `-> <k2, v2> ->` **reduce** `-> <k3, v3>` (出力)
詳細に入る前に、MapReduce アプリケーションの例を挙げて、それらがどのように機能するかを理解しましょう。
`WordCount` は、指定された入力セット内の各単語の出現回数をカウントする単純なアプリケーションです。
これは、ローカルスタンドアロン、擬似分散、または完全に分散された Hadoop インストールで動作します ( 単一ノード設定 )。
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
環境変数が次のように設定されていると仮定します
export JAVA_HOME=/usr/java/default export PATH=${JAVA_HOME}/bin:${PATH} export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
`WordCount.java` をコンパイルして jar を作成します
$ bin/hadoop com.sun.tools.javac.Main WordCount.java $ jar cf wc.jar WordCount*.class
以下を仮定します
入力としてのサンプルテキストファイル
$ bin/hadoop fs -ls /user/joe/wordcount/input/ /user/joe/wordcount/input/file01 /user/joe/wordcount/input/file02 $ bin/hadoop fs -cat /user/joe/wordcount/input/file01 Hello World Bye World $ bin/hadoop fs -cat /user/joe/wordcount/input/file02 Hello Hadoop Goodbye Hadoop
アプリケーションを実行します
$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output
出力
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000 Bye 1 Goodbye 1 Hadoop 2 Hello 2 World 2
アプリケーションは、オプション `-files` を使用して、タスクの現在の作業ディレクトリに存在するパスのコンマ区切りリストを指定できます。 `-libjars` オプションを使用すると、アプリケーションはマップとリデュースのクラスパスに jar を追加できます。オプション `-archives` を使用すると、コンマ区切りのアーカイブリストを引数として渡すことができます。これらのアーカイブは解凍され、アーカイブの名前のリンクがタスクの現在の作業ディレクトリに作成されます。コマンドラインオプションの詳細については、コマンドガイド を参照してください。
`-libjars`、`-files`、`-archives` を使用した `wordcount` 例の実行
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output
ここでは、myarchive.zip が配置され、「myarchive.zip」という名前のディレクトリに解凍されます。
ユーザーは、`-files` および `-archives` オプションを介して渡されるファイルとアーカイブに、# を使用して異なるシンボリック名を指定できます。
例えば、
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output
ここでは、ファイル dir1/dict.txt と dir2/dict.txt には、それぞれシンボリック名 dict1 と dict2 を使用してタスクからアクセスできます。アーカイブ mytar.tgz は配置され、「tgzdir」という名前のディレクトリに解凍されます。
アプリケーションは、コマンドラインでそれぞれ -Dmapreduce.map.env、-Dmapreduce.reduce.env、-Dyarn.app.mapreduce.am.env オプションを使用して環境変数を指定することにより、Mapper、Reducer、および Application Master タスクの環境変数を指定できます。
たとえば、以下は、Mapper と Reducer に対して環境変数 FOO_VAR=bar と LIST_VAR=a,b,c を設定します。
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -Dmapreduce.map.env.FOO_VAR=bar -Dmapreduce.map.env.LIST_VAR=a,b,c -Dmapreduce.reduce.env.FOO_VAR=bar -Dmapreduce.reduce.env.LIST_VAR=a,b,c input output
WordCount
アプリケーションは非常に分かりやすいです。
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } }
Mapper
の実装は、map
メソッドを介して、指定された TextInputFormat
によって提供される 1 行ずつを処理します。次に、StringTokenizer
を使用して、行を空白で区切られたトークンに分割し、< <単語>, 1>
のキーと値のペアを出力します。
指定されたサンプル入力に対して、最初のマップは以下を出力します。
< Hello, 1> < World, 1> < Bye, 1> < World, 1>
2 番目のマップは以下を出力します。
< Hello, 1> < Hadoop, 1> < Goodbye, 1> < Hadoop, 1>
指定されたジョブに対して生成されるマップの数、およびそれらをきめ細かく制御する方法については、チュートリアルの後半で詳しく説明します。
job.setCombinerClass(IntSumReducer.class);
WordCount
は combiner
も指定します。したがって、各マップの出力は、*キー*でソートされた後、ローカル集計のためにローカル combiner(ジョブ構成によると Reducer
と同じ)に渡されます。
最初のマップの出力
< Bye, 1> < Hello, 1> < World, 2>
2 番目のマップの出力
< Goodbye, 1> < Hadoop, 2> < Hello, 1>
public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); }
Reducer
の実装は、reduce
メソッドを介して、各キー(この例では単語)の出現回数である値を合計するだけです。
したがって、ジョブの出力は次のとおりです。
< Bye, 1> < Goodbye, 1> < Hadoop, 2> < Hello, 2> < World, 2>
main
メソッドは、Job
で、入力/出力パス(コマンドライン経由で渡される)、キー/値の型、入力/出力フォーマットなど、ジョブのさまざまな側面を指定します。次に、job.waitForCompletion
を呼び出してジョブを送信し、その進行状況を監視します。
Job
、InputFormat
、OutputFormat
、およびその他のインターフェースとクラスについては、チュートリアルの後半で詳しく説明します。
このセクションでは、MapReduce フレームワークのすべてのユーザー向け側面について、ある程度の詳細を提供します。これは、ユーザーがジョブを実装、構成、および調整するのに役立ちます。ただし、各クラス/インターフェースの javadoc が最も包括的なドキュメントであることに注意してください。これはチュートリアルとしてのみ意図されています。
まず、Mapper
と Reducer
インターフェースを見てみましょう。アプリケーションは通常、map
メソッドと reduce
メソッドを提供するためにこれらを実装します。
次に、Job
、Partitioner
、InputFormat
、OutputFormat
などの他のコアインターフェースについて説明します。
最後に、DistributedCache
、IsolationRunner
などのフレームワークの便利な機能について説明します。
アプリケーションは通常、map
メソッドと reduce
メソッドを提供するために、Mapper
インターフェースと Reducer
インターフェースを実装します。これらはジョブの中核を形成します。
Mapper は入力キーと値のペアを、中間キーと値のペアのセットにマップします。
マップは、入力レコードを中間レコードに変換する個々のタスクです。変換された中間レコードは、入力レコードと同じ型である必要はありません。指定された入力ペアは、ゼロまたは多くの出力ペアにマップされる場合があります。
Hadoop MapReduce フレームワークは、ジョブの InputFormat
によって生成された各 InputSplit
ごとに 1 つのマップタスクを生成します。
全体として、Mapper の実装は Job.setMapperClass(Class) メソッドを介してジョブに渡されます。次に、フレームワークは、そのタスクの InputSplit
内の各キーと値のペアに対して map(WritableComparable, Writable, Context) を呼び出します。アプリケーションは、cleanup(Context)
メソッドをオーバーライドして、必要なクリーンアップを実行できます。
出力ペアは、入力ペアと同じ型である必要はありません。指定された入力ペアは、ゼロまたは多くの出力ペアにマップされる場合があります。出力ペアは context.write(WritableComparable, Writable) の呼び出しで収集されます。
アプリケーションは Counter
を使用して統計情報を報告できます。
指定された出力キーに関連付けられたすべての中間値は、その後フレームワークによってグループ化され、Reducer
に渡されて最終出力が決定されます。ユーザーは、Job.setGroupingComparatorClass(Class) を介して Comparator
を指定することにより、グループ化を制御できます。
Mapper
の出力はソートされ、Reducer
ごとに分割されます。パーティションの総数は、ジョブの Reduce タスクの数と同じです。ユーザーは、カスタム Partitioner
を実装することにより、どのキー(したがってレコード)がどの Reducer
に移動するかを制御できます。
ユーザーは、Job.setCombinerClass(Class) を介して combiner
をオプションで指定して、中間出力のローカル集計を実行できます。これは、Mapper
から Reducer
に転送されるデータ量を削減するのに役立ちます。
中間ソート出力は、常に単純な(key-len、key、value-len、value)形式で格納されます。アプリケーションは、中間出力を圧縮するかどうか、およびその方法、および Configuration
を介して使用する CompressionCodec を制御できます。
マップの数は通常、入力の合計サイズ、つまり入力ファイルのブロックの総数によって決まります。
マップの適切な並列処理レベルは、ノードあたり 10〜100 マップのようです。ただし、CPU 負荷の非常に軽いマップタスクの場合は 300 マップに設定されています。タスクのセットアップには時間がかかるため、マップの実行に少なくとも 1 分かかるようにするのが最善です。
したがって、10TB の入力データが予想され、ブロックサイズが 128MB
の場合、Configuration.set(MRJobConfig.NUM_MAPS
, int)(フレームワークへのヒントのみを提供します)を使用してさらに高く設定しない限り、82,000 のマップが作成されます。
Reducer は、キーを共有する一連の中間値を、より小さな値のセットに削減します。
ジョブの Reduce の数は、ユーザーが Job.setNumReduceTasks(int) を介して設定します。
全体として、Reducer
の実装は、Job.setReducerClass(Class) メソッドを介してジョブの Job
に渡され、それをオーバーライドして初期化できます。次に、フレームワークは、グループ化された入力の各 <key, (値のリスト)>
ペアに対して reduce(WritableComparable, Iterable<Writable>, Context) メソッドを呼び出します。アプリケーションは、cleanup(Context)
メソッドをオーバーライドして、必要なクリーンアップを実行できます。
Reducer
には、シャッフル、ソート、および Reduce の 3 つの主要なフェーズがあります。
Reducer
への入力は、Mapper のソートされた出力です。このフェーズでは、フレームワークは HTTP を介してすべての Mapper の出力の関連するパーティションを取得します。
フレームワークは、このステージでキー別に Reducer
入力をグループ化します(異なる Mapper が同じキーを出力している可能性があるため)。
シャッフルフェーズとソートフェーズは同時に発生します。マップ出力がフェッチされている間、それらはマージされます。
中間キーをグループ化するための等価ルールが、削減前にキーをグループ化するためのルールと異なる必要がある場合、Job.setSortComparatorClass(Class) を介して Comparator
を指定できます。Job.setGroupingComparatorClass(Class) を使用して中間キーのグループ化方法を制御できるため、これらを組み合わせて使用して *値のセカンダリソート* をシミュレートできます。
このフェーズでは、グループ化された入力の各 <key, (値のリスト)>
ペアに対して reduce(WritableComparable, Iterable<Writable>, Context) メソッドが呼び出されます。
Reduce タスクの出力は、通常、Context.write(WritableComparable, Writable) を介して FileSystem に書き込まれます。
アプリケーションは Counter
を使用して統計情報を報告できます。
Reducer
の出力は *ソートされていません*。
Reduce の適切な数は、0.95
または 1.75
に(<*ノード数*> * <*ノードあたりの最大コンテナー数*>)を掛けた値のようです。
0.95
を使用すると、すべての Reduce がすぐに起動し、マップの完了とともに出力の転送を開始できます。1.75
を使用すると、高速ノードは最初の Reduce ラウンドを完了し、負荷分散をはるかにうまく行う Reduce の第 2 ウェーブを起動します。
Reduce の数を増やすとフレームワークのオーバーヘッドは増加しますが、負荷分散が向上し、障害のコストが削減されます。
上記のスケーリング係数は、整数よりわずかに小さく、投機的タスクと失敗したタスクのためにフレームワークにいくつかの Reduce スロットを予約しています。
削減が不要な場合、Reduce タスクの数を *ゼロ* に設定することは有効です。
この場合、マップタスクの出力は、FileOutputFormat.setOutputPath(Job, Path) によって設定された出力パスに、FileSystem
に直接送られます。フレームワークは、マップ出力を FileSystem
に書き込む前にソートしません。
Partitioner はキースペースを分割します。
Partitioner は、中間マップ出力のキーの分割を制御します。キー(またはキーのサブセット)は、通常は *ハッシュ関数* によってパーティションを導出するために使用されます。パーティションの総数は、ジョブの Reduce タスクの数と同じです。したがって、これは、中間キー(したがってレコード)が削減のために送信される m
個の Reduce タスクのどれを制御します。
HashPartitioner はデフォルトの Partitioner
です。
Counter は、MapReduce アプリケーションが統計情報を報告するための機能です。
Mapper
と Reducer
の実装は、Counter
を使用して統計情報を報告できます。
Hadoop MapReduce には、一般的に役立つ Mapper、Reducer、および Partitioner の ライブラリ がバンドルされています。
Job は MapReduce ジョブ構成を表します。
Job
は、ユーザーが実行するために Hadoop フレームワークに MapReduce ジョブを記述するための主要なインターフェースです。フレームワークは、Job
で説明されているとおりにジョブを忠実に実行しようとしますが、
一部の構成パラメーターは、管理者によって final としてマークされている場合があり(最終パラメーター を参照)、したがって変更できません。
一部のジョブパラメータは簡単に設定できます(例:Job.setNumReduceTasks(int))が、他のパラメータはフレームワークやジョブ設定の残りの部分と微妙に相互作用するため、設定がより複雑になります(例:Configuration.set(JobContext.NUM_MAPS
, int))。
Job
は通常、Mapper
、コンバイナ(存在する場合)、Partitioner
、Reducer
、InputFormat
、OutputFormat
の実装を指定するために使用されます。 FileInputFormatは、入力ファイルのセット(FileInputFormat.setInputPaths(Job, Path…)/ FileInputFormat.addInputPath(Job, Path))と(FileInputFormat.setInputPaths(Job, String…)/ FileInputFormat.addInputPaths(Job, String)))、および出力ファイルの書き込み先(FileOutputFormat.setOutputPath(Path))を示します。
オプションとして、Job
は、使用するComparator
、DistributedCache
に配置するファイル、中間および/またはジョブ出力を圧縮するかどうか(および方法)、ジョブタスクを_投機的_に実行できるかどうか(setMapSpeculativeExecution(boolean))/ setReduceSpeculativeExecution(boolean))、タスクごとの最大試行回数(setMaxMapAttempts(int)/ setMaxReduceAttempts(int))など、ジョブの他の高度な側面を指定するために使用されます。
もちろん、ユーザーはConfiguration.set(String, String)/ Configuration.get(String)を使用して、アプリケーションに必要な任意のパラメータを設定/取得できます。ただし、大量の(読み取り専用)データにはDistributedCache
を使用してください。
MRAppMaster
は、Mapper
/Reducer
_タスク_を、別のJVMのチャイルドプロセスとして実行します。
子タスクは、親MRAppMaster
の環境を継承します。ユーザーは、mapreduce.{map|reduce}.java.opts
設定パラメータとJob
の設定パラメータを介して、子JVMに追加のオプションを指定できます。たとえば、ランタイムリンカーが共有ライブラリを検索するための非標準パスを-Djava.library.path=<>
などを介して指定できます。 mapreduce.{map|reduce}.java.opts
パラメータにシンボル_@taskid@_が含まれている場合、MapReduceタスクのtaskid
の値に置き換えられます。
複数の引数と置換を使用した例を次に示します。JVM GCロギングと、パスワードなしのJVM JMXエージェントの起動を示しています。これにより、jconsoleなどが接続して、子のメモリ、スレッドを監視し、スレッドダンプを取得できます。また、マップとリデュースの子JVMの最大ヒープサイズをそれぞれ512MBと1024MBに設定します。また、子JVMのjava.library.path
に追加のパスを追加します。
<property> <name>mapreduce.map.java.opts</name> <value> -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false </value> </property> <property> <name>mapreduce.reduce.java.opts</name> <value> -Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false </value> </property>
ユーザー/管理者は、mapreduce.{map|reduce}.memory.mb
を使用して、起動された子タスク、およびそれが再帰的に起動するすべてのサブプロセスの最大仮想メモリを指定することもできます。ここで設定される値はプロセスごとの制限であることに注意してください。 mapreduce.{map|reduce}.memory.mb
の値はメガバイト(MB)単位で指定する必要があります。また、値はJavaVMに渡される-Xmx以上でなければなりません。そうでない場合、VMが起動しない可能性があります。
注:mapreduce.{map|reduce}.java.opts
は、MRAppMasterから起動された子タスクを設定するためにのみ使用されます。デーモンのメモリオプションの設定については、Hadoopデーモンの環境の設定に記載されています。
フレームワークの一部の部分で使用可能なメモリも設定可能です。マップタスクとリデューサタスクでは、操作の並行性とデータがディスクにヒットする頻度に影響を与えるパラメータを調整することで、パフォーマンスが影響を受ける可能性があります。ジョブのファイルシステムカウンター(特に、マップからのバイト数とリデュースへのバイト数に関連するもの)を監視することは、これらのパラメータの調整に非常に役立ちます。
マップから出力されたレコードはバッファにシリアル化され、メタデータはアカウンティングバッファに格納されます。以下のオプションで説明するように、シリアル化バッファまたはメタデータがいずれかのしきい値を超えると、バッファの内容はソートされ、バックグラウンドでディスクに書き込まれます。マップはレコードの出力を続けます。スピル中にいずれかのバッファが完全にいっぱいになると、マップスレッドはブロックされます。マップが終了すると、残りのレコードはディスクに書き込まれ、すべてのディスク上のセグメントが1つのファイルにマージされます。ディスクへのスピル数を最小限に抑えるとマップ時間を短縮できますが、バッファが大きいほどマッパーで使用可能なメモリも少なくなります。
名前 | タイプ | 説明 |
---|---|---|
mapreduce.task.io.sort.mb | int | マップから出力されたレコードを格納するシリアル化バッファとアカウンティングバッファの累積サイズ(メガバイト単位)。 |
mapreduce.map.sort.spill.percent | float | シリアル化バッファのソフト制限。到達すると、スレッドはバックグラウンドでコンテンツをディスクにスピルし始めます。 |
その他の注意事項
スピル中にいずれかのスピルしきい値を超えた場合、スピルが完了するまで収集は続行されます。たとえば、mapreduce.map.sort.spill.percent
が0.33に設定されていて、スピルの実行中にバッファの残りの部分が埋められた場合、次のスピルには収集されたすべてのレコード、つまりバッファの0.66が含まれ、追加のスピルは生成されません。言い換えれば、しきい値はトリガーを定義するものであり、ブロッキングするものではありません。
シリアル化バッファより大きいレコードは、最初にスピルをトリガーし、次に別のファイルにスピルされます。このレコードが最初にコンバイナを通過するかどうかは定義されていません。
前述のように、各リデュースは、パーティショナーによって割り当てられた出力をHTTP経由でメモリにフェッチし、これらの出力を定期的にディスクにマージします。マップ出力の中間圧縮がオンになっている場合、各出力はメモリに解凍されます。以下のオプションは、リデュース前のディスクへのこれらのマージの頻度と、リデュース中のマップ出力に割り当てられるメモリに影響します。
名前 | タイプ | 説明 |
---|---|---|
mapreduce.task.io.soft.factor | int | 同時にマージされるディスク上のセグメント数を指定します。マージ中の開いているファイルと圧縮コーデックの数を制限します。ファイル数がこの制限を超えると、マージは数回に分けて実行されます。この制限はマップにも適用されますが、ほとんどのジョブは、この制限に達する可能性が低いように構成する必要があります。 |
mapreduce.reduce.merge.inmem.thresholds | int | ディスクにマージされる前にメモリにフェッチされるソート済みマップ出力の数。前の注のスピルしきい値と同様に、これはパーティションの単位を定義するのではなく、トリガーを定義します。実際には、メモリ内セグメントのマージはディスクからのマージよりも安価なことが多いため、これは通常非常に高く(1000)設定されるか、無効にされます(0)(この表の後の注を参照)。このしきい値は、シャッフル中のメモリ内マージの頻度にのみ影響します。 |
mapreduce.reduce.shuffle.merge.percent | float | メモリ内マージが開始される前にフェッチされたマップ出力のメモリしきい値。メモリにマップ出力を格納するために割り当てられたメモリの割合として表されます。メモリに収まらないマップ出力は停止する可能性があるため、これを高く設定すると、フェッチとマージの間の並列性が低下する可能性があります。逆に、入力がメモリに完全に収まるリデュースでは、1.0などの高い値が有効です。このパラメータは、シャッフル中のメモリ内マージの頻度にのみ影響します。 |
mapreduce.reduce.shuffle.input.buffer.percent | float | シャッフル中にマップ出力を格納するために割り当てることができるメモリの割合(通常はmapreduce.reduce.java.opts で指定される最大ヒープサイズに対する割合)。フレームワーク用にメモリを確保しておく必要がありますが、一般に、大きくて多数のマップ出力を格納するのに十分な高さに設定すると有利です。 |
mapreduce.reduce.input.buffer.percent | float | リデュース中にマップ出力を保持できる最大ヒープサイズに対するメモリの割合。リデュースが開始されると、残りのマップ出力がこの定義のリソース制限を下回るまで、マップ出力はディスクにマージされます。デフォルトでは、リデュースで使用可能なメモリを最大化するために、リデュースが開始される前にすべてのマップ出力がディスクにマージされます。メモリ消費量の少ないリデュースの場合、ディスクへのトリップを回避するためにこれを増やす必要があります。 |
その他の注意事項
マップ出力がマップ出力のコピーに割り当てられたメモリの25%を超える場合、メモリを介してステージングすることなく、ディスクに直接書き込まれます。
コンバイナを使用して実行する場合、高いマージしきい値と大きなバッファに関する推論は当てはまらない場合があります。すべてのマップ出力がフェッチされる前に開始されたマージの場合、ディスクにスピルしている間にコンバイナが実行されます。場合によっては、バッファサイズを積極的に増やすのではなく、マップ出力の結合(ディスクスピルを小さくし、スピルとフェッチを並列化する)にリソースを費やすことで、リデュース時間を短縮できます。
リデュースを開始するためにメモリ内マップ出力をディスクにマージする場合、スピルするセグメントがあり、少なくともmapreduce.task.io.sort.factor
セグメントがすでにディスク上にあるために中間マージが必要な場合、メモリ内マップ出力は中間マージの一部になります。
以下のプロパティは、各タスクの実行のジョブ設定でローカライズされます
名前 | タイプ | 説明 |
---|---|---|
mapreduce.job.id | String | ジョブID |
mapreduce.job.jar | String | ジョブディレクトリ内のjob.jarの場所 |
mapreduce.job.local.dir | String | ジョブ固有の共有スクラッチスペース |
mapreduce.task.id | String | タスクID |
mapreduce.task.attempt.id | String | タスク試行ID |
mapreduce.task.is.map | boolean | これはマップタスクですか |
mapreduce.task.partition | int | ジョブ内のタスクのID |
mapreduce.map.input.file | String | マップが読み取っているファイル名 |
mapreduce.map.input.start | long | マップ入力分割の開始オフセット |
mapreduce.map.input.length | long | マップ入力分割のバイト数 |
mapreduce.task.output.dir | String | タスクの一時出力ディレクトリ |
注:ストリーミングジョブの実行中、「mapreduce」パラメータの名前は変換されます。ドット(.)はアンダースコア(_)になります。たとえば、mapreduce.job.idはmapreduce_job_idになり、mapreduce.job.jarはmapreduce_job_jarになります。ストリーミングジョブのマッパー/リデューサーで値を取得するには、アンダースコア付きのパラメータ名を使用します。
タスクの標準出力(stdout)およびエラー(stderr)ストリームとsyslogは、NodeManagerによって読み取られ、${HADOOP_LOG_DIR}/userlogs
に記録されます。
分散キャッシュは、マップタスクやリデュースタスクで使用するJARファイルとネイティブライブラリの両方を配布するためにも使用できます。子JVMは常に、その*カレントワーキングディレクトリ*をjava.library.path
とLD_LIBRARY_PATH
に追加します。そのため、キャッシュされたライブラリは、System.loadLibraryまたはSystem.loadを介してロードできます。分散キャッシュを介して共有ライブラリをロードする方法の詳細については、ネイティブライブラリに記載されています。
Jobは、ユーザー ジョブがResourceManager
と対話するための主要なインターフェースです。
Job
は、ジョブの送信、進捗状況の追跡、コンポーネントタスクのレポートとログへのアクセス、MapReduceクラスタのステータス情報の取得などの機能を提供します。
ジョブ送信プロセスには、次のものが含まれます。
ジョブの入力と出力の仕様を確認します。
ジョブのInputSplit
値を計算します。
必要に応じて、ジョブのDistributedCache
に必要なアカウンティング情報を設定します。
ジョブのjarと設定を、FileSystem
上のMapReduceシステムディレクトリにコピーします。
ジョブをResourceManager
に送信し、必要に応じてそのステータスを監視します。
ジョブ履歴ファイルは、ユーザー指定のディレクトリmapreduce.jobhistory.intermediate-done-dir
およびmapreduce.jobhistory.done-dir
にもログに記録されます。デフォルトはジョブ出力ディレクトリです。
ユーザーは、次のコマンド$ mapred job -history output.jhist
を使用して、指定されたディレクトリ内の履歴ログの概要を表示できます。このコマンドは、ジョブの詳細、失敗したtipと強制終了されたtipの詳細を出力します。成功したタスクや各タスクに対して行われたタスク試行などのジョブの詳細については、次のコマンド$ mapred job -history all output.jhist
を使用して表示できます。
通常、ユーザーはJob
を使用してアプリケーションを作成し、ジョブのさまざまな側面を記述し、ジョブを送信し、その進捗状況を監視します。
ユーザーは、単一のMapReduceジョブでは実行できない複雑なタスクを実行するために、MapReduceジョブをチェーンする必要がある場合があります。ジョブの出力は通常分散ファイルシステムに送られ、その出力は次のジョブの入力として使用できるため、これは非常に簡単です。
ただし、これは、ジョブが完了(成功/失敗)していることを確認する責任がクライアントにあることも意味します。このような場合、さまざまなジョブ制御オプションは次のとおりです。
Job.submit():ジョブをクラスタに送信し、すぐに返します。
Job.waitForCompletion(boolean):ジョブをクラスタに送信し、完了するまで待ちます。
InputFormatは、MapReduceジョブの入力仕様を記述します。
MapReduceフレームワークは、ジョブのInputFormat
に依存して、次のことを行います。
ジョブの入力仕様を検証します。
入力ファイルを入力ファイル)を論理InputSplit
インスタンスに分割します。それぞれのインスタンスは個々のMapper
に割り当てられます。
Mapper
による処理のために論理InputSplit
から入力レコードを読み取るために使用されるRecordReader
実装を提供します。
ファイルベースのInputFormat
実装(通常はFileInputFormatのサブクラス)のデフォルトの動作は、入力ファイルの合計サイズ(バイト単位)に基づいて入力を*論理的*なInputSplit
インスタンスに分割することです。ただし、入力ファイルのFileSystem
ブロックサイズは、入力分割の上限として扱われます。分割サイズの下限は、mapreduce.input.fileinputformat.split.minsize
を介して設定できます。
レコード境界を尊重する必要があるため、入力サイズに基づく論理分割は多くのアプリケーションでは不十分です。このような場合、アプリケーションはRecordReader
を実装する必要があります。これは、レコード境界を尊重し、個々のタスクに論理InputSplit
のレコード指向ビューを提供する役割を担います。
TextInputFormatは、デフォルトのInputFormat
です。
TextInputFormat
が特定のジョブのInputFormat
である場合、フレームワークは*.gz*拡張子を持つ入力ファイルを検出し、適切なCompressionCodec
を使用して自動的に解凍します。ただし、上記の拡張子を持つ圧縮ファイルは*分割*できず、各圧縮ファイルは単一のmapperによって全体として処理されることに注意してください。
InputSplitは、個々のMapper
によって処理されるデータを表します。
通常、InputSplit
は入力のバイト指向ビューを提供し、レコード指向のビューを処理して提示するのはRecordReader
の責任です。
FileSplitは、デフォルトのInputSplit
です。論理分割の入力ファイルのパスにmapreduce.map.input.file
を設定します。
RecordReaderは、InputSplit
から<key, value>
ペアを読み取ります。
通常、RecordReader
は、InputSplit
によって提供される入力のバイト指向ビューを変換し、処理のためにMapper
実装にレコード指向のビューを提供します。したがって、RecordReader
はレコード境界を処理する責任を負い、タスクにキーと値を提示します。
OutputFormatは、MapReduceジョブの出力仕様を記述します。
MapReduceフレームワークは、ジョブのOutputFormat
に依存して、次のことを行います。
ジョブの出力仕様を検証します。たとえば、出力ディレクトリが既に存在しないことを確認します。
ジョブの出力ファイルを書き込むために使用されるRecordWriter
実装を提供します。出力ファイルはFileSystem
に保存されます。
TextOutputFormat
は、デフォルトのOutputFormat
です。
OutputCommitterは、MapReduceジョブのタスク出力のコミットについて説明します。
MapReduceフレームワークは、ジョブのOutputCommitter
に依存して、次のことを行います。
初期化中にジョブをセットアップします。たとえば、ジョブの初期化中にジョブの一時出力ディレクトリを作成します。ジョブのセットアップは、ジョブがPREP状態にあり、タスクを初期化した後に、別のタスクによって行われます。セットアップタスクが完了すると、ジョブはRUNNING状態に移行します。
ジョブ完了後にジョブをクリーンアップします。たとえば、ジョブ完了後、一時出力ディレクトリを削除します。ジョブのクリーンアップは、ジョブの最後に別のタスクによって行われます。クリーンアップタスクが完了すると、ジョブはSUCCEDED/FAILED/KILLEDと宣言されます。
タスク一時出力を設定します。タスクのセットアップは、タスクの初期化中に、同じタスクの一部として行われます。
タスクにコミットが必要かどうかを確認します。これは、タスクにコミットが必要ない場合にコミット手順を回避するためです。
タスク出力のコミット。タスクが完了すると、タスクは必要に応じて出力をコミットします。
タスクコミットを破棄します。タスクが失敗/強制終了された場合、出力はクリーンアップされます。タスクがクリーンアップできなかった場合(例外ブロック内)、クリーンアップを実行するために、同じ試行IDで別のタスクが起動されます。
FileOutputCommitter
は、デフォルトのOutputCommitter
です。ジョブのセットアップ/クリーンアップタスクは、NodeManagerで使用可能なマップコンテナまたは縮小コンテナのいずれかを占有します。また、JobCleanupタスク、TaskCleanupタスク、JobSetupタスクはこの順序で最も高い優先順位を持ちます。
一部のアプリケーションでは、コンポーネントタスクは、実際のジョブ出力ファイルとは異なる副作用ファイルを作成および/または書き込む必要があります。
このような場合、同じMapper
またはReducer
の2つのインスタンスが同時に実行されている(たとえば、投機的タスク)と、FileSystem
上の同じファイル(パス)を開こうとしたり、書き込もうとしたりする際に問題が発生する可能性があります。したがって、アプリケーションライターは、タスクごとではなく、タスク試行ごとに一意の名前を選択する必要があります(試行ID、たとえば、attempt_200709221812_0001_m_000000_0
を使用)。
これらの問題を回避するために、MapReduceフレームワークは、OutputCommitter
がFileOutputCommitter
の場合、FileSystem
上の各タスク試行に対して、${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}
という特別なサブディレクトリを保持します。タスク試行の出力が格納されます。タスク試行が正常に完了すると、${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}
内のファイル(のみ)が${mapreduce.output.fileoutputformat.outputdir}
に*昇格*されます。もちろん、フレームワークは失敗したタスク試行のサブディレクトリを破棄します。このプロセスは、アプリケーションに対して完全に透過的です。
アプリケーションライターは、FileOutputFormat.getWorkOutputPath(Conext)を介してタスクの実行中に${mapreduce.task.output.dir}
に必要な副作用ファイルを作成することで、この機能を利用できます。フレームワークは、成功したタスク試行についても同様にそれらを昇格させるため、タスク試行ごとに一意のパスを選択する必要がなくなります。
注:特定のタスク試行の実行中の${mapreduce.task.output.dir}
の値は、実際には${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid}
であり、この値はMapReduceフレームワークによって設定されます。したがって、この機能を利用するには、MapReduceタスクからFileOutputFormat.getWorkOutputPath(Conext)によって返されるパスに副作用ファイルを作成するだけです。
reducer = NONE(つまり、0個のreducer)のジョブのマップについては、マップの出力がその場合HDFSに直接送られるため、議論全体が当てはまります。
RecordWriterは、出力<key, value>
ペアを出力ファイルに書き込みます。
RecordWriter実装は、ジョブ出力をFileSystem
に書き込みます。
ユーザーはジョブをキューに送信します。キューはジョブの集合体として、システムが特定の機能を提供できるようにします。たとえば、キューはACLを使用して、ジョブを送信できるユーザーを制御します。キューは、主にHadoopスケジューラによって使用されることが想定されています。
Hadoopには、「default」と呼ばれる単一の必須キューが設定されています。キュー名は、Hadoopサイト設定のmapreduce.job.queuename
プロパティで定義されています。Capacity Schedulerなど、一部のジョブスケジューラは複数のキューをサポートしています。
ジョブは、mapreduce.job.queuename
プロパティ、または Configuration.set(MRJobConfig.QUEUE_NAME
, String) API を使用して、送信する必要があるキューを定義します。キュー名の設定はオプションです。ジョブが関連付けられたキュー名なしで送信された場合、'default' キューに送信されます。
Counters
は、MapReduce フレームワークまたはアプリケーションによって定義されたグローバルカウンタを表します。各 Counter
は任意の Enum
型にすることができます。特定の Enum
のカウンタは、Counters.Group
型のグループにまとめられます。
アプリケーションは、任意の Counters
(Enum
型) を定義し、map
メソッドまたは reduce
メソッド (あるいはその両方) で Counters.incrCounter(Enum, long) または Counters.incrCounter(String, String, long) を使用して更新できます。これらのカウンタは、フレームワークによってグローバルに集計されます。
DistributedCache
は、アプリケーション固有の大規模な読み取り専用ファイルを効率的に分散します。
DistributedCache
は、アプリケーションに必要なファイル (テキスト、アーカイブ、jar など) をキャッシュするために MapReduce フレームワークによって提供される機能です。
アプリケーションは、Job
内の URL (hdfs://) を介してキャッシュするファイルを指定します。DistributedCache
は、hdfs:// URL を介して指定されたファイルが FileSystem
に既に存在すると想定します。
フレームワークは、ジョブのタスクがノードで実行される前に、必要なファイルをワーカーノードにコピーします。その効率性は、ファイルがジョブごとに一度だけコピーされることと、ワーカー上で解凍されるアーカイブをキャッシュできることに由来します。
DistributedCache
は、キャッシュされたファイルの変更タイムスタンプを追跡します。ジョブの実行中に、アプリケーションまたは外部によってキャッシュファイルが変更されないようにする必要があります。
DistributedCache
は、単純な読み取り専用データ/テキストファイルと、アーカイブや jar などのより複雑なタイプの配布に使用できます。アーカイブ (zip、tar、tgz、tar.gz ファイル) は、ワーカーノードで_解凍_されます。ファイルには、_実行権限_が設定されています。
ファイル/アーカイブは、プロパティ mapreduce.job.cache.{files |archives}
を設定することで配布できます。複数のファイル/アーカイブを配布する必要がある場合は、カンマ区切りのパスとして追加できます。プロパティは、API Job.addCacheFile(URI)/ Job.addCacheArchive(URI) および Job.setCacheFiles(URI[])/ Job.setCacheArchives(URI[]) を使用しても設定できます。ここで、URI は hdfs://host:port/absolute-path#link-name
の形式です。ストリーミングでは、コマンドラインオプション -cacheFile/-cacheArchive
を使用してファイルを配布できます。
DistributedCache
は、map タスクまたは reduce タスク (あるいはその両方) で使用するための基本的なソフトウェア配布メカニズムとしても使用できます。jar とネイティブライブラリの両方を配布するために使用できます。Job.addArchiveToClassPath(Path) または Job.addFileToClassPath(Path) API を使用して、ファイル/ jar をキャッシュし、子 JVM の_クラスパス_に追加することもできます。設定プロパティ mapreduce.job.classpath.{files |archives}
を設定することによって、同じことができます。同様に、タスクの作業ディレクトリにシンボリックリンクされているキャッシュファイルを使用して、ネイティブライブラリを配布してロードできます。
分散キャッシュファイルはプライベートまたはパブリックにすることができ、ワーカーノードでの共有方法が決まります。
「プライベート」分散キャッシュファイルは、これらのファイルを必要とするジョブのユーザー専用のローカルディレクトリにキャッシュされます。これらのファイルは、特定のユーザーのすべてのタスクとジョブによってのみ共有され、ワーカー上の他のユーザーのジョブからはアクセスできません。分散キャッシュファイルは、ファイルがアップロードされるファイルシステム (通常は HDFS) に対するアクセス許可によってプライベートになります。ファイルにワールド読み取りアクセス権がない場合、またはファイルへのディレクトリパスにルックアップのためのワールド実行アクセス権がない場合、ファイルはプライベートになります。
「パブリック」分散キャッシュファイルはグローバルディレクトリにキャッシュされ、ファイルアクセスはすべてのユーザーに公開されるように設定されます。これらのファイルは、ワーカー上のすべてのユーザーのタスクとジョブによって共有できます。分散キャッシュファイルは、ファイルがアップロードされるファイルシステム (通常は HDFS) に対するアクセス許可によってパブリックになります。ファイルにワールド読み取りアクセス権があり、ファイルへのディレクトリパスにルックアップのためのワールド実行アクセス権がある場合、ファイルはパブリックになります。言い換えれば、ユーザーがファイルをすべてのユーザーに公開する場合は、ファイルのアクセス許可をワールド読み取り可能に設定し、ファイルへのパス上のディレクトリのアクセス許可をワールド実行可能に設定する必要があります。
プロファイリングは、map と reduce のサンプルに対して、組み込みの Java プロファイラの代表的な (2 つまたは 3 つ) サンプルを取得するためのユーティリティです。
ユーザーは、設定プロパティ mapreduce.task.profile
を設定することにより、システムがジョブ内の一部のタスクのプロファイラ情報を収集するかどうかを指定できます。値は、API Configuration.set(MRJobConfig.TASK_PROFILE
, boolean) を使用して設定できます。値が true
に設定されている場合、タスクプロファイリングが有効になります。プロファイラ情報は、ユーザーログディレクトリに保存されます。デフォルトでは、ジョブのプロファイリングは有効になっていません。
ユーザーがプロファイリングが必要であることを設定したら、設定プロパティ mapreduce.task.profile.{maps|reduces}
を使用して、プロファイリングする MapReduce タスクの範囲を設定できます。値は、API Configuration.set(MRJobConfig.NUM_{MAP|REDUCE}_PROFILES
, String) を使用して設定できます。デフォルトでは、指定された範囲は 0-2
です。
ユーザーは、設定プロパティ `mapreduce.task.profile.params` を設定することで、プロファイラ設定引数を指定することもできます。値は、API Configuration.set(`MRJobConfig.TASK_PROFILE_PARAMS`, String) を使用して指定できます。文字列に `%s` が含まれている場合、タスクの実行時にプロファイリング出力ファイルの名前に置き換えられます。これらのパラメータは、コマンドラインでタスク子 JVM に渡されます。プロファイリングパラメータのデフォルト値は `-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s` です。
MapReduce フレームワークは、デバッグ用にユーザー提供のスクリプトを実行する機能を提供します。MapReduce タスクが失敗した場合、ユーザーはデバッグスクリプトを実行して、たとえばタスクログを処理できます。スクリプトには、タスクの stdout および stderr 出力、syslog、および jobconf へのアクセス権が付与されます。デバッグスクリプトの stdout および stderr からの出力は、コンソール診断に表示され、ジョブ UI の一部としても表示されます。
以下のセクションでは、ジョブにデバッグスクリプトを送信する方法について説明します。スクリプトファイルは、配布してフレームワークに送信する必要があります。
ユーザーは、分散キャッシュを使用して、スクリプトファイルを_配布_し、_シンボリックリンク_する必要があります。
デバッグスクリプトを送信する簡単な方法は、それぞれ map タスクと reduce タスクをデバッグするために、プロパティ `mapreduce.map.debug.script` と `mapreduce.reduce.debug.script` に値を設定することです。これらのプロパティは、API Configuration.set(`MRJobConfig.MAP_DEBUG_SCRIPT`, String) および Configuration.set(`MRJobConfig.REDUCE_DEBUG_SCRIPT`, String) を使用して設定することもできます。ストリーミングモードでは、それぞれ map タスクと reduce タスクをデバッグするために、コマンドラインオプション `-mapdebug` と `-reducedebug` を使用してデバッグスクリプトを送信できます。
スクリプトへの引数は、タスクの stdout、stderr、syslog、および jobconf ファイルです。MapReduce タスクが失敗したノードで実行される debug コマンドは次のとおりです。
$script $stdout $stderr $syslog $jobconf
Pipes プログラムは、コマンドの 5 番目の引数として c++ プログラム名を持ちます。したがって、Pipes プログラムのコマンドは次のとおりです。
$script $stdout $stderr $syslog $jobconf $program
Pipes の場合、gdb でコアダンプを処理し、スタックトレースを出力し、実行中のスレッドに関する情報を提供するためのデフォルトスクリプトが実行されます。
Hadoop MapReduce は、アプリケーションライターが中間 map 出力とジョブ出力 (つまり、reduce の出力) の両方の圧縮を指定するための機能を提供します。また、zlib 圧縮アルゴリズムの CompressionCodec 実装もバンドルされています。gzip、bzip2、snappy、および lz4 ファイル形式もサポートされています。
Hadoop は、パフォーマンス (zlib) と Java ライブラリの入手不可の両方の理由から、上記の圧縮コーデックのネイティブ実装も提供します。使用方法と可用性の詳細は、こちらを参照してください。
アプリケーションは、Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS
, boolean) API を使用して中間 map 出力の圧縮を制御し、Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC
, Class) API を使用して使用する CompressionCodec
を制御できます。
アプリケーションは、FileOutputFormat.setCompressOutput(Job, boolean) API を使用してジョブ出力の圧縮を制御でき、使用する CompressionCodec
は FileOutputFormat.setOutputCompressorClass(Job, Class) API を使用して指定できます。
ジョブ出力が SequenceFileOutputFormat に格納される場合、必要な `SequenceFile.CompressionType` (つまり、`RECORD` / `BLOCK` - デフォルトは `RECORD`) は、SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) API を使用して指定できます。
Hadoop は、map 入力の処理時に特定の不正な入力レコードをスキップできるオプションを提供します。アプリケーションは、SkipBadRecords クラスを使用してこの機能を制御できます。
この機能は、map タスクが特定の入力で確定的にクラッシュする場合に使用できます。これは通常、map 関数のバグが原因で発生します。通常、ユーザーはこれらのバグを修正する必要があります。しかし、これは不可能な場合があります。たとえば、ソースコードが入手できないサードパーティライブラリにバグがある場合があります。このような場合、タスクは複数回試行しても正常に完了せず、ジョブは失敗します。この機能を使用すると、不正なレコードの周囲のデータのごく一部だけが失われ、一部のアプリケーション (たとえば、非常に大きなデータで統計分析を実行するアプリケーション) では許容される場合があります。
デフォルトでは、この機能は無効になっています。有効にするには、SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) および SkipBadRecords.setReducerMaxSkipGroups(Configuration, long) を参照してください。
この機能を有効にすると、フレームワークは特定の数の map 障害の後、「スキップモード」になります。詳細については、SkipBadRecords.setAttemptsToStartSkipping(Configuration, int) を参照してください。「スキップモード」では、map タスクは処理されているレコードの範囲を維持します。これを行うために、フレームワークは処理されたレコードカウンタに依存します。SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS および SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS を参照してください。このカウンタにより、フレームワークは正常に処理されたレコードの数、ひいてはタスクのクラッシュの原因となったレコード範囲を知ることができます。さらに試行すると、このレコード範囲はスキップされます。
スキップされるレコード数は、アプリケーションによって処理済みレコードカウンタがどれだけ頻繁にインクリメントされるかによって異なります。このカウンタは、各レコードが処理された後にインクリメントすることを推奨します。これは、通常バッチ処理を行う一部のアプリケーションでは不可能な場合があります。このような場合、フレームワークは不良レコード周辺の追加レコードをスキップする可能性があります。ユーザーは、SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) および SkipBadRecords.setReducerMaxSkipGroups(Configuration, long) を使用して、スキップされるレコード数を制御できます。フレームワークは、バイナリサーチのようなアプローチを使用して、スキップされるレコードの範囲を絞り込もうとします。スキップされる範囲は2つの半分に分割され、半分だけが実行されます。後続の失敗で、フレームワークはどちらの半分に不良レコードが含まれているかを特定します。タスクは、許容できるスキップ値が満たされるか、すべてのタスク試行が使い果たされるまで再実行されます。タスク試行の回数を増やすには、Job.setMaxMapAttempts(int) および Job.setMaxReduceAttempts(int) を使用します。
スキップされたレコードは、後で分析するために、シーケンスファイル形式でHDFSに書き込まれます。場所は、SkipBadRecords.setSkipOutputPath(JobConf, Path) を使用して変更できます。
これまで説明したMapReduceフレームワークによって提供される多くの機能を使用する、より完全な WordCount
を以下に示します。
これは、特に DistributedCache
関連の機能のために、HDFSが稼働している必要があります。そのため、擬似分散 または 完全分散 Hadoopインストールでのみ動作します。
import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.StringUtils; public class WordCount2 { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ static enum CountersEnum { INPUT_WORDS } private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private boolean caseSensitive; private Set<String> patternsToSkip = new HashSet<String>(); private Configuration conf; private BufferedReader fis; @Override public void setup(Context context) throws IOException, InterruptedException { conf = context.getConfiguration(); caseSensitive = conf.getBoolean("wordcount.case.sensitive", true); if (conf.getBoolean("wordcount.skip.patterns", false)) { URI[] patternsURIs = Job.getInstance(conf).getCacheFiles(); for (URI patternsURI : patternsURIs) { Path patternsPath = new Path(patternsURI.getPath()); String patternsFileName = patternsPath.getName().toString(); parseSkipFile(patternsFileName); } } } private void parseSkipFile(String fileName) { try { fis = new BufferedReader(new FileReader(fileName)); String pattern = null; while ((pattern = fis.readLine()) != null) { patternsToSkip.add(pattern); } } catch (IOException ioe) { System.err.println("Caught exception while parsing the cached file '" + StringUtils.stringifyException(ioe)); } } @Override public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase(); for (String pattern : patternsToSkip) { line = line.replaceAll(pattern, ""); } StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); Counter counter = context.getCounter(CountersEnum.class.getName(), CountersEnum.INPUT_WORDS.toString()); counter.increment(1); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); GenericOptionsParser optionParser = new GenericOptionsParser(conf, args); String[] remainingArgs = optionParser.getRemainingArgs(); if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) { System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount2.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); List<String> otherArgs = new ArrayList<String>(); for (int i=0; i < remainingArgs.length; ++i) { if ("-skip".equals(remainingArgs[i])) { job.addCacheFile(new Path(remainingArgs[++i]).toUri()); job.getConfiguration().setBoolean("wordcount.skip.patterns", true); } else { otherArgs.add(remainingArgs[i]); } } FileInputFormat.addInputPath(job, new Path(otherArgs.get(0))); FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1))); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
入力としてのサンプルテキストファイル
$ bin/hadoop fs -ls /user/joe/wordcount/input/ /user/joe/wordcount/input/file01 /user/joe/wordcount/input/file02 $ bin/hadoop fs -cat /user/joe/wordcount/input/file01 Hello World, Bye World! $ bin/hadoop fs -cat /user/joe/wordcount/input/file02 Hello Hadoop, Goodbye to hadoop.
アプリケーションを実行します
$ bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output
出力
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000 Bye 1 Goodbye 1 Hadoop, 1 Hello 2 World! 1 World, 1 hadoop. 1 to 1
入力が最初に見たバージョンと異なり、それらがどのように出力に影響するか注意してください。
それでは、DistributedCache
を介して、無視される単語パターンをリストしたパターンファイルをプラグインしてみましょう。
$ bin/hadoop fs -cat /user/joe/wordcount/patterns.txt \. \, \! to
今回はより多くのオプションを使用して、もう一度実行します。
$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
期待どおり、出力は
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000 Bye 1 Goodbye 1 Hadoop 1 Hello 2 World 2 hadoop 1
もう一度実行しますが、今回は大文字と小文字の区別をオフにします。
$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
確かに、出力は
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000 bye 1 goodbye 1 hadoop 2 hello 2 world 2
WordCount
の2番目のバージョンは、MapReduceフレームワークによって提供されるいくつかの機能を使用することで、以前のバージョンを改善しています。
アプリケーションが Mapper
(および Reducer
)実装の setup
メソッドで構成パラメータにアクセスする方法を示します。
DistributedCache
を使用して、ジョブに必要な読み取り専用データを配布する方法を示します。ここでは、カウント中にスキップする単語パターンをユーザーが指定できます。
汎用Hadoopコマンドラインオプションを処理するための GenericOptionsParser
のユーティリティを示します。
アプリケーションが Counters
を使用する方法と、map
(および reduce
)メソッドに渡されるアプリケーション固有のステータス情報を設定する方法を示します。
JavaおよびJNIは、米国およびその他の国におけるOracle America, Inc.の商標または登録商標です。