Hadoopストリーミングは、Hadoopディストリビューションに付属するユーティリティです。このユーティリティを使用すると、マッパーおよび/またはリデューサとして任意の実行可能ファイルまたはスクリプトを使用して、Map/Reduceジョブを作成および実行できます。例:
mapred streaming \ -input myInputDirs \ -output myOutputDir \ -mapper /bin/cat \ -reducer /usr/bin/wc
上記の例では、マッパーとリデューサの両方が、標準入力から(行単位で)入力を読み取り、標準出力に出力を出力する実行可能ファイルです。このユーティリティは、Map/Reduceジョブを作成し、適切なクラスタにジョブを提出して、ジョブが完了するまでその進捗状況を監視します。
マッパーに対して実行可能ファイルが指定されている場合、各マッパータスクは、マッパーが初期化されると、実行可能ファイルを個別のプロセスとして起動します。マッパータスクの実行中に、その入力を行に変換し、それらの行をプロセスの標準入力に供給します。同時に、マッパーはプロセスの標準出力から行指向の出力を収集し、各行をキー/値ペアに変換し、マッパーの出力として収集します。デフォルトでは、最初のタブ文字までの*行の先頭*がキー
であり、行の残りの部分(タブ文字を除く)が値
になります。行にタブ文字がない場合は、行全体がキーと見なされ、値はnullになります。ただし、これは後で説明する-inputformat
コマンドオプションを設定することでカスタマイズできます。
リデューサに対して実行可能ファイルが指定されている場合、各リデューサタスクは、リデューサが初期化されると、実行可能ファイルを個別のプロセスとして起動します。リデューサタスクの実行中に、その入力キー/値ペアを行に変換し、それらの行をプロセスの標準入力に供給します。同時に、リデューサはプロセスの標準出力から行指向の出力を収集し、各行をキー/値ペアに変換し、リデューサの出力として収集します。デフォルトでは、最初のタブ文字までの行の先頭がキーであり、行の残りの部分(タブ文字を除く)が値になります。ただし、これは後で説明する-outputformat
コマンドオプションを設定することでカスタマイズできます。
これは、Map/Reduceフレームワークとストリーミングマッパー/リデューサ間の通信プロトコルの基礎です。
ユーザーは、ゼロ以外のステータスで終了するストリーミングタスクをそれぞれ失敗
または成功
にするために、stream.non.zero.exit.is.failure
をtrue
またはfalse
として指定できます。デフォルトでは、ゼロ以外のステータスで終了するストリーミングタスクは、失敗したタスクと見なされます。
ストリーミングは、一般的なコマンドオプションと同様に、ストリーミングコマンドオプションをサポートしています。一般的なコマンドライン構文を以下に示します。
**注:** 一般的なオプションをストリーミングオプションの前に配置してください。そうでない場合、コマンドは失敗します。例については、タスクで使用可能なアーカイブの作成を参照してください。
mapred streaming [genericOptions] [streamingOptions]
Hadoopストリーミングコマンドオプションを以下に示します。
パラメータ | オプション/必須 | 説明 |
---|---|---|
-input ディレクトリ名またはファイル名 | 必須 | マッパーの入力場所 |
-output ディレクトリ名 | 必須 | リデューサの出力場所 |
-mapper 実行可能ファイルまたはJavaクラス名 | オプション | マッパー実行可能ファイル。指定しない場合、IdentityMapperがデフォルトとして使用されます。 |
-reducer 実行可能ファイルまたはJavaクラス名 | オプション | リデューサ実行可能ファイル。指定しない場合、IdentityReducerがデフォルトとして使用されます。 |
-file ファイル名 | オプション | マッパー、リデューサ、またはコンバイナ実行可能ファイルを、計算ノードでローカルに使用できるようにします。 |
-inputformat Javaクラス名 | オプション | 提供するクラスは、Textクラスのキー/値ペアを返す必要があります。指定しない場合、TextInputFormatがデフォルトとして使用されます。 |
-outputformat Javaクラス名 | オプション | 提供するクラスは、Textクラスのキー/値ペアを受け入れる必要があります。指定しない場合、TextOutputFormatがデフォルトとして使用されます。 |
-partitioner Javaクラス名 | オプション | キーがどのリデューサに送信されるかを決定するクラス |
-combiner ストリーミングコマンドまたはJavaクラス名 | オプション | マップ出力のコンバイナ実行可能ファイル |
-cmdenv name=value | オプション | ストリーミングコマンドに環境変数を渡します。 |
-inputreader | オプション | 下位互換性のため:レコードリーダークラス(入力形式クラスではなく)を指定します。 |
-verbose | オプション | 詳細出力 |
-lazyOutput | オプション | 遅延して出力を生成します。たとえば、出力形式がFileOutputFormatに基づいている場合、出力ファイルはContext.writeへの最初の呼び出しでのみ作成されます。 |
-numReduceTasks | オプション | リデューサ数を指定します。 |
-mapdebug | オプション | マップタスクが失敗したときに呼び出すスクリプト |
-reducedebug | オプション | リデュースタスクが失敗したときに呼び出すスクリプト |
マッパーおよび/またはリデューサとしてJavaクラスを指定できます。
mapred streaming \ -input myInputDirs \ -output myOutputDir \ -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat \ -mapper org.apache.hadoop.mapred.lib.IdentityMapper \ -reducer /usr/bin/wc
ゼロ以外のステータスで終了するストリーミングタスクをそれぞれ失敗
または成功
にするために、stream.non.zero.exit.is.failure
をtrue
またはfalse
として指定できます。デフォルトでは、ゼロ以外のステータスで終了するストリーミングタスクは、失敗したタスクと見なされます。
マッパーおよび/またはリデューサとして任意の実行可能ファイルを指定できます。実行可能ファイルは、クラスタ内のマシンに事前に存在する必要はありません。ただし、存在しない場合は、「-file」オプションを使用して、実行可能ファイルをジョブ提出の一部としてパッケージ化することをフレームワークに指示する必要があります。例:
mapred streaming \ -input myInputDirs \ -output myOutputDir \ -mapper myPythonScript.py \ -reducer /usr/bin/wc \ -file myPythonScript.py
上記の例では、ユーザー定義のPython実行可能ファイルがマッパーとして指定されています。「-file myPythonScript.py」オプションにより、Python実行可能ファイルがジョブ提出の一部としてクラスタマシンに送信されます。
実行可能ファイルに加えて、マッパーおよび/またはリデューサで使用される可能性のあるその他の補助ファイル(辞書、設定ファイルなど)もパッケージ化できます。例:
mapred streaming \ -input myInputDirs \ -output myOutputDir \ -mapper myPythonScript.py \ -reducer /usr/bin/wc \ -file myPythonScript.py \ -file myDictionary.txt
通常のMap/Reduceジョブと同様に、ストリーミングジョブに他のプラグインを指定できます。
-inputformat JavaClassName -outputformat JavaClassName -partitioner JavaClassName -combiner streamingCommand or JavaClassName
入力形式に指定するクラスは、Textクラスのキー/値ペアを返す必要があります。入力形式クラスを指定しない場合、TextInputFormatがデフォルトとして使用されます。TextInputFormatはLongWritableクラスのキーを返し、これは実際には入力データの一部ではないため、キーは破棄され、値のみがストリーミングマッパーにパイプされます。
出力形式に指定するクラスは、Textクラスのキー/値ペアを受け入れる必要があります。出力形式クラスを指定しない場合、TextOutputFormatがデフォルトとして使用されます。
ストリーミングコマンドで環境変数を設定するには、以下を使用します。
-cmdenv EXAMPLE_DIR=/home/example/dictionaries/
ストリーミングは、ストリーミングコマンドオプションと同様に、一般的なコマンドオプションをサポートしています。一般的なコマンドライン構文を以下に示します。
**注:** 一般的なオプションをストリーミングオプションの前に配置してください。そうでない場合、コマンドは失敗します。例については、タスクで使用可能なアーカイブの作成を参照してください。
hadoop command [genericOptions] [streamingOptions]
ストリーミングで使用できるHadoop一般的なコマンドオプションを以下に示します。
パラメータ | オプション/必須 | 説明 |
---|---|---|
-conf 設定ファイル | オプション | アプリケーション設定ファイルの指定 |
-D プロパティ=値 | オプション | 指定されたプロパティに値を使用する |
-fs ホスト:ポート または local | オプション | NameNodeの指定 |
-files | オプション | Map/Reduceクラスタにコピーするファイルをカンマ区切りで指定する |
-libjars | オプション | クラスパスに含めるjarファイルをカンマ区切りで指定する |
-archives | オプション | 計算機で展開するアーカイブファイルをカンマ区切りで指定する |
“-D
ローカル一時ディレクトリを変更するには
-D dfs.data.dir=/tmp
追加のローカル一時ディレクトリを指定するには
-D mapred.local.dir=/tmp/local -D mapred.system.dir=/tmp/system -D mapred.temp.dir=/tmp/temp
注記: ジョブ設定パラメータの詳細については、mapred-default.xmlを参照してください。
多くの場合、マップ関数のみを使用して入力データを処理することがあります。これを行うには、mapreduce.job.reduces
をゼロに設定するだけです。Map/Reduceフレームワークは、リデューサタスクを作成しません。むしろ、マッパタスクの出力はジョブの最終出力になります。
-D mapreduce.job.reduces=0
後方互換性のために、Hadoop Streamingは「-reducer NONE」オプションもサポートしており、「-D mapreduce.job.reduces=0」と同等です。
たとえば、リデューサ数を2つに指定するには、以下を使用します。
mapred streaming \ -D mapreduce.job.reduces=2 \ -input myInputDirs \ -output myOutputDir \ -mapper /bin/cat \ -reducer /usr/bin/wc
前述のように、Map/Reduceフレームワークは、マッパーの標準出力から行を読み取るときに、その行をキー/値ペアに分割します。デフォルトでは、行の先頭から最初のタブ文字までのプレフィックスがキーになり、行の残りの部分(タブ文字を除く)が値になります。
ただし、このデフォルトをカスタマイズできます。タブ文字(デフォルト)以外のフィールドセパレータを指定し、行の先頭文字(デフォルト)ではなく、行内のn番目(n >= 1)の文字をキーと値の間のセパレータとして指定できます。たとえば
mapred streaming \ -D stream.map.output.field.separator=. \ -D stream.num.map.output.key.fields=4 \ -input myInputDirs \ -output myOutputDir \ -mapper /bin/cat \ -reducer /bin/cat
上記の例では、「-D stream.map.output.field.separator=.」はマップ出力のフィールドセパレータとして「.」を指定し、行内の4番目の「.」までのプレフィックスがキーになり、行の残りの部分(4番目の「.」を除く)が値になります。行に4つの「.」より少ない場合、行全体がキーになり、値は空のTextオブジェクト(new Text("")で作成されたものなど)になります。
同様に、「-D stream.reduce.output.field.separator=SEP」と「-D stream.num.reduce.output.fields=NUM」を使用して、リデュース出力の行内のn番目のフィールドセパレータをキーと値の間のセパレータとして指定できます。
同様に、「stream.map.input.field.separator」と「stream.reduce.input.field.separator」をMap/Reduce入力の入力セパレータとして指定できます。デフォルトのセパレータはタブ文字です。
-filesオプションと-archivesオプションを使用すると、タスクで使用できるファイルとアーカイブを作成できます。引数は、HDFSに既にアップロードされているファイルまたはアーカイブへのURIです。これらのファイルとアーカイブは、ジョブ間でキャッシュされます。ホストとfs_portの値は、fs.default.name設定変数から取得できます。
注記: -filesオプションと-archivesオプションは一般的なオプションです。一般的なオプションはコマンドオプションの前に配置してください。そうでない場合、コマンドは失敗します。
-filesオプションは、ファイルのローカルコピーを指すタスクの現在の作業ディレクトリにシンボリックリンクを作成します。
この例では、Hadoopはタスクの現在の作業ディレクトリにtestfile.txtという名前のシンボリックリンクを自動的に作成します。このシンボリックリンクは、testfile.txtのローカルコピーを指します。
-files hdfs://host:fs_port/user/testfile.txt
ユーザーは、#を使用して-filesの異なるシンボリックリンク名を指定できます。
-files hdfs://host:fs_port/user/testfile.txt#testfile
複数のエントリは次のように指定できます。
-files hdfs://host:fs_port/user/testfile1.txt,hdfs://host:fs_port/user/testfile2.txt
-archivesオプションを使用すると、jarファイルをタスクの現在の作業ディレクトリにローカルにコピーし、ファイルを自動的に解凍できます。
この例では、Hadoopはタスクの現在の作業ディレクトリにtestfile.jarという名前のシンボリックリンクを自動的に作成します。このシンボリックリンクは、アップロードされたjarファイルの解凍されたコンテンツを格納するディレクトリを指します。
-archives hdfs://host:fs_port/user/testfile.jar
ユーザーは、#を使用して-archivesの異なるシンボリックリンク名を指定できます。
-archives hdfs://host:fs_port/user/testfile.tgz#tgzdir
この例では、input.txtファイルには、cachedir.jar/cache.txtとcachedir.jar/cache2.txtという2つのファイルの名前を指定する2行があります。「cachedir.jar」はアーカイブされたディレクトリへのシンボリックリンクであり、「cache.txt」と「cache2.txt」というファイルが含まれています。
mapred streaming \ -archives 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar' \ -D mapreduce.job.maps=1 \ -D mapreduce.job.reduces=1 \ -D mapreduce.job.name="Experiment" \ -input "/user/me/samples/cachefile/input.txt" \ -output "/user/me/samples/cachefile/out" \ -mapper "xargs cat" \ -reducer "cat" $ ls test_jar/ cache.txt cache2.txt $ jar cvf cachedir.jar -C test_jar/ . added manifest adding: cache.txt(in = 30) (out= 29)(deflated 3%) adding: cache2.txt(in = 37) (out= 35)(deflated 5%) $ hdfs dfs -put cachedir.jar samples/cachefile $ hdfs dfs -cat /user/me/samples/cachefile/input.txt cachedir.jar/cache.txt cachedir.jar/cache2.txt $ cat test_jar/cache.txt This is just the cache string $ cat test_jar/cache2.txt This is just the second cache string $ hdfs dfs -ls /user/me/samples/cachefile/out Found 2 items -rw-r--r-* 1 me supergroup 0 2013-11-14 17:00 /user/me/samples/cachefile/out/_SUCCESS -rw-r--r-* 1 me supergroup 69 2013-11-14 17:00 /user/me/samples/cachefile/out/part-00000 $ hdfs dfs -cat /user/me/samples/cachefile/out/part-00000 This is just the cache string This is just the second cache string
Hadoopには、多くのアプリケーションで役立つライブラリークラスKeyFieldBasedPartitionerがあります。このクラスを使用すると、Map/Reduceフレームワークは、キー全体ではなく、特定のキーフィールドに基づいてマップ出力をパーティション分割できます。たとえば
mapred streaming \ -D stream.map.output.field.separator=. \ -D stream.num.map.output.key.fields=4 \ -D map.output.key.field.separator=. \ -D mapreduce.partition.keypartitioner.options=-k1,2 \ -D mapreduce.job.reduces=12 \ -input myInputDirs \ -output myOutputDir \ -mapper /bin/cat \ -reducer /bin/cat \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
ここでは、-D stream.map.output.field.separator=.と-D stream.num.map.output.key.fields=4は、前の例で説明したとおりです。この2つの変数は、ストリーミングによってマッパーのキー/値ペアを識別するために使用されます。
上記のMap/Reduceジョブのマップ出力キーは通常、「.」で区切られた4つのフィールドを持ちます。ただし、Map/Reduceフレームワークは、-D mapred.text.key.partitioner.options=-k1,2オプションを使用して、キーの最初の2つのフィールドでマップ出力をパーティション分割します。ここでは、-D map.output.key.field.separator=.は、パーティションのセパレータを指定します。これにより、キーの最初の2つのフィールドが同じであるすべてのキー/値ペアが同じリデューサにパーティション分割されることが保証されます。
これは、最初の2つのフィールドを主キーとして、次の2つのフィールドを副キーとして指定することと事実上同等です。主キーはパーティショニングに使用され、主キーと副キーの組み合わせはソートに使用されます。簡単な例を以下に示します。
マップの出力(キー)
11.12.1.2 11.14.2.3 11.11.4.1 11.12.1.1 11.14.2.2
3つのリデューサへのパーティション分割(最初の2つのフィールドがパーティションのキーとして使用される)
11.11.4.1 ----------- 11.12.1.2 11.12.1.1 ----------- 11.14.2.3 11.14.2.2
各パーティション内でのリデューサのソート(すべての4つのフィールドがソートに使用される)
11.11.4.1 ----------- 11.12.1.1 11.12.1.2 ----------- 11.14.2.2 11.14.2.3
Hadoopには、多くのアプリケーションで役立つライブラリークラスKeyFieldBasedComparatorがあります。このクラスは、Unix/GNU Sortによって提供される機能のサブセットを提供します。たとえば
mapred streaming \ -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \ -D stream.map.output.field.separator=. \ -D stream.num.map.output.key.fields=4 \ -D mapreduce.map.output.key.field.separator=. \ -D mapreduce.partition.keycomparator.options=-k2,2nr \ -D mapreduce.job.reduces=1 \ -input myInputDirs \ -output myOutputDir \ -mapper /bin/cat \ -reducer /bin/cat
上記のMap/Reduceジョブのマップ出力キーは通常、「.」で区切られた4つのフィールドを持ちます。ただし、Map/Reduceフレームワークは、-D mapreduce.partition.keycomparator.options=-k2,2nrオプションを使用して、キーの2番目のフィールドで出力をソートします。ここでは、-nは数値ソートを指定し、-rは結果を反転させることを指定します。簡単な例を以下に示します。
マップの出力(キー)
11.12.1.2 11.14.2.3 11.11.4.1 11.12.1.1 11.14.2.2
リデューサのソート出力(2番目のフィールドがソートに使用される場合)
11.14.2.3 11.14.2.2 11.12.1.2 11.12.1.1 11.11.4.1
Hadoopには、Aggregateと呼ばれるライブラリパッケージがあります。Aggregateは、特別なリデューサクラスと特別なコンバイナクラス、および値のシーケンスに対して「sum」、「max」、「min」などの集計を実行する単純な集計子のリストを提供します。Aggregateを使用すると、マッパーの各入力キー/値ペアに対して「集計可能なアイテム」を生成すると予想されるマッパーープラグインクラスを定義できます。コンバイナ/リデューサは、適切な集計子を呼び出すことによって、これらの集計可能なアイテムを集計します。
Aggregateを使用するには、「-reducer aggregate」を指定するだけです。
mapred streaming \ -input myInputDirs \ -output myOutputDir \ -mapper myAggregatorForKeyCount.py \ -reducer aggregate \ -file myAggregatorForKeyCount.py
pythonプログラムmyAggregatorForKeyCount.pyは次のようになります。
#!/usr/bin/python3 import sys def generateLongCountToken(id): return "LongValueSum:" + id + "\t" + "1" def main(argv): line = sys.stdin.readline() try: while line: line = line[:-1] fields = line.split("\t") print(generateLongCountToken(fields[0])) line = sys.stdin.readline() except "end of file": return None if __name__ == "__main__": main(sys.argv)
Hadoopには、unixの「cut」ユーティリティのようにテキストデータを処理できるライブラリークラスFieldSelectionMapReduceがあります。クラスで定義されたマップ関数は、各入力キー/値ペアをフィールドのリストとして扱います。フィールドセパレータを指定できます(デフォルトはタブ文字です)。マップ出力キーとして任意のフィールドリストを選択し、マップ出力値として任意のフィールドリストを選択できます。同様に、クラスで定義されたリデュース関数は、各入力キー/値ペアをフィールドのリストとして扱います。リデュース出力キーとして任意のフィールドリストを選択し、リデュース出力値として任意のフィールドリストを選択できます。たとえば
mapred streaming \ -D mapreduce.map.output.key.field.separator=. \ -D mapreduce.partition.keypartitioner.options=-k1,2 \ -D mapreduce.fieldsel.data.field.separator=. \ -D mapreduce.fieldsel.map.output.key.value.fields.spec=6,5,1-3:0- \ -D mapreduce.fieldsel.reduce.output.key.value.fields.spec=0-2:5- \ -D mapreduce.map.output.key.class=org.apache.hadoop.io.Text \ -D mapreduce.job.reduces=12 \ -input myInputDirs \ -output myOutputDir \ -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \ -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
オプション「-D mapreduce.fieldsel.map.output.key.value.fields.spec=6,5,1-3:0-」は、マップ出力のキー/値の選択を指定します。キー選択仕様と値選択仕様は「:」で区切られます。この場合、マップ出力キーはフィールド6、5、1、2、3で構成されます。マップ出力値は、すべてのフィールドで構成されます(0-はフィールド0とそれに続くすべてのフィールドを意味します)。
オプション「-D mapreduce.fieldsel.reduce.output.key.value.fields.spec=0-2:5-」は、リデュース出力のキー/値の選択を指定します。この場合、リデュース出力キーはフィールド0、1、2(元のフィールド6、5、1に対応)で構成されます。リデュース出力値は、フィールド5から始まるすべてのフィールド(元のすべてのフィールドに対応)で構成されます。
多くの場合、Map Reduceのすべての機能は必要なく、同じプログラムの複数のインスタンスを、データの異なる部分で、または同じデータで異なるパラメータを使用して実行するだけで済みます。Hadoop Streamingを使用してこれを行うことができます。
例として、Hadoopクラスタ全体でファイル一式を圧縮する問題を考えてみましょう。Hadoop Streamingとカスタムマッパースクリプトを使用してこれを実現できます。
入力ファイルの完全なHDFSパスを含むファイルを作成します。各マップタスクは、入力として1つのファイル名を取得します。
ファイル名を与えられると、ファイルをローカルディスクに取得し、ファイルをgzip圧縮して目的の出力ディレクトリに戻すマッパースクリプトを作成します。
詳細については、MapReduceチュートリアルを参照してください:Reducer
例えば、次のようにした場合を考えてみます。alias c1='cut -f1'。-mapper "c1" は機能しますか?
エイリアスを使用しても機能しませんが、この例に示すように、変数置換は許可されています。
$ hdfs dfs -cat /user/me/samples/student_marks alice 50 bruce 70 charlie 80 dan 75 $ c2='cut -f2'; mapred streaming \ -D mapreduce.job.name='Experiment' \ -input /user/me/samples/student_marks \ -output /user/me/samples/student_out \ -mapper "$c2" -reducer 'cat' $ hdfs dfs -cat /user/me/samples/student_out/part-00000 50 70 75 80
例えば、-mapper "cut -f1 | sed s/foo/bar/g" は機能しますか?
現在これは機能せず、「java.io.IOException: Broken pipe」エラーが発生します。これは調査が必要なバグの可能性があります。
例えば、-file オプションを介して大きな実行ファイル(例えば3.6G)を配布してストリーミングジョブを実行すると、「デバイスの空き領域がありません」というエラーが発生します。
jarパッケージングは、構成変数stream.tmpdirが指すディレクトリで行われます。stream.tmpdirのデフォルト値は/tmpです。より多くの空き領域のあるディレクトリに値を設定してください。
-D stream.tmpdir=/export/bigspace/…
複数の '-input' オプションを使用して、複数の入力ディレクトリを指定できます。
mapred streaming \ -input '/user/foo/dir1' -input '/user/foo/dir2' \ (rest of the command)
プレーンテキストファイルの代わりに、生成された出力としてgzipファイルを生成できます。ストリーミングジョブのオプションとして ' -D mapreduce.output.fileoutputformat.compress=true -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec' を渡してください。
カスタムjarをパッケージ化して`$HADOOP_CLASSPATH`に配置することで、独自のクラスを指定できます。
XMLドキュメントを処理するために、レコードリーダーStreamXmlRecordReaderを使用できます。
mapred streaming \ -inputreader "StreamXmlRecord,begin=BEGIN_STRING,end=END_STRING" \ (rest of the command)
BEGIN_STRINGとEND_STRINGの間にあるものはすべて、マップタスクの1つのレコードとして扱われます。
StreamXmlRecordReaderが認識する名前と値のプロパティは次のとおりです。
ストリーミングプロセスは、stderrを使用してカウンター情報を送信できます。カウンターを更新するには、`reporter:counter:<グループ>,<カウンター>,<数値>`をstderrに送信する必要があります。
ストリーミングプロセスは、stderrを使用してステータス情報を送信できます。ステータスを設定するには、`reporter:status:<メッセージ>`をstderrに送信する必要があります。
設定済みのパラメータを参照してください。ストリーミングジョブの実行中、「mapred」パラメータの名前は変換されます。ドット(.)はアンダースコア(_)になります。例えば、mapreduce.job.idはmapreduce_job_idになり、mapreduce.job.jarはmapreduce_job_jarになります。コードでは、アンダースコアを含むパラメータ名を使用してください。
ジョブは構成全体を環境にコピーします。ジョブが多数の入力ファイルを処理している場合、環境にジョブ構成を追加すると、環境のオーバーランが発生する可能性があります。環境内のジョブ構成のコピーはジョブの実行に不可欠ではなく、次のように設定することで切り捨てることができます。
-D stream.jobconf.truncate.limit=20000
デフォルトでは値は切り捨てられません(-1)。0は名前のみをコピーし、値はコピーしません。ほとんどの場合、20000は環境のオーバーランを防ぐ安全な値です。