MapReduce チュートリアル

目的

このドキュメントでは、Hadoop MapReduce フレームワークのユーザー向け機能を包括的に説明し、チュートリアルとして機能します。

前提条件

Hadoop がインストール、構成、および実行されていることを確認してください。詳細は

概要

Hadoop MapReduce は、コモディティハードウェアの大規模クラスタ (数千ノード) で、信頼性が高くフォールトトレラントな方法で、大量のデータ (マルチテラバイトデータセット) を並列処理するアプリケーションを簡単に作成するためのソフトウェアフレームワークです。

MapReduce *ジョブ* は通常、入力データセットを独立したチャンクに分割し、*マップタスク* によって完全に並列に処理されます。フレームワークはマップの出力をソートし、*リデュースタスク* への入力とします。通常、ジョブの入力と出力はどちらもファイルシステムに保存されます。フレームワークは、タスクのスケジューリング、監視、および失敗したタスクの再実行を担当します。

通常、計算ノードとストレージノードは同じです。つまり、MapReduce フレームワークと Hadoop 分散ファイルシステム ( HDFS アーキテクチャガイド を参照) は同じノードセットで実行されています。この構成により、フレームワークはデータが既に存在するノードでタスクを効果的にスケジュールできるため、クラスタ全体の集約帯域幅が非常に高くなります。

MapReduce フレームワークは、単一のマスター `ResourceManager`、クラスタノードごとに 1 つのワーカー `NodeManager`、アプリケーションごとに `MRAppMaster` で構成されます ( YARN アーキテクチャガイド を参照)。

少なくとも、アプリケーションは入力/出力の場所を指定し、適切なインターフェース/抽象クラスの実装を介して *マップ* および *リデュース* 関数を提供します。これらと他のジョブパラメータは、*ジョブ構成* を構成します。

次に、Hadoop *ジョブクライアント* はジョブ (jar/実行可能ファイルなど) と構成を `ResourceManager` に送信します。`ResourceManager` は、ソフトウェア/構成をワーカーに配布し、タスクをスケジュールして監視し、ステータスと診断情報をジョブクライアントに提供する責任を負います。

Hadoop フレームワークは Java™ で実装されていますが、MapReduce アプリケーションは Java で記述する必要はありません。

  • Hadoop ストリーミング は、任意の実行可能ファイル (例: シェルユーティリティ) をマッパーまたはレデューサーとして使用してジョブを作成および実行できるユーティリティです。

  • Hadoop Pipes は、MapReduce アプリケーションを実装するための SWIG 互換 C++ API です (JNI™ ベースではありません)。

入力と出力

MapReduce フレームワークは `<key, value>` ペアのみを操作します。つまり、フレームワークはジョブへの入力を `<key, value>` ペアのセットとして表示し、ジョブの出力として `<key, value>` ペアのセットを生成します。型の異なる可能性があります。

`key` および `value` クラスはフレームワークによってシリアル化可能である必要があるため、Writable インターフェースを実装する必要があります。さらに、`key` クラスは、フレームワークによるソートを容易にするために、WritableComparable インターフェースを実装する必要があります。

MapReduce ジョブの入力と出力の型

(入力) `<k1, v1> ->` **map** `-> <k2, v2> ->` **combine** `-> <k2, v2> ->` **reduce** `-> <k3, v3>` (出力)

例: WordCount v1.0

詳細に入る前に、MapReduce アプリケーションの例を挙げて、それらがどのように機能するかを理解しましょう。

`WordCount` は、指定された入力セット内の各単語の出現回数をカウントする単純なアプリケーションです。

これは、ローカルスタンドアロン、擬似分散、または完全に分散された Hadoop インストールで動作します ( 単一ノード設定 )。

ソースコード

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

使用方法

環境変数が次のように設定されていると仮定します

export JAVA_HOME=/usr/java/default
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

`WordCount.java` をコンパイルして jar を作成します

$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class

以下を仮定します

  • `/user/joe/wordcount/input` - HDFS の入力ディレクトリ
  • `/user/joe/wordcount/output` - HDFS の出力ディレクトリ

入力としてのサンプルテキストファイル

$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop

アプリケーションを実行します

$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output

出力

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2

アプリケーションは、オプション `-files` を使用して、タスクの現在の作業ディレクトリに存在するパスのコンマ区切りリストを指定できます。 `-libjars` オプションを使用すると、アプリケーションはマップとリデュースのクラスパスに jar を追加できます。オプション `-archives` を使用すると、コンマ区切りのアーカイブリストを引数として渡すことができます。これらのアーカイブは解凍され、アーカイブの名前のリンクがタスクの現在の作業ディレクトリに作成されます。コマンドラインオプションの詳細については、コマンドガイド を参照してください。

`-libjars`、`-files`、`-archives` を使用した `wordcount` 例の実行

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output

ここでは、myarchive.zip が配置され、「myarchive.zip」という名前のディレクトリに解凍されます。

ユーザーは、`-files` および `-archives` オプションを介して渡されるファイルとアーカイブに、# を使用して異なるシンボリック名を指定できます。

例えば、

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output

ここでは、ファイル dir1/dict.txt と dir2/dict.txt には、それぞれシンボリック名 dict1 と dict2 を使用してタスクからアクセスできます。アーカイブ mytar.tgz は配置され、「tgzdir」という名前のディレクトリに解凍されます。

アプリケーションは、コマンドラインでそれぞれ -Dmapreduce.map.env、-Dmapreduce.reduce.env、-Dyarn.app.mapreduce.am.env オプションを使用して環境変数を指定することにより、Mapper、Reducer、および Application Master タスクの環境変数を指定できます。

たとえば、以下は、Mapper と Reducer に対して環境変数 FOO_VAR=bar と LIST_VAR=a,b,c を設定します。

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -Dmapreduce.map.env.FOO_VAR=bar -Dmapreduce.map.env.LIST_VAR=a,b,c -Dmapreduce.reduce.env.FOO_VAR=bar -Dmapreduce.reduce.env.LIST_VAR=a,b,c input output

ウォークスルー

WordCount アプリケーションは非常に分かりやすいです。

public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}

Mapper の実装は、map メソッドを介して、指定された TextInputFormat によって提供される 1 行ずつを処理します。次に、StringTokenizer を使用して、行を空白で区切られたトークンに分割し、< <単語>, 1> のキーと値のペアを出力します。

