DistCp(分散コピー)は、大規模なクラスタ内/クラスタ間コピーに使用されるツールです。MapReduceを使用して、分散処理、エラー処理と回復、およびレポートを実行します。ファイルとディレクトリのリストをマップタスクへの入力に展開し、それぞれがソースリストに指定されたファイルのパーティションをコピーします。
DistCpの以前の実装https://hadoop.dokyumento.jp/docs/r1.2.1/distcp.htmlには、使用方法、拡張性、パフォーマンスの両方において、独自の癖と欠点があります。DistCpのリファクタリングの目的は、これらの欠点を修正し、プログラムによって使用および拡張できるようにすることです。レガシーの動作をデフォルトとして維持しながら、実行時と設定のパフォーマンスを向上させるために、新しいパラダイムが導入されました。
このドキュメントでは、新しいDistCpの設計、その新しい機能、最適な使用方法、およびレガシー実装からのずれについて説明することを目的としています。
DistCpの最も一般的な呼び出しは、クラスタ間コピーです。
bash$ hadoop distcp hdfs://nn1:8020/foo/bar \ hdfs://nn2:8020/bar/foo
これにより、nn1上の/foo/bar
の下のネームスペースが一時ファイルに展開され、その内容がマップタスクのセットに分割され、各NodeManager上でnn1
からnn2
へのコピーが開始されます。
コマンドラインで複数のソースディレクトリを指定することもできます。
bash$ hadoop distcp hdfs://nn1:8020/foo/a \ hdfs://nn1:8020/foo/b \ hdfs://nn2:8020/bar/foo
または、同等に、-fオプションを使用してファイルから指定することもできます。
bash$ hadoop distcp -f hdfs://nn1:8020/srclist \ hdfs://nn2:8020/bar/foo
ここで、srclist
には以下が含まれています。
hdfs://nn1:8020/foo/a hdfs://nn1:8020/foo/b
複数のソースからコピーする場合、2つのソースが衝突すると、DistCpはエラーメッセージとともにコピーを中止しますが、宛先での衝突は、指定されたオプションに従って解決されます。デフォルトでは、宛先に既に存在するファイルはスキップされます(つまり、ソースファイルで置き換えられません)。各ジョブの最後にスキップされたファイル数が報告されますが、一部のファイルでコピアが失敗したが、後の試行で成功した場合、不正確になる可能性があります。
各NodeManagerがソースファイルシステムと宛先ファイルシステムの両方にアクセスして通信できることが重要です。HDFSの場合、ソースと宛先の両方で同じバージョンのプロトコルを実行するか、下位互換性のあるプロトコルを使用する必要があります。[バージョンの間のコピー](#Copying_Between_Versions_of_HDFS)を参照してください。
コピー後、ソースと宛先のリストを生成して相互にチェックし、コピーが実際に成功したことを確認することをお勧めします。DistCpはMap/ReduceとファイルシステムAPIの両方を使用するため、3つのいずれかまたはそれらの間の問題が、コピーに悪影響を与え、サイレントに影響を与える可能性があります。-update
を有効にして2回目のパスを実行することに成功した人もいますが、ユーザーは試行する前にその意味を理解する必要があります。
別のクライアントがまだソースファイルに書き込んでいる場合、コピーは失敗する可能性があることにも注意してください。宛先に書き込まれているファイルを上書きしようとすると、HDFSでも失敗する可能性があります。ソースファイルがコピーされる前に(再)移動されると、FileNotFoundException
でコピーが失敗します。
DistCpで使用可能なすべてのオプションの詳細については、コマンドラインリファレンスを参照してください。
-update
は、ターゲットに存在しないか、ターゲットバージョンと異なるソースからのファイルをコピーするために使用されます。-overwrite
は、ターゲットに存在するターゲットファイルを上書きします。
更新と上書きオプションは、ソースパスの処理方法がデフォルトとは微妙に異なるため、特別な注意が必要です。/source/first/
と/source/second/
から/target/
へのコピーを検討してください。ソースパスには次の内容が含まれています。
hdfs://nn1:8020/source/first/1 hdfs://nn1:8020/source/first/2 hdfs://nn1:8020/source/second/10 hdfs://nn1:8020/source/second/20
-update
または-overwrite
を指定せずにDistCpを呼び出すと、DistCpのデフォルトでは、/target
の下にfirst/
とsecond/
ディレクトリが作成されます。したがって
distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target
は、/target
に次の内容を作成します。
hdfs://nn2:8020/target/first/1 hdfs://nn2:8020/target/first/2 hdfs://nn2:8020/target/second/10 hdfs://nn2:8020/target/second/20
-update
または-overwrite
を指定すると、ソースディレクトリの**内容**がターゲットにコピーされ、ソースディレクトリ自体はコピーされません。したがって
distcp -update hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target
は、/target
に次の内容を作成します。
hdfs://nn2:8020/target/1 hdfs://nn2:8020/target/2 hdfs://nn2:8020/target/10 hdfs://nn2:8020/target/20
拡張として、両方のソースフォルダに同じ名前のファイル(たとえば、0
)が含まれている場合、両方のソースは宛先の/target/0
にエントリをマップします。この競合を許可するのではなく、DistCpは中止します。
次に、次のコピー操作を考えてみましょう。
distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target
ソース/サイズ:
hdfs://nn1:8020/source/first/1 32 hdfs://nn1:8020/source/first/2 32 hdfs://nn1:8020/source/second/10 64 hdfs://nn1:8020/source/second/20 32
宛先/サイズ:
hdfs://nn2:8020/target/1 32 hdfs://nn2:8020/target/10 32 hdfs://nn2:8020/target/20 64
は次のようになります。
hdfs://nn2:8020/target/1 32 hdfs://nn2:8020/target/2 32 hdfs://nn2:8020/target/10 64 hdfs://nn2:8020/target/20 32
ファイルの長さと内容が一致するため、1
はスキップされます。2
はターゲットに存在しないためコピーされます。内容がソースと一致しないため、10
と20
は上書きされます。
-update
を使用する場合、ファイルの長さと内容が一致するため、1
はスキップされます。2
はターゲットに存在しないためコピーされます。内容がソースと一致しないため、10
と20
は上書きされます。ただし、-append
も追加で使用されると、10
のみが上書きされ(ソースの長さが宛先より短い)、20
にはファイルの変更が追加されます(ファイルが宛先の元の長さまで一致する場合)。
-overwrite
を使用すると、1
も上書きされます。
-diff
オプションは、スナップショット差分を使用して、ソースクラスタからターゲットクラスタにファイルを同期します。スナップショット差分リスト内のファイルをコピー、名前変更、削除します。
-diff
オプションを使用する場合は、-update
オプションを含める必要があります。
現時点では、ほとんどのクラウドプロバイダーは同期と適切に連携しません。
使用方法
hadoop distcp -update -diff <from_snapshot> <to_snapshot> <source> <destination>
例
hadoop distcp -update -diff snap1 snap2 /src/ /dst/
上記のコマンドは、/src/
のsnapshot snap1
からsnap2
(つまり、snap1
からsnap2
のスナップショット差分)の変更を/dst/
に適用します。明らかに、/src/
にはsnap1
とsnap2
の両方のスナップショットが必要です。しかし、宛先/dst/
にも、この場合はsnap1
と同様に<from_snapshot>
と同じ名前のスナップショットが必要です。宛先/dst/
には、snap1
以降に新しいファイル操作(作成、名前変更、削除)があってはなりません。このコマンドが完了すると、宛先/dst/
に新しいスナップショットsnap2
は**作成されません**。
-diff
オプションを使用するには、-update
が必要です。
たとえば、/src/
で、snap1
の作成後、snap2
の作成前に1.txt
が追加され、2.txt
が削除された場合、上記のコマンドは/src/
から/dst/
に1.txt
をコピーし、/dst/
から2.txt
を削除します。
同期の動作は、以下の実験を使用して詳しく説明します。
開始前にいくつかの準備をします。
# Create source and destination directories hdfs dfs -mkdir /src/ /dst/ # Allow snapshot on source hdfs dfsadmin -allowSnapshot /src/ # Create a snapshot (empty one) hdfs dfs -createSnapshot /src/ snap1 # Allow snapshot on destination hdfs dfsadmin -allowSnapshot /dst/ # Create a from_snapshot with the same name hdfs dfs -createSnapshot /dst/ snap1 # Put one text file under /src/ echo "This is the 1st text file." > 1.txt hdfs dfs -put 1.txt /src/ # Create the second snapshot hdfs dfs -createSnapshot /src/ snap2 # Put another text file under /src/ echo "This is the 2nd text file." > 2.txt hdfs dfs -put 2.txt /src/ # Create the third snapshot hdfs dfs -createSnapshot /src/ snap3
次に、distcp同期を実行します。
hadoop distcp -update -diff snap1 snap2 /src/ /dst/
上記のコマンドは成功するはずです。1.txt
は/src/
から/dst/
にコピーされます。繰り返しますが、-update
オプションが必要です。
同じコマンドをもう一度実行すると、DistCp sync failed
例外が発生します。これは、snap1
以降、宛先に新しいファイル1.txt
が追加されたためです。つまり、/dst/
から1.txt
を手動で削除して同期を実行すると、コマンドは成功します。
まず、実験1のクリーンアップを行います。
hdfs dfs -rm -skipTrash /dst/1.txt
同期コマンドを実行します。実験1のsnap2
からsnap3
に変更されていることに注意してください。
hadoop distcp -update -diff snap1 snap3 /src/ /dst/
1.txt
と2.txt
の両方が/dst/
にコピーされます。
実験2の終了時からの継続です。
hdfs dfs -rm -skipTrash /dst/2.txt # Create snap2 at destination, it contains 1.txt hdfs dfs -createSnapshot /dst/ snap2 # Delete 1.txt from source hdfs dfs -rm -skipTrash /src/1.txt # Create snap4 at source, it only contains 2.txt hdfs dfs -createSnapshot /src/ snap4
ここで同期コマンドを実行します。
hadoop distcp -update -diff snap2 snap4 /src/ /dst/
2.txt
がコピーされ、1.txt
が/dst/
から削除されます。
/src/
と/dst/
の両方に同じ名前snap2
のスナップショットがある場合でも、スナップショットの内容が同じである必要はありません。つまり、/dst/
のsnap2
に1.txt
があっても、内容が異なれば、1.txt
は/dst/
から削除されます。同期コマンドは、削除しようとしているファイルの内容をチェックしません。単に<from_snapshot>
と<to_snapshot>
間のスナップショット差分リストに従います。
また、上記の手順で/dst/
上でsnap2
を作成する前に/dst/
から1.txt
を削除し、同期コマンドを実行する前に/dst/
のsnap2
に1.txt
が存在しないようにした場合でも、コマンドは成功します。存在しない/dst/
から1.txt
を削除しようとする際に例外は発生しません。
このセクションはHDFSのみに適用されます。
ターゲットとすべてのソースパス名が/.reserved/raw
階層内にある場合、「raw」名前空間拡張属性は保持されます。「raw」xattrは、暗号化メタデータなどの内部関数でシステムによって使用されます。それらは、/.reserved/raw
階層を介してアクセスした場合にのみユーザーに表示されます。
raw xattrは、/.reserved/rawプレフィックスが提供されているかどうかに基づいてのみ保持されます。-p(保持、下記参照)フラグは、raw xattrの保持に影響しません。
raw xattrの保持を防ぐには、ソースパスとターゲットパスのいずれにも/.reserved/raw
プレフィックスを使用しないようにしてください。
ソースパスとターゲットパスのサブセットのみに/.reserved/raw
プレフィックスが指定されている場合、エラーが表示され、0以外の終了コードが返されます。
フラグ | 説明 | 備考 |
---|---|---|
-p[rbugpcaxt] |
保持 r: レプリケーション数 b: ブロックサイズ u: ユーザー g: グループ p: 権限 c: チェックサムタイプ a: ACL x: XAttr t: タイムスタンプ | -update が指定されている場合、ファイルサイズも異なる場合(つまり、ファイルが再作成された場合)を除き、ステータス更新は同期されません。-paが指定されている場合、DistCpはACLが権限のスーパーセットであるため、権限も保持します。オプション-prは、ソースディレクトリとターゲットディレクトリの両方がイレージャーコード化されていない場合にのみ有効です。 |
-i |
エラーを無視する | 付録で説明されているように、このオプションを使用すると、デフォルトの場合よりも正確なコピーに関する統計情報を保持できます。また、失敗したコピーからのログも保持され、デバッグに役立ちます。最後に、失敗したマップは、すべての分割が試行される前にジョブが失敗する原因にはなりません。 |
-log <logdir> |
<logdir>にログを書き込む | DistCpは、コピーしようとした各ファイルのログをマップ出力として保持します。マップが失敗した場合、再実行された場合はログ出力は保持されません。 |
-v |
SKIP/COPYログに追加情報(パス、サイズ)を記録する | このオプションは-logオプションと組み合わせてのみ使用できます。 |
-m <num_maps> |
同時コピーの最大数 | データをコピーするマップの数を指定します。マップを増やすとスループットが向上するとは限りません。 |
-overwrite |
宛先を上書きする | マップが失敗し、-i が指定されていない場合、失敗したファイルだけでなく、分割内のすべてのファイルが再コピーされます。使用方法のドキュメントで説明されているように、宛先パスの生成に関するセマンティクスも変更されるため、ユーザーは慎重に使用する必要があります。 |
-update |
ソースと宛先のサイズ、ブロックサイズ、またはチェックサムが異なる場合に上書きする | 前述のように、これは「同期」操作ではありません。調査される基準は、ソースファイルと宛先ファイルのサイズ、ブロックサイズ、チェックサムです。それらが異なる場合、ソースファイルが宛先ファイルを置き換えます。使用方法のドキュメントで説明されているように、宛先パスの生成に関するセマンティクスも変更されるため、ユーザーは慎重に使用する必要があります。 |
-append |
名前が同じで長さが異なるファイルの増分コピー | ソースファイルの長さが宛先ファイルよりも長い場合、共通の長さの部分のチェックサムが比較されます。チェックサムが一致する場合、読み取りと追加の機能を使用して、差分のみがコピーされます。-appendオプションは、-skipcrccheck なしの-update と組み合わせてのみ機能します。 |
-f <urilist_uri> |
<urilist_uri>のリストをソースリストとして使用する | これは、コマンドラインで各ソースをリストすることと同じです。urilist_uri リストは、完全修飾URIである必要があります。 |
-filters |
パターン文字列のリストを含むファイルへのパス(1行に1つの文字列)。パターンに一致するパスはコピーから除外されます。 | java.util.regex.Patternで指定された正規表現をサポートします。 |
-filelimit <n> |
ファイルの総数を≦nに制限する | 非推奨!新しいDistCpでは無視されます。 |
-sizelimit <n> |
合計サイズを≦nバイトに制限する | 非推奨!新しいDistCpでは無視されます。 |
-delete |
宛先に存在するがソースに存在しないファイルを削除する | 削除はFS Shellによって実行されます。そのため、有効になっている場合はゴミ箱が使用されます。削除は、updateまたはoverwriteオプションでのみ適用できます。 |
-strategy {dynamic|uniformsize} |
DistCpで使用されるコピー戦略を選択する | デフォルトでは、uniformsizeが使用されます。(つまり、マップは各マップによってコピーされたファイルの合計サイズでバランスが取られます。レガシーと同様です)「dynamic」が指定されている場合、代わりにDynamicInputFormat が使用されます。(これは、「InputFormats」の下のアーキテクチャセクションで説明されています。) |
-bandwidth |
マップごとの帯域幅をMB/秒で指定する | 各マップは、指定された帯域幅のみを使用するように制限されます。これは常に正確ではありません。マップはコピー中に帯域幅消費量を抑制するため、使用される**ネット**帯域幅は指定値に近づきます。 |
-atomic {-tmp <tmp_dir>} |
オプションのテンポラリディレクトリを使用して、アトミックコミットを指定する | -atomic は、DistCpにソースデータを一時的なターゲット場所にコピーしてから、一時的なターゲットを最終的な場所にアトミックに移動するように指示します。データは、完全かつ一貫した形式で最終的なターゲットで使用可能になるか、まったく使用可能になりません。オプションとして、-tmp を使用して一時的なターゲットの場所を指定できます。指定されていない場合、デフォルトが選択されます。**注記:**tmp_dirは最終的なターゲットクラスタ上に存在する必要があります。 |
-async |
DistCpを非同期で実行する。Hadoopジョブが起動されるとすぐに終了する。 | 追跡のためにHadoopジョブIDがログに記録されます。 |
-diff <oldSnapshot> <newSnapshot> |
指定された2つのスナップショット間のスナップショット差分レポートを使用して、ソースとターゲットの違いを特定し、その差分をターゲットに適用してソースと同期状態にします。 | このオプションは-update オプションでのみ有効であり、次の条件を満たす必要があります。
|
-rdiff <newSnapshot> <oldSnapshot> |
指定された2つのスナップショット間のスナップショット差分レポートを使用して、ターゲットのスナップショット<oldSnapshot> が作成されてからターゲットで何が変更されたかを特定し、その差分をターゲットに逆方向に適用し、ソースの<oldSnapshot> から変更されたファイルをコピーして、ターゲットを<oldSnapshot> と同じにします。 |
このオプションは-update オプションでのみ有効であり、次の条件を満たす必要があります。
|
-numListstatusThreads |
ファイルリストの作成に使用するスレッド数 | 最大40スレッド。 |
-skipcrccheck |
ソースパスとターゲットパスの間のCRCチェックをスキップするかどうか。 | |
-blocksperchunk <blocksperchunk> |
チャンクあたりのブロック数。指定すると、ファイルをチャンクに分割して並列にコピーします。 | 正の値に設定すると、これより多くのブロックを持つファイルは<blocksperchunk> ブロックのチャンクに分割され、並列で転送され、宛先で再構成されます。デフォルトでは、<blocksperchunk> は0であり、ファイルは分割せずに全体として送信されます。このスイッチは、ソースファイルシステムがgetBlockLocationsメソッドを実装し、ターゲットファイルシステムがconcatメソッドを実装する場合にのみ適用されます。 |
-copybuffersize <copybuffersize> |
使用するコピーバッファのサイズ。デフォルトでは、<copybuffersize> は8192Bに設定されます。 |
|
-xtrack <path> |
欠損しているソースファイルに関する情報を指定されたパスに保存します。 | このオプションは-update オプションでのみ有効です。これは実験的なプロパティであり、-atomic オプションと組み合わせて使用することはできません。 |
-direct |
宛先パスに直接書き込む | 宛先がオブジェクトストレージである場合、非常に高価になる可能性のある一時ファイルの名前変更操作を回避するのに役立ちます。 |
-useiterator |
リストの作成にシングルスレッドのlistStatusIteratorを使用する | クライアント側のメモリ節約に役立ちます。このオプションを使用すると、numListstatusThreadsオプションは無視されます。 |
新しいDistCpのコンポーネントは、次のカテゴリに分類できます。
DistCpドライバコンポーネントは、次の役割を担います。
コマンドラインでDistCpコマンドに渡された引数を、
コマンド引数を適切なDistCpOptionsオブジェクトに組み立て、DistCpを初期化します。これらの引数には、以下が含まれます。
コピー操作のオーケストレーション
パーサー要素は、コマンドラインからのみ実行されます(またはDistCp::run()が呼び出された場合)。DistCpクラスは、DistCpOptionsオブジェクトを構築し、DistCpオブジェクトを適切に初期化することで、プログラム的にも使用できます。
コピー一覧ジェネレータクラスは、ソースからコピーするファイル/ディレクトリのリストを作成する役割を担います。ソースパス(ワイルドカードを含むファイル/ディレクトリ)の内容を調べ、DistCp Hadoopジョブで使用するために、コピーが必要なすべてのパスをSequenceFileに記録します。このモジュールにおける主なクラスには以下が含まれます。
CopyListing
:コピー一覧ジェネレータ実装によって実装されるべきインターフェース。具体的なCopyListing実装を選択するためのファクトリメソッドも提供します。SimpleCopyListing
:複数のソースパス(ファイル/ディレクトリ)を受け入れ、それぞれの下にある個々のファイルとディレクトリを再帰的にリストしてコピーするためのCopyListing
の実装。GlobbedCopyListing
:ソースパス内のワイルドカードを展開するCopyListing
の別の実装。FileBasedCopyListing
:指定されたファイルからソースパスリストを読み取るCopyListing
の実装。DistCpOptionsでソースファイルリストが指定されているかどうかに基づいて、ソースリストは次のいずれかの方法で生成されます。
GlobbedCopyListing
が使用されます。すべてのワイルドカードが展開され、すべての展開がSimpleCopyListingに転送され、SimpleCopyListingは順番に(各パスの再帰的下降によって)リストを構築します。FileBasedCopyListing
が使用されます。ソースパスは指定されたファイルから読み取られ、GlobbedCopyListing
に転送されます。その後、上記のようにリストが構築されます。CopyListingインターフェースのカスタム実装を提供することで、コピーリストの構築方法をカスタマイズできます。パスのコピーの考慮方法において、DistCpの動作は従来のDistCpとは異なります。
コピーすべきではないファイルのフィルタリングをカスタマイズするには、サポートされているCopyFilterインターフェースの現在の実装を渡すか、新しい実装を作成できます。これは、DistCpOptionsでdistcp.filters.class
を設定することで指定できます。
distcp.filters.class
を"RegexCopyFilter"に設定します。この実装を使用する場合は、フィルタリングに使用される正規表現を含む"CopyFilter" distcp.filters.file
も渡す必要があります。java.util.regex.Patternで指定された正規表現をサポートします。distcp.filters.class
を"RegexpInConfigurationFilter"に設定します。"DistCpOptions"でdistcp.exclude-file-regex
パラメータも使用して正規表現を渡す必要があります。java.util.regex.Patternで指定された正規表現をサポートします。"RegexCopyFilter"と比較して、より動的なアプローチです。distcp.filters.class
を"TrueCopyFilter"に設定します。上記いずれのオプションも指定されていない場合、デフォルトの実装として使用されます。従来の実装では、ターゲットに確実にコピーする必要があるパスのみがリストされます。たとえば、ファイルが既にターゲットに存在する場合(そして-overwrite
が指定されていない場合)、そのファイルはMapReduceコピージョブでは考慮されません。設定時に(つまり、MapReduceジョブの前に)これを決定するには、ファイルサイズとチェックサムの比較が必要であり、時間がかかる可能性があります。
新しいDistCpは、MapReduceジョブまでこのようなチェックを延期するため、設定時間が短縮されます。これらのチェックが複数のマップに並列化されるため、パフォーマンスがさらに向上します。
InputFormatsとMapReduceコンポーネントは、ファイルとディレクトリのソースパスから宛先パスへの実際のコピーを担当します。コピー一覧生成中に作成された一覧ファイルは、コピーの実行時にこの時点で消費されます。ここで重要なクラスには以下が含まれます。
UniformSizeInputFormat:org.apache.hadoop.mapreduce.InputFormatの実装により、マップ間の負荷分散において従来のDistCpと同等の機能を提供します。UniformSizeInputFormatの目的は、各マップがほぼ同じバイト数のファイルをコピーすることです。適切に、一覧ファイルはパスグループに分割され、各InputSplit内のファイルサイズの合計が他のすべてのマップとほぼ等しくなります。分割が常に完璧というわけではありませんが、その単純な実装により、設定時間が短縮されます。
DynamicInputFormatとDynamicRecordReader:DynamicInputFormatはorg.apache.hadoop.mapreduce.InputFormat
を実装し、DistCpの新機能です。一覧ファイルはいくつかの「チャンクファイル」に分割され、チャンクファイルの正確な数は、Hadoopジョブで要求されたマップ数の倍数になります。各マップタスクは、ジョブが起動される前に、チャンクの1つを(チャンクをタスクのIDに名前変更することで)「割り当て」られます。パスはDynamicRecordReader
を使用して各チャンクから読み取られ、CopyMapperで処理されます。チャンク内のすべてのパスが処理されると、現在のチャンクは削除され、新しいチャンクが取得されます。チャンクがなくなるまで、このプロセスが継続されます。この「動的」なアプローチにより、高速なマップタスクは遅いタスクよりも多くのパスを消費できるため、DistCpジョブ全体の速度が向上します。
CopyMapper:このクラスは、物理的なファイルコピーを実装します。入力パスは、(ジョブの設定で指定された)入力オプションに対してチェックされ、ファイルをコピーする必要があるかどうかが決定されます。ファイルは、次のいずれかがtrueの場合にのみコピーされます。
-skipcrccheck
が指定されていない。-overwrite
が指定されている。CopyCommitter:このクラスは、DistCpジョブのコミットフェーズを担当し、以下を含みます。
デフォルトでは、DistCpは各マップがほぼ同じバイト数をコピーするように、各マップのサイズを同等にする試みを行います。ファイルは最も細かい粒度であるため、同時コピアー(つまりマップ)の数を増やしても、同時コピーの数や全体のスループットが常に増加するとは限りません。
新しいDistCpは、高速なデータノードが遅いノードよりも多くのバイトをコピーできるように、マップサイズを「動的に」調整する戦略も提供します。-strategy dynamic
(アーキテクチャで説明)を使用すると、各マップタスクに固定されたソースファイルセットを割り当てるのではなく、ファイルはいくつかのセットに分割されます。セットの数は、通常、マップ数の2〜3倍になります。各マップは、チャンクにリストされているすべてのファイルを取得してコピーします。チャンクを使い果たすと、新しいチャンクが取得され処理され、チャンクがなくなるまで続きます。
ソースパスを固定されたマップに割り当てないことで、高速なマップタスク(つまりデータノード)はより多くのチャンクを消費し、したがって遅いノードよりも多くのデータをコピーできます。この分散は均一ではありませんが、各マッパーの容量に関して公平です。
動的戦略は、DynamicInputFormat
によって実装されます。ほとんどの状況で優れたパフォーマンスを提供します。
長時間実行されるジョブや定期的に実行されるジョブの場合、ソースクラスタと宛先クラスタのサイズ、コピーのサイズ、利用可能な帯域幅に合わせてマップ数を調整することをお勧めします。
Hadoopの異なるメジャーバージョン間(例:1.Xと2.Xの間)のコピーを行うには、通常、WebHdfsFileSystemを使用します。以前のHftpFileSystemとは異なり、webhdfsは読み取りと書き込みの両方の操作で使用できるため、DistCpはソースクラスタと宛先クラスタの両方で実行できます。リモートクラスタはwebhdfs://<namenode_hostname>:<http_port>
として指定します。Hadoopクラスタの同じメジャーバージョン間(例:2.Xと2.Xの間)のコピーを行う場合は、パフォーマンス向上のためにhdfsプロトコルを使用します。
webhdfsがSSLで保護されている場合は、「swebhdfs://
」スキームを使用します。詳細については、SWebHDFSのSSL設定を参照してください。
前述のように、マップが入力データの1つのコピーに失敗した場合、いくつかの副作用が発生します。
-overwrite
が指定されていない限り、再実行時に前のマップによって正常にコピーされたファイルは「スキップ済み」としてマークされます。mapreduce.map.maxattempts
回失敗した場合、残りのマップタスクはキルされます(-i
が設定されていない限り)。mapreduce.map.speculative
がfinalおよびtrueに設定されている場合、コピーの結果は未定義です。DistCpは、Amazon S3、Azure ABFS、Google GCSなどのオブジェクトストアと連携します。
前提条件
DistCpを使用してデータをアップロードできます。
hadoop distcp -direct hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1
データをダウンロードするには
hadoop distcp s3a://bucket/generated/results hdfs://nn1:8020/results
オブジェクトストア間でデータをコピーするには
hadoop distcp s3a://bucket/generated/results \ wasb://updates@example.blob.core.windows.net
オブジェクトストア内でデータをコピーするには
hadoop distcp wasb://updates@example.blob.core.windows.net/current \ wasb://updates@example.blob.core.windows.net/old
そして、変更されたファイルのみをコピーするために-update
を使用します。
hadoop distcp -update -numListstatusThreads 20 \ s3a://history/2016 \ hdfs://nn1:8020/history/2016
オブジェクトストアはファイルのリスト作成が遅いので、大きなディレクトリツリーで-update
操作を実行する場合は、-numListstatusThreads
オプションを設定することを検討してください(制限は40スレッドです)。
オブジェクトストアでDistCp -update
を使用する場合、一般的に、個々のファイルの最終更新時刻と長さのみが比較され、2つのストア間のチェックサムアルゴリズムが異なる場合は、チェックサムは比較されません。
異なるチェックサムアルゴリズムを持つ2つのオブジェクトストア間のdistcp -update
は、ファイルコピーをスキップするかどうかを判断するために、ソースファイルとターゲットファイルの最終更新時刻とファイルサイズを比較します。この動作はプロパティdistcp.update.modification.time
によって制御され、デフォルトでtrueに設定されています。ソースファイルがターゲットファイルよりも最近変更されている場合、コンテンツが変更されたと見なされ、ファイルが更新される必要があります。マシン間にクロックスキューがないことを確認する必要があります。ほとんどのオブジェクトストアにはディレクトリの有効なタイムスタンプがあるという事実は無関係です。ファイルのタイムスタンプのみが比較されます。ただし、クライアントコンピュータのクロックをインフラストラクチャのクロックに近づけることが重要であり、これにより、クライアント/HDFSクラスタとオブジェクトストア間のタイムスタンプが一致するようになります。そうでない場合、変更されたファイルが不足したり、頻繁にコピーされたりする可能性があります。
distcp.update.modification.time
は、2つのストアのいずれかにチェックサム検証がない場合、つまり2つのストア間のチェックサム比較が互換性のない場合にのみ使用されます。プロパティがtrueに設定されている場合でも、2つのストア間に有効なチェックサム比較がある場合は使用されません。
最終更新時刻チェックをオフにするには、core-site.xmlでこれ を設定します。
<property> <name>distcp.update.modification.time</name> <value>false</value> </property>
備考
-atomic
オプションは一時データのリネームを引き起こすため、操作終了時のコミット時間が大幅に増加します。さらに、(オプションで)wasb://
以外のオブジェクトストアはディレクトリの原子的なリネームを提供しないため、-atomic
操作は実際には期待通りの動作をしません。使用を避けてください。
-append
オプションはサポートされていません。
-diff
およびrdiff
オプションはサポートされていません。
-skipCrc
フラグの値に関係なく、CRCチェックは実行されません。
パーミッション、ユーザーおよびグループ情報、属性チェックサム、レプリケーションを保持するための-p
オプションはすべて、一般的に無視されます。wasb://
コネクタは情報を保持しますが、パーミッションを強制適用しません。
一部のオブジェクトストアコネクタ(例:S3Aコネクタ)は、出力のメモリ内バッファリングオプションを提供しています。大規模なファイルをコピーする際にこのようなオプションを使用すると、ヒープオーバーフローやYARNコンテナの終了など、何らかのメモリ不足イベントが発生する可能性があります。これは、クラスタとオブジェクトストア間のネットワーク帯域幅が制限されている場合(リモートオブジェクトストアを使用する場合など)、特に一般的です。このようなオプションを無効化/回避し、ディスクバッファリングに依存することをお勧めします。
単一のオブジェクトストア内でのコピー操作は、オブジェクトストアが内部的により効率的なCOPY操作を実装している場合でも、Hadoopクラスタ内で実行されます。
つまり、次のような操作では
hadoop distcp s3a://bucket/datasets/set1 s3a://bucket/datasets/set2
各バイトがHadoopワーカーノードに転送され、その後バケットに戻されます。これは遅くなるだけでなく、料金が発生する可能性もあります。
-direct
オプションを使用すると、オブジェクトストアのターゲットパスに直接書き込むことができ、それ以外の場合は発生する可能性のある非常にコストのかかる一時ファイルのリネーム操作を回避できます。
なぜ-updateは既存のターゲットディレクトリ下に親ソースディレクトリを作成しないのですか? -update
と-overwrite
の動作については、このドキュメントの使用方法セクションで詳しく説明されています。簡単に言うと、いずれかのオプションを既存の宛先ディレクトリで使用した場合、ソースディレクトリ自体ではなく、各ソースディレクトリの内容がコピーされます。この動作は、従来のDistCp実装とも一貫しています。
新しいDistCpと従来のDistCpのセマンティクス上の違いは何ですか?
なぜ新しいDistCpは従来のDistCpよりも多くのマップを使用するのですか?従来のDistCpは、コピージョブが開始される前に実際にターゲットにコピーする必要があるファイルを確認し、コピーに必要な数のマップを起動することによって動作します。したがって、ほとんどのファイルが(既に存在するためなど)スキップされる必要がある場合、必要なマップは少なくなります。その結果、セットアップ(つまり、M/Rジョブの前)に費やす時間は長くなります。新しいDistCpは、ソースパスのコンテンツのみを計算します。スキップできるファイルをフィルターしようとしません。その決定は、M/Rジョブが実行されるまで延期されます。(実行時間に関して)はるかに高速ですが、起動されるマップの数は-m
オプションで指定した数、または指定されていない場合は20(デフォルト)になります。
なぜマップ数を増やしてもDistCpが高速にならないのですか?現在、DistCpの最小作業単位はファイルです。つまり、ファイルは1つのマップによってのみ処理されます。ファイル数を超える値にマップ数を増やしても、パフォーマンス上の利点は得られません。起動されるマップの数はファイル数と等しくなります。
なぜDistCpはメモリ不足になるのですか?ソースパスからコピーされる個々のファイル/ディレクトリの数が非常に多い場合(例:1,000,000パス)、DistCpはコピーするパスのリストを決定している間にメモリ不足になる可能性があります。これは新しいDistCp実装特有のものではありません。これを回避するには、次のように-Xmx
JVMヒープサイズパラメータを変更することを検討してください。
bash$ export HADOOP_CLIENT_OPTS="-Xms64m -Xmx1024m" bash$ hadoop distcp /source /target