指定されたサンプル入力に対して、最初のマップは以下を出力します。

< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

2 番目のマップは以下を出力します。

< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>

指定されたジョブに対して生成されるマップの数、およびそれらをきめ細かく制御する方法については、チュートリアルの後半で詳しく説明します。

    job.setCombinerClass(IntSumReducer.class);

WordCountcombiner も指定します。したがって、各マップの出力は、*キー*でソートされた後、ローカル集計のためにローカル combiner(ジョブ構成によると Reducer と同じ)に渡されます。

最初のマップの出力

< Bye, 1>
< Hello, 1>
< World, 2>

2 番目のマップの出力

< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
public void reduce(Text key, Iterable<IntWritable> values,
                   Context context
                   ) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}

Reducer の実装は、reduce メソッドを介して、各キー(この例では単語)の出現回数である値を合計するだけです。

したがって、ジョブの出力は次のとおりです。

< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>

main メソッドは、Job で、入力/出力パス(コマンドライン経由で渡される)、キー/値の型、入力/出力フォーマットなど、ジョブのさまざまな側面を指定します。次に、job.waitForCompletion を呼び出してジョブを送信し、その進行状況を監視します。

JobInputFormatOutputFormat、およびその他のインターフェースとクラスについては、チュートリアルの後半で詳しく説明します。

MapReduce - ユーザーインターフェース

このセクションでは、MapReduce フレームワークのすべてのユーザー向け側面について、ある程度の詳細を提供します。これは、ユーザーがジョブを実装、構成、および調整するのに役立ちます。ただし、各クラス/インターフェースの javadoc が最も包括的なドキュメントであることに注意してください。これはチュートリアルとしてのみ意図されています。

まず、MapperReducer インターフェースを見てみましょう。アプリケーションは通常、map メソッドと reduce メソッドを提供するためにこれらを実装します。

次に、JobPartitionerInputFormatOutputFormat などの他のコアインターフェースについて説明します。

最後に、DistributedCacheIsolationRunner などのフレームワークの便利な機能について説明します。

ペイロード

アプリケーションは通常、map メソッドと reduce メソッドを提供するために、Mapper インターフェースと Reducer インターフェースを実装します。これらはジョブの中核を形成します。

Mapper

Mapper は入力キーと値のペアを、中間キーと値のペアのセットにマップします。

マップは、入力レコードを中間レコードに変換する個々のタスクです。変換された中間レコードは、入力レコードと同じ型である必要はありません。指定された入力ペアは、ゼロまたは多くの出力ペアにマップされる場合があります。

Hadoop MapReduce フレームワークは、ジョブの InputFormat によって生成された各 InputSplit ごとに 1 つのマップタスクを生成します。

全体として、Mapper の実装は Job.setMapperClass(Class) メソッドを介してジョブに渡されます。次に、フレームワークは、そのタスクの InputSplit 内の各キーと値のペアに対して map(WritableComparable, Writable, Context) を呼び出します。アプリケーションは、cleanup(Context) メソッドをオーバーライドして、必要なクリーンアップを実行できます。

出力ペアは、入力ペアと同じ型である必要はありません。指定された入力ペアは、ゼロまたは多くの出力ペアにマップされる場合があります。出力ペアは context.write(WritableComparable, Writable) の呼び出しで収集されます。

アプリケーションは Counter を使用して統計情報を報告できます。

指定された出力キーに関連付けられたすべての中間値は、その後フレームワークによってグループ化され、Reducer に渡されて最終出力が決定されます。ユーザーは、Job.setGroupingComparatorClass(Class) を介して Comparator を指定することにより、グループ化を制御できます。

Mapper の出力はソートされ、Reducer ごとに分割されます。パーティションの総数は、ジョブの Reduce タスクの数と同じです。ユーザーは、カスタム Partitioner を実装することにより、どのキー(したがってレコード)がどの Reducer に移動するかを制御できます。

ユーザーは、Job.setCombinerClass(Class) を介して combiner をオプションで指定して、中間出力のローカル集計を実行できます。これは、Mapper から Reducer に転送されるデータ量を削減するのに役立ちます。

中間ソート出力は、常に単純な(key-len、key、value-len、value)形式で格納されます。アプリケーションは、中間出力を圧縮するかどうか、およびその方法、および Configuration を介して使用する CompressionCodec を制御できます。

マップの数

マップの数は通常、入力の合計サイズ、つまり入力ファイルのブロックの総数によって決まります。

マップの適切な並列処理レベルは、ノードあたり 10〜100 マップのようです。ただし、CPU 負荷の非常に軽いマップタスクの場合は 300 マップに設定されています。タスクのセットアップには時間がかかるため、マップの実行に少なくとも 1 分かかるようにするのが最善です。

したがって、10TB の入力データが予想され、ブロックサイズが 128MB の場合、Configuration.set(MRJobConfig.NUM_MAPS, int)(フレームワークへのヒントのみを提供します)を使用してさらに高く設定しない限り、82,000 のマップが作成されます。

Reducer

Reducer は、キーを共有する一連の中間値を、より小さな値のセットに削減します。

ジョブの Reduce の数は、ユーザーが Job.setNumReduceTasks(int) を介して設定します。

全体として、Reducer の実装は、Job.setReducerClass(Class) メソッドを介してジョブの Job に渡され、それをオーバーライドして初期化できます。次に、フレームワークは、グループ化された入力の各 <key, (値のリスト)> ペアに対して reduce(WritableComparable, Iterable<Writable>, Context) メソッドを呼び出します。アプリケーションは、cleanup(Context) メソッドをオーバーライドして、必要なクリーンアップを実行できます。

Reducer には、シャッフル、ソート、および Reduce の 3 つの主要なフェーズがあります。

シャッフル

Reducer への入力は、Mapper のソートされた出力です。このフェーズでは、フレームワークは HTTP を介してすべての Mapper の出力の関連するパーティションを取得します。

ソート

フレームワークは、このステージでキー別に Reducer 入力をグループ化します(異なる Mapper が同じキーを出力している可能性があるため)。

シャッフルフェーズとソートフェーズは同時に発生します。マップ出力がフェッチされている間、それらはマージされます。

セカンダリソート

中間キーをグループ化するための等価ルールが、削減前にキーをグループ化するためのルールと異なる必要がある場合、Job.setSortComparatorClass(Class) を介して Comparator を指定できます。Job.setGroupingComparatorClass(Class) を使用して中間キーのグループ化方法を制御できるため、これらを組み合わせて使用​​して *値のセカンダリソート* をシミュレートできます。

Reduce

このフェーズでは、グループ化された入力の各 <key, (値のリスト)> ペアに対して reduce(WritableComparable, Iterable<Writable>, Context) メソッドが呼び出されます。

Reduce タスクの出力は、通常、Context.write(WritableComparable, Writable) を介して FileSystem に書き込まれます。

アプリケーションは Counter を使用して統計情報を報告できます。

Reducer の出力は *ソートされていません*。

Reduce の数

Reduce の適切な数は、0.95 または 1.75 に(<*ノード数*> * <*ノードあたりの最大コンテナー数*>)を掛けた値のようです。

0.95 を使用すると、すべての Reduce がすぐに起動し、マップの完了とともに出力の転送を開始できます。1.75 を使用すると、高速ノードは最初の Reduce ラウンドを完了し、負荷分散をはるかにうまく行う Reduce の第 2 ウェーブを起動します。

Reduce の数を増やすとフレームワークのオーバーヘッドは増加しますが、負荷分散が向上し、障害のコストが削減されます。

上記のスケーリング係数は、整数よりわずかに小さく、投機的タスクと失敗したタスクのためにフレームワークにいくつかの Reduce スロットを予約しています。

Reducer なし

削減が不要な場合、Reduce タスクの数を *ゼロ* に設定することは有効です。

この場合、マップタスクの出力は、FileOutputFormat.setOutputPath(Job, Path) によって設定された出力パスに、FileSystem に直接送られます。フレームワークは、マップ出力を FileSystem に書き込む前にソートしません。

Partitioner

Partitioner はキースペースを分割します。

Partitioner は、中間マップ出力のキーの分割を制御します。キー(またはキーのサブセット)は、通常は *ハッシュ関数* によってパーティションを導出するために使用されます。パーティションの総数は、ジョブの Reduce タスクの数と同じです。したがって、これは、中間キー(したがってレコード)が削減のために送信される m 個の Reduce タスクのどれを制御します。

HashPartitioner はデフォルトの Partitioner です。

カウンター

Counter は、MapReduce アプリケーションが統計情報を報告するための機能です。

MapperReducer の実装は、Counter を使用して統計情報を報告できます。

Hadoop MapReduce には、一般的に役立つ Mapper、Reducer、および Partitioner の ライブラリ がバンドルされています。

ジョブ構成

Job は MapReduce ジョブ構成を表します。

Job は、ユーザーが実行するために Hadoop フレームワークに MapReduce ジョブを記述するための主要なインターフェースです。フレームワークは、Job で説明されているとおりにジョブを忠実に実行しようとしますが、

  • 一部の構成パラメーターは、管理者によって final としてマークされている場合があり(最終パラメーター を参照)、したがって変更できません。

  • 一部のジョブパラメータは簡単に設定できます(例:Job.setNumReduceTasks(int))が、他のパラメータはフレームワークやジョブ設定の残りの部分と微妙に相互作用するため、設定がより複雑になります(例:Configuration.set(JobContext.NUM_MAPS, int))。

Jobは通常、Mapper、コンバイナ(存在する場合)、PartitionerReducerInputFormatOutputFormatの実装を指定するために使用されます。 FileInputFormatは、入力ファイルのセット(FileInputFormat.setInputPaths(Job, Path…)/ FileInputFormat.addInputPath(Job, Path))と(FileInputFormat.setInputPaths(Job, String…)/ FileInputFormat.addInputPaths(Job, String)))、および出力ファイルの書き込み先(FileOutputFormat.setOutputPath(Path))を示します。

オプションとして、Jobは、使用するComparatorDistributedCacheに配置するファイル、中間および/またはジョブ出力を圧縮するかどうか(および方法)、ジョブタスクを_投機的_に実行できるかどうか(setMapSpeculativeExecution(boolean))/ setReduceSpeculativeExecution(boolean))、タスクごとの最大試行回数(setMaxMapAttempts(int)/ setMaxReduceAttempts(int))など、ジョブの他の高度な側面を指定するために使用されます。

もちろん、ユーザーはConfiguration.set(String, String)/ Configuration.get(String)を使用して、アプリケーションに必要な任意のパラメータを設定/取得できます。ただし、大量の(読み取り専用)データにはDistributedCacheを使用してください。

タスクの実行と環境

MRAppMasterは、Mapper/Reducer _タスク_を、別のJVMのチャイルドプロセスとして実行します。

子タスクは、親MRAppMasterの環境を継承します。ユーザーは、mapreduce.{map|reduce}.java.opts設定パラメータとJobの設定パラメータを介して、子JVMに追加のオプションを指定できます。たとえば、ランタイムリンカーが共有ライブラリを検索するための非標準パスを-Djava.library.path=<>などを介して指定できます。 mapreduce.{map|reduce}.java.optsパラメータにシンボル_@taskid@_が含まれている場合、MapReduceタスクのtaskidの値に置き換えられます。

複数の引数と置換を使用した例を次に示します。JVM GCロギングと、パスワードなしのJVM JMXエージェントの起動を示しています。これにより、jconsoleなどが接続して、子のメモリ、スレッドを監視し、スレッドダンプを取得できます。また、マップとリデュースの子JVMの最大ヒープサイズをそれぞれ512MBと1024MBに設定します。また、子JVMのjava.library.pathに追加のパスを追加します。

<property>
  <name>mapreduce.map.java.opts</name>
  <value>
  -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
  -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>
  -Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
  -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

メモリ管理

ユーザー/管理者は、mapreduce.{map|reduce}.memory.mbを使用して、起動された子タスク、およびそれが再帰的に起動するすべてのサブプロセスの最大仮想メモリを指定することもできます。ここで設定される値はプロセスごとの制限であることに注意してください。 mapreduce.{map|reduce}.memory.mbの値はメガバイト(MB)単位で指定する必要があります。また、値はJavaVMに渡される-Xmx以上でなければなりません。そうでない場合、VMが起動しない可能性があります。

注:mapreduce.{map|reduce}.java.optsは、MRAppMasterから起動された子タスクを設定するためにのみ使用されます。デーモンのメモリオプションの設定については、Hadoopデーモンの環境の設定に記載されています。

フレームワークの一部の部分で使用可能なメモリも設定可能です。マップタスクとリデューサタスクでは、操作の並行性とデータがディスクにヒットする頻度に影響を与えるパラメータを調整することで、パフォーマンスが影響を受ける可能性があります。ジョブのファイルシステムカウンター(特に、マップからのバイト数とリデュースへのバイト数に関連するもの)を監視することは、これらのパラメータの調整に非常に役立ちます。

マップパラメータ

マップから出力されたレコードはバッファにシリアル化され、メタデータはアカウンティングバッファに格納されます。以下のオプションで説明するように、シリアル化バッファまたはメタデータがいずれかのしきい値を超えると、バッファの内容はソートされ、バックグラウンドでディスクに書き込まれます。マップはレコードの出力を続けます。スピル中にいずれかのバッファが完全にいっぱいになると、マップスレッドはブロックされます。マップが終了すると、残りのレコードはディスクに書き込まれ、すべてのディスク上のセグメントが1つのファイルにマージされます。ディスクへのスピル数を最小限に抑えるとマップ時間を短縮できますが、バッファが大きいほどマッパーで使用可能なメモリも少なくなります。

名前 タイプ 説明
mapreduce.task.io.sort.mb int マップから出力されたレコードを格納するシリアル化バッファとアカウンティングバッファの累積サイズ(メガバイト単位)。
mapreduce.map.sort.spill.percent float シリアル化バッファのソフト制限。到達すると、スレッドはバックグラウンドでコンテンツをディスクにスピルし始めます。

その他の注意事項

  • スピル中にいずれかのスピルしきい値を超えた場合、スピルが完了するまで収集は続行されます。たとえば、mapreduce.map.sort.spill.percentが0.33に設定されていて、スピルの実行中にバッファの残りの部分が埋められた場合、次のスピルには収集されたすべてのレコード、つまりバッファの0.66が含まれ、追加のスピルは生成されません。言い換えれば、しきい値はトリガーを定義するものであり、ブロッキングするものではありません。

  • シリアル化バッファより大きいレコードは、最初にスピルをトリガーし、次に別のファイルにスピルされます。このレコードが最初にコンバイナを通過するかどうかは定義されていません。

シャッフル/リデュースパラメータ

前述のように、各リデュースは、パーティショナーによって割り当てられた出力をHTTP経由でメモリにフェッチし、これらの出力を定期的にディスクにマージします。マップ出力の中間圧縮がオンになっている場合、各出力はメモリに解凍されます。以下のオプションは、リデュース前のディスクへのこれらのマージの頻度と、リデュース中のマップ出力に割り当てられるメモリに影響します。

名前 タイプ 説明
mapreduce.task.io.soft.factor int 同時にマージされるディスク上のセグメント数を指定します。マージ中の開いているファイルと圧縮コーデックの数を制限します。ファイル数がこの制限を超えると、マージは数回に分けて実行されます。この制限はマップにも適用されますが、ほとんどのジョブは、この制限に達する可能性が低いように構成する必要があります。
mapreduce.reduce.merge.inmem.thresholds int ディスクにマージされる前にメモリにフェッチされるソート済みマップ出力の数。前の注のスピルしきい値と同様に、これはパーティションの単位を定義するのではなく、トリガーを定義します。実際には、メモリ内セグメントのマージはディスクからのマージよりも安価なことが多いため、これは通常非常に高く(1000)設定されるか、無効にされます(0)(この表の後の注を参照)。このしきい値は、シャッフル中のメモリ内マージの頻度にのみ影響します。
mapreduce.reduce.shuffle.merge.percent float メモリ内マージが開始される前にフェッチされたマップ出力のメモリしきい値。メモリにマップ出力を格納するために割り当てられたメモリの割合として表されます。メモリに収まらないマップ出力は停止する可能性があるため、これを高く設定すると、フェッチとマージの間の並列性が低下する可能性があります。逆に、入力がメモリに完全に収まるリデュースでは、1.0などの高い値が有効です。このパラメータは、シャッフル中のメモリ内マージの頻度にのみ影響します。
mapreduce.reduce.shuffle.input.buffer.percent float シャッフル中にマップ出力を格納するために割り当てることができるメモリの割合(通常はmapreduce.reduce.java.optsで指定される最大ヒープサイズに対する割合)。フレームワーク用にメモリを確保しておく必要がありますが、一般に、大きくて多数のマップ出力を格納するのに十分な高さに設定すると有利です。
mapreduce.reduce.input.buffer.percent float リデュース中にマップ出力を保持できる最大ヒープサイズに対するメモリの割合。リデュースが開始されると、残りのマップ出力がこの定義のリソース制限を下回るまで、マップ出力はディスクにマージされます。デフォルトでは、リデュースで使用可能なメモリを最大化するために、リデュースが開始される前にすべてのマップ出力がディスクにマージされます。メモリ消費量の少ないリデュースの場合、ディスクへのトリップを回避するためにこれを増やす必要があります。

その他の注意事項

  • マップ出力がマップ出力のコピーに割り当てられたメモリの25%を超える場合、メモリを介してステージングすることなく、ディスクに直接書き込まれます。

  • コンバイナを使用して実行する場合、高いマージしきい値と大きなバッファに関する推論は当てはまらない場合があります。すべてのマップ出力がフェッチされる前に開始されたマージの場合、ディスクにスピルしている間にコンバイナが実行されます。場合によっては、バッファサイズを積極的に増やすのではなく、マップ出力の結合(ディスクスピルを小さくし、スピルとフェッチを並列化する)にリソースを費やすことで、リデュース時間を短縮できます。

  • リデュースを開始するためにメモリ内マップ出力をディスクにマージする場合、スピルするセグメントがあり、少なくともmapreduce.task.io.sort.factorセグメントがすでにディスク上にあるために中間マージが必要な場合、メモリ内マップ出力は中間マージの一部になります。

設定パラメータ

以下のプロパティは、各タスクの実行のジョブ設定でローカライズされます

名前 タイプ 説明
mapreduce.job.id String ジョブID
mapreduce.job.jar String ジョブディレクトリ内のjob.jarの場所
mapreduce.job.local.dir String ジョブ固有の共有スクラッチスペース
mapreduce.task.id String タスクID
mapreduce.task.attempt.id String タスク試行ID
mapreduce.task.is.map boolean これはマップタスクですか
mapreduce.task.partition int ジョブ内のタスクのID
mapreduce.map.input.file String マップが読み取っているファイル名
mapreduce.map.input.start long マップ入力分割の開始オフセット
mapreduce.map.input.length long マップ入力分割のバイト数
mapreduce.task.output.dir String タスクの一時出力ディレクトリ

注:ストリーミングジョブの実行中、「mapreduce」パラメータの名前は変換されます。ドット(.)はアンダースコア(_)になります。たとえば、mapreduce.job.idはmapreduce_job_idになり、mapreduce.job.jarはmapreduce_job_jarになります。ストリーミングジョブのマッパー/リデューサーで値を取得するには、アンダースコア付きのパラメータ名を使用します。

タスクログ

タスクの標準出力(stdout)およびエラー(stderr)ストリームとsyslogは、NodeManagerによって読み取られ、${HADOOP_LOG_DIR}/userlogsに記録されます。

ライブラリの配布

分散キャッシュは、マップタスクやリデュースタスクで使用するJARファイルとネイティブライブラリの両方を配布するためにも使用できます。子JVMは常に、その*カレントワーキングディレクトリ*をjava.library.pathLD_LIBRARY_PATHに追加します。そのため、キャッシュされたライブラリは、System.loadLibraryまたはSystem.loadを介してロードできます。分散キャッシュを介して共有ライブラリをロードする方法の詳細については、ネイティブライブラリに記載されています。

ジョブの送信と監視

Jobは、ユーザー ジョブがResourceManagerと対話するための主要なインターフェースです。

Jobは、ジョブの送信、進捗状況の追跡、コンポーネントタスクのレポートとログへのアクセス、MapReduceクラスタのステータス情報の取得などの機能を提供します。

ジョブ送信プロセスには、次のものが含まれます。

  1. ジョブの入力と出力の仕様を確認します。

  2. ジョブのInputSplit値を計算します。

  3. 必要に応じて、ジョブのDistributedCacheに必要なアカウンティング情報を設定します。

  4. ジョブのjarと設定を、FileSystem上のMapReduceシステムディレクトリにコピーします。

  5. ジョブをResourceManagerに送信し、必要に応じてそのステータスを監視します。

ジョブ履歴ファイルは、ユーザー指定のディレクトリmapreduce.jobhistory.intermediate-done-dirおよびmapreduce.jobhistory.done-dirにもログに記録されます。デフォルトはジョブ出力ディレクトリです。

ユーザーは、次のコマンド$ mapred job -history output.jhistを使用して、指定されたディレクトリ内の履歴ログの概要を表示できます。このコマンドは、ジョブの詳細、失敗したtipと強制終了されたtipの詳細を出力します。成功したタスクや各タスクに対して行われたタスク試行などのジョブの詳細については、次のコマンド$ mapred job -history all output.jhistを使用して表示できます。

通常、ユーザーはJobを使用してアプリケーションを作成し、ジョブのさまざまな側面を記述し、ジョブを送信し、その進捗状況を監視します。

ジョブ制御

ユーザーは、単一のMapReduceジョブでは実行できない複雑なタスクを実行するために、MapReduceジョブをチェーンする必要がある場合があります。ジョブの出力は通常分散ファイルシステムに送られ、その出力は次のジョブの入力として使用できるため、これは非常に簡単です。

ただし、これは、ジョブが完了(成功/失敗)していることを確認する責任がクライアントにあることも意味します。このような場合、さまざまなジョブ制御オプションは次のとおりです。

ジョブ入力

InputFormatは、MapReduceジョブの入力仕様を記述します。

MapReduceフレームワークは、ジョブのInputFormatに依存して、次のことを行います。

  1. ジョブの入力仕様を検証します。

  2. 入力ファイルを入力ファイル)を論理InputSplitインスタンスに分割します。それぞれのインスタンスは個々のMapperに割り当てられます。

  3. Mapperによる処理のために論理InputSplitから入力レコードを読み取るために使用されるRecordReader実装を提供します。

ファイルベースのInputFormat実装(通常はFileInputFormatのサブクラス)のデフォルトの動作は、入力ファイルの合計サイズ(バイト単位)に基づいて入力を*論理的*なInputSplitインスタンスに分割することです。ただし、入力ファイルのFileSystemブロックサイズは、入力分割の上限として扱われます。分割サイズの下限は、mapreduce.input.fileinputformat.split.minsizeを介して設定できます。

レコード境界を尊重する必要があるため、入力サイズに基づく論理分割は多くのアプリケーションでは不十分です。このような場合、アプリケーションはRecordReaderを実装する必要があります。これは、レコード境界を尊重し、個々のタスクに論理InputSplitのレコード指向ビューを提供する役割を担います。

TextInputFormatは、デフォルトのInputFormatです。

TextInputFormatが特定のジョブのInputFormatである場合、フレームワークは*.gz*拡張子を持つ入力ファイルを検出し、適切なCompressionCodecを使用して自動的に解凍します。ただし、上記の拡張子を持つ圧縮ファイルは*分割*できず、各圧縮ファイルは単一のmapperによって全体として処理されることに注意してください。

InputSplit

InputSplitは、個々のMapperによって処理されるデータを表します。

通常、InputSplitは入力のバイト指向ビューを提供し、レコード指向のビューを処理して提示するのはRecordReaderの責任です。

FileSplitは、デフォルトのInputSplitです。論理分割の入力ファイルのパスにmapreduce.map.input.fileを設定します。

RecordReader

RecordReaderは、InputSplitから<key, value>ペアを読み取ります。

通常、RecordReaderは、InputSplitによって提供される入力のバイト指向ビューを変換し、処理のためにMapper実装にレコード指向のビューを提供します。したがって、RecordReaderはレコード境界を処理する責任を負い、タスクにキーと値を提示します。

ジョブ出力

OutputFormatは、MapReduceジョブの出力仕様を記述します。

MapReduceフレームワークは、ジョブのOutputFormatに依存して、次のことを行います。

  1. ジョブの出力仕様を検証します。たとえば、出力ディレクトリが既に存在しないことを確認します。

  2. ジョブの出力ファイルを書き込むために使用されるRecordWriter実装を提供します。出力ファイルはFileSystemに保存されます。

TextOutputFormatは、デフォルトのOutputFormatです。

OutputCommitter

OutputCommitterは、MapReduceジョブのタスク出力のコミットについて説明します。

MapReduceフレームワークは、ジョブのOutputCommitterに依存して、次のことを行います。

  1. 初期化中にジョブをセットアップします。たとえば、ジョブの初期化中にジョブの一時出力ディレクトリを作成します。ジョブのセットアップは、ジョブがPREP状態にあり、タスクを初期化した後に、別のタスクによって行われます。セットアップタスクが完了すると、ジョブはRUNNING状態に移行します。

  2. ジョブ完了後にジョブをクリーンアップします。たとえば、ジョブ完了後、一時出力ディレクトリを削除します。ジョブのクリーンアップは、ジョブの最後に別のタスクによって行われます。クリーンアップタスクが完了すると、ジョブはSUCCEDED/FAILED/KILLEDと宣言されます。

  3. タスク一時出力を設定します。タスクのセットアップは、タスクの初期化中に、同じタスクの一部として行われます。

  4. タスクにコミットが必要かどうかを確認します。これは、タスクにコミットが必要ない場合にコミット手順を回避するためです。

  5. タスク出力のコミット。タスクが完了すると、タスクは必要に応じて出力をコミットします。

  6. タスクコミットを破棄します。タスクが失敗/強制終了された場合、出力はクリーンアップされます。タスクがクリーンアップできなかった場合(例外ブロック内)、クリーンアップを実行するために、同じ試行IDで別のタスクが起動されます。

FileOutputCommitterは、デフォルトのOutputCommitterです。ジョブのセットアップ/クリーンアップタスクは、NodeManagerで使用可能なマップコンテナまたは縮小コンテナのいずれかを占有します。また、JobCleanupタスク、TaskCleanupタスク、JobSetupタスクはこの順序で最も高い優先順位を持ちます。

タスクの副作用ファイル

一部のアプリケーションでは、コンポーネントタスクは、実際のジョブ出力ファイルとは異なる副作用ファイルを作成および/または書き込む必要があります。

このような場合、同じMapperまたはReducerの2つのインスタンスが同時に実行されている(たとえば、投機的タスク)と、FileSystem上の同じファイル(パス)を開こうとしたり、書き込もうとしたりする際に問題が発生する可能性があります。したがって、アプリケーションライターは、タスクごとではなく、タスク試行ごとに一意の名前を選択する必要があります(試行ID、たとえば、attempt_200709221812_0001_m_000000_0を使用)。

これらの問題を回避するために、MapReduceフレームワークは、OutputCommitterFileOutputCommitterの場合、FileSystem上の各タスク試行に対して、${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}という特別なサブディレクトリを保持します。タスク試行の出力が格納されます。タスク試行が正常に完了すると、${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}内のファイル(のみ)が${mapreduce.output.fileoutputformat.outputdir}に*昇格*されます。もちろん、フレームワークは失敗したタスク試行のサブディレクトリを破棄します。このプロセスは、アプリケーションに対して完全に透過的です。

アプリケーションライターは、FileOutputFormat.getWorkOutputPath(Conext)を介してタスクの実行中に${mapreduce.task.output.dir}に必要な副作用ファイルを作成することで、この機能を利用できます。フレームワークは、成功したタスク試行についても同様にそれらを昇格させるため、タスク試行ごとに一意のパスを選択する必要がなくなります。

注:特定のタスク試行の実行中の${mapreduce.task.output.dir}の値は、実際には${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid}であり、この値はMapReduceフレームワークによって設定されます。したがって、この機能を利用するには、MapReduceタスクからFileOutputFormat.getWorkOutputPath(Conext)によって返されるパスに副作用ファイルを作成するだけです。

reducer = NONE(つまり、0個のreducer)のジョブのマップについては、マップの出力がその場合HDFSに直接送られるため、議論全体が当てはまります。

RecordWriter

RecordWriterは、出力<key, value>ペアを出力ファイルに書き込みます。

RecordWriter実装は、ジョブ出力をFileSystemに書き込みます。

その他の便利な機能

キューへのジョブの送信

ユーザーはジョブをキューに送信します。キューはジョブの集合体として、システムが特定の機能を提供できるようにします。たとえば、キューはACLを使用して、ジョブを送信できるユーザーを制御します。キューは、主にHadoopスケジューラによって使用されることが想定されています。

Hadoopには、「default」と呼ばれる単一の必須キューが設定されています。キュー名は、Hadoopサイト設定のmapreduce.job.queuenameプロパティで定義されています。Capacity Schedulerなど、一部のジョブスケジューラは複数のキューをサポートしています。

ジョブは、mapreduce.job.queuename プロパティ、または Configuration.set(MRJobConfig.QUEUE_NAME, String) API を使用して、送信する必要があるキューを定義します。キュー名の設定はオプションです。ジョブが関連付けられたキュー名なしで送信された場合、'default' キューに送信されます。

カウンタ

Counters は、MapReduce フレームワークまたはアプリケーションによって定義されたグローバルカウンタを表します。各 Counter は任意の Enum 型にすることができます。特定の Enum のカウンタは、Counters.Group 型のグループにまとめられます。

アプリケーションは、任意の Counters (Enum 型) を定義し、map メソッドまたは reduce メソッド (あるいはその両方) で Counters.incrCounter(Enum, long) または Counters.incrCounter(String, String, long) を使用して更新できます。これらのカウンタは、フレームワークによってグローバルに集計されます。

分散キャッシュ

DistributedCache は、アプリケーション固有の大規模な読み取り専用ファイルを効率的に分散します。

DistributedCache は、アプリケーションに必要なファイル (テキスト、アーカイブ、jar など) をキャッシュするために MapReduce フレームワークによって提供される機能です。

アプリケーションは、Job 内の URL (hdfs://) を介してキャッシュするファイルを指定します。DistributedCache は、hdfs:// URL を介して指定されたファイルが FileSystem に既に存在すると想定します。

フレームワークは、ジョブのタスクがノードで実行される前に、必要なファイルをワーカーノードにコピーします。その効率性は、ファイルがジョブごとに一度だけコピーされることと、ワーカー上で解凍されるアーカイブをキャッシュできることに由来します。

DistributedCache は、キャッシュされたファイルの変更タイムスタンプを追跡します。ジョブの実行中に、アプリケーションまたは外部によってキャッシュファイルが変更されないようにする必要があります。

DistributedCache は、単純な読み取り専用データ/テキストファイルと、アーカイブや jar などのより複雑なタイプの配布に使用できます。アーカイブ (zip、tar、tgz、tar.gz ファイル) は、ワーカーノードで_解凍_されます。ファイルには、_実行権限_が設定されています。

ファイル/アーカイブは、プロパティ mapreduce.job.cache.{files |archives} を設定することで配布できます。複数のファイル/アーカイブを配布する必要がある場合は、カンマ区切りのパスとして追加できます。プロパティは、API Job.addCacheFile(URI)/ Job.addCacheArchive(URI) および Job.setCacheFiles(URI[])/ Job.setCacheArchives(URI[]) を使用しても設定できます。ここで、URI は hdfs://host:port/absolute-path#link-name の形式です。ストリーミングでは、コマンドラインオプション -cacheFile/-cacheArchive を使用してファイルを配布できます。

DistributedCache は、map タスクまたは reduce タスク (あるいはその両方) で使用するための基本的なソフトウェア配布メカニズムとしても使用できます。jar とネイティブライブラリの両方を配布するために使用できます。Job.addArchiveToClassPath(Path) または Job.addFileToClassPath(Path) API を使用して、ファイル/ jar をキャッシュし、子 JVM の_クラスパス_に追加することもできます。設定プロパティ mapreduce.job.classpath.{files |archives} を設定することによって、同じことができます。同様に、タスクの作業ディレクトリにシンボリックリンクされているキャッシュファイルを使用して、ネイティブライブラリを配布してロードできます。

プライベートおよびパブリック分散キャッシュファイル

分散キャッシュファイルはプライベートまたはパブリックにすることができ、ワーカーノードでの共有方法が決まります。

  • 「プライベート」分散キャッシュファイルは、これらのファイルを必要とするジョブのユーザー専用のローカルディレクトリにキャッシュされます。これらのファイルは、特定のユーザーのすべてのタスクとジョブによってのみ共有され、ワーカー上の他のユーザーのジョブからはアクセスできません。分散キャッシュファイルは、ファイルがアップロードされるファイルシステム (通常は HDFS) に対するアクセス許可によってプライベートになります。ファイルにワールド読み取りアクセス権がない場合、またはファイルへのディレクトリパスにルックアップのためのワールド実行アクセス権がない場合、ファイルはプライベートになります。

  • 「パブリック」分散キャッシュファイルはグローバルディレクトリにキャッシュされ、ファイルアクセスはすべてのユーザーに公開されるように設定されます。これらのファイルは、ワーカー上のすべてのユーザーのタスクとジョブによって共有できます。分散キャッシュファイルは、ファイルがアップロードされるファイルシステム (通常は HDFS) に対するアクセス許可によってパブリックになります。ファイルにワールド読み取りアクセス権があり、ファイルへのディレクトリパスにルックアップのためのワールド実行アクセス権がある場合、ファイルはパブリックになります。言い換えれば、ユーザーがファイルをすべてのユーザーに公開する場合は、ファイルのアクセス許可をワールド読み取り可能に設定し、ファイルへのパス上のディレクトリのアクセス許可をワールド実行可能に設定する必要があります。

プロファイリング

プロファイリングは、map と reduce のサンプルに対して、組み込みの Java プロファイラの代表的な (2 つまたは 3 つ) サンプルを取得するためのユーティリティです。

ユーザーは、設定プロパティ mapreduce.task.profile を設定することにより、システムがジョブ内の一部のタスクのプロファイラ情報を収集するかどうかを指定できます。値は、API Configuration.set(MRJobConfig.TASK_PROFILE, boolean) を使用して設定できます。値が true に設定されている場合、タスクプロファイリングが有効になります。プロファイラ情報は、ユーザーログディレクトリに保存されます。デフォルトでは、ジョブのプロファイリングは有効になっていません。

ユーザーがプロファイリングが必要であることを設定したら、設定プロパティ mapreduce.task.profile.{maps|reduces} を使用して、プロファイリングする MapReduce タスクの範囲を設定できます。値は、API Configuration.set(MRJobConfig.NUM_{MAP|REDUCE}_PROFILES, String) を使用して設定できます。デフォルトでは、指定された範囲は 0-2 です。

ユーザーは、設定プロパティ `mapreduce.task.profile.params` を設定することで、プロファイラ設定引数を指定することもできます。値は、API Configuration.set(`MRJobConfig.TASK_PROFILE_PARAMS`, String) を使用して指定できます。文字列に `%s` が含まれている場合、タスクの実行時にプロファイリング出力ファイルの名前に置き換えられます。これらのパラメータは、コマンドラインでタスク子 JVM に渡されます。プロファイリングパラメータのデフォルト値は `-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s` です。

デバッグ

MapReduce フレームワークは、デバッグ用にユーザー提供のスクリプトを実行する機能を提供します。MapReduce タスクが失敗した場合、ユーザーはデバッグスクリプトを実行して、たとえばタスクログを処理できます。スクリプトには、タスクの stdout および stderr 出力、syslog、および jobconf へのアクセス権が付与されます。デバッグスクリプトの stdout および stderr からの出力は、コンソール診断に表示され、ジョブ UI の一部としても表示されます。

以下のセクションでは、ジョブにデバッグスクリプトを送信する方法について説明します。スクリプトファイルは、配布してフレームワークに送信する必要があります。

スクリプトファイルを配布する方法

ユーザーは、分散キャッシュを使用して、スクリプトファイルを_配布_し、_シンボリックリンク_する必要があります。

スクリプトを送信する方法

デバッグスクリプトを送信する簡単な方法は、それぞれ map タスクと reduce タスクをデバッグするために、プロパティ `mapreduce.map.debug.script` と `mapreduce.reduce.debug.script` に値を設定することです。これらのプロパティは、API Configuration.set(`MRJobConfig.MAP_DEBUG_SCRIPT`, String) および Configuration.set(`MRJobConfig.REDUCE_DEBUG_SCRIPT`, String) を使用して設定することもできます。ストリーミングモードでは、それぞれ map タスクと reduce タスクをデバッグするために、コマンドラインオプション `-mapdebug` と `-reducedebug` を使用してデバッグスクリプトを送信できます。

スクリプトへの引数は、タスクの stdout、stderr、syslog、および jobconf ファイルです。MapReduce タスクが失敗したノードで実行される debug コマンドは次のとおりです。
$script $stdout $stderr $syslog $jobconf

Pipes プログラムは、コマンドの 5 番目の引数として c++ プログラム名を持ちます。したがって、Pipes プログラムのコマンドは次のとおりです。
$script $stdout $stderr $syslog $jobconf $program

デフォルトの動作

Pipes の場合、gdb でコアダンプを処理し、スタックトレースを出力し、実行中のスレッドに関する情報を提供するためのデフォルトスクリプトが実行されます。

データ圧縮

Hadoop MapReduce は、アプリケーションライターが中間 map 出力とジョブ出力 (つまり、reduce の出力) の両方の圧縮を指定するための機能を提供します。また、zlib 圧縮アルゴリズムの CompressionCodec 実装もバンドルされています。gzipbzip2snappy、および lz4 ファイル形式もサポートされています。

Hadoop は、パフォーマンス (zlib) と Java ライブラリの入手不可の両方の理由から、上記の圧縮コーデックのネイティブ実装も提供します。使用方法と可用性の詳細は、こちらを参照してください。

中間出力

アプリケーションは、Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS, boolean) API を使用して中間 map 出力の圧縮を制御し、Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, Class) API を使用して使用する CompressionCodec を制御できます。

ジョブ出力

アプリケーションは、FileOutputFormat.setCompressOutput(Job, boolean) API を使用してジョブ出力の圧縮を制御でき、使用する CompressionCodec は FileOutputFormat.setOutputCompressorClass(Job, Class) API を使用して指定できます。

ジョブ出力が SequenceFileOutputFormat に格納される場合、必要な `SequenceFile.CompressionType` (つまり、`RECORD` / `BLOCK` - デフォルトは `RECORD`) は、SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) API を使用して指定できます。

不正なレコードのスキップ

Hadoop は、map 入力の処理時に特定の不正な入力レコードをスキップできるオプションを提供します。アプリケーションは、SkipBadRecords クラスを使用してこの機能を制御できます。

この機能は、map タスクが特定の入力で確定的にクラッシュする場合に使用できます。これは通常、map 関数のバグが原因で発生します。通常、ユーザーはこれらのバグを修正する必要があります。しかし、これは不可能な場合があります。たとえば、ソースコードが入手できないサードパーティライブラリにバグがある場合があります。このような場合、タスクは複数回試行しても正常に完了せず、ジョブは失敗します。この機能を使用すると、不正なレコードの周囲のデータのごく一部だけが失われ、一部のアプリケーション (たとえば、非常に大きなデータで統計分析を実行するアプリケーション) では許容される場合があります。

デフォルトでは、この機能は無効になっています。有効にするには、SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) および SkipBadRecords.setReducerMaxSkipGroups(Configuration, long) を参照してください。

この機能を有効にすると、フレームワークは特定の数の map 障害の後、「スキップモード」になります。詳細については、SkipBadRecords.setAttemptsToStartSkipping(Configuration, int) を参照してください。「スキップモード」では、map タスクは処理されているレコードの範囲を維持します。これを行うために、フレームワークは処理されたレコードカウンタに依存します。SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS および SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS を参照してください。このカウンタにより、フレームワークは正常に処理されたレコードの数、ひいてはタスクのクラッシュの原因となったレコード範囲を知ることができます。さらに試行すると、このレコード範囲はスキップされます。

スキップされるレコード数は、アプリケーションによって処理済みレコードカウンタがどれだけ頻繁にインクリメントされるかによって異なります。このカウンタは、各レコードが処理された後にインクリメントすることを推奨します。これは、通常バッチ処理を行う一部のアプリケーションでは不可能な場合があります。このような場合、フレームワークは不良レコード周辺の追加レコードをスキップする可能性があります。ユーザーは、SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) および SkipBadRecords.setReducerMaxSkipGroups(Configuration, long) を使用して、スキップされるレコード数を制御できます。フレームワークは、バイナリサーチのようなアプローチを使用して、スキップされるレコードの範囲を絞り込もうとします。スキップされる範囲は2つの半分に分割され、半分だけが実行されます。後続の失敗で、フレームワークはどちらの半分に不良レコードが含まれているかを特定します。タスクは、許容できるスキップ値が満たされるか、すべてのタスク試行が使い果たされるまで再実行されます。タスク試行の回数を増やすには、Job.setMaxMapAttempts(int) および Job.setMaxReduceAttempts(int) を使用します。

スキップされたレコードは、後で分析するために、シーケンスファイル形式でHDFSに書き込まれます。場所は、SkipBadRecords.setSkipOutputPath(JobConf, Path) を使用して変更できます。

例: WordCount v2.0

これまで説明したMapReduceフレームワークによって提供される多くの機能を使用する、より完全な WordCount を以下に示します。

これは、特に DistributedCache 関連の機能のために、HDFSが稼働している必要があります。そのため、擬似分散 または 完全分散 Hadoopインストールでのみ動作します。

ソースコード

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

サンプル実行

入力としてのサンプルテキストファイル

$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World, Bye World!

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.

アプリケーションを実行します

$ bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output

出力

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1

入力が最初に見たバージョンと異なり、それらがどのように出力に影響するか注意してください。

それでは、DistributedCache を介して、無視される単語パターンをリストしたパターンファイルをプラグインしてみましょう。

$ bin/hadoop fs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to

今回はより多くのオプションを使用して、もう一度実行します。

$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

期待どおり、出力は

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1

もう一度実行しますが、今回は大文字と小文字の区別をオフにします。

$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

確かに、出力は

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2

ハイライト

WordCount の2番目のバージョンは、MapReduceフレームワークによって提供されるいくつかの機能を使用することで、以前のバージョンを改善しています。

  • アプリケーションが Mapper(および Reducer)実装の setup メソッドで構成パラメータにアクセスする方法を示します。

  • DistributedCache を使用して、ジョブに必要な読み取り専用データを配布する方法を示します。ここでは、カウント中にスキップする単語パターンをユーザーが指定できます。

  • 汎用Hadoopコマンドラインオプションを処理するための GenericOptionsParser のユーティリティを示します。

  • アプリケーションが Counters を使用する方法と、map(および reduce)メソッドに渡されるアプリケーション固有のステータス情報を設定する方法を示します。

JavaおよびJNIは、米国およびその他の国におけるOracle America, Inc.の商標または登録商標です。