DistCpガイド



概要

DistCp(分散コピー)は、大規模なクラスタ内/クラスタ間コピーに使用されるツールです。MapReduceを使用して、分散処理、エラー処理と回復、およびレポートを実行します。ファイルとディレクトリのリストをマップタスクへの入力に展開し、それぞれがソースリストに指定されたファイルのパーティションをコピーします。

DistCpの以前の実装https://hadoop.dokyumento.jp/docs/r1.2.1/distcp.htmlには、使用方法、拡張性、パフォーマンスの両方において、独自の癖と欠点があります。DistCpのリファクタリングの目的は、これらの欠点を修正し、プログラムによって使用および拡張できるようにすることです。レガシーの動作をデフォルトとして維持しながら、実行時と設定のパフォーマンスを向上させるために、新しいパラダイムが導入されました。

このドキュメントでは、新しいDistCpの設計、その新しい機能、最適な使用方法、およびレガシー実装からのずれについて説明することを目的としています。

使用方法

基本的な使用方法

DistCpの最も一般的な呼び出しは、クラスタ間コピーです。

bash$ hadoop distcp hdfs://nn1:8020/foo/bar \
hdfs://nn2:8020/bar/foo

これにより、nn1上の/foo/barの下のネームスペースが一時ファイルに展開され、その内容がマップタスクのセットに分割され、各NodeManager上でnn1からnn2へのコピーが開始されます。

コマンドラインで複数のソースディレクトリを指定することもできます。

bash$ hadoop distcp hdfs://nn1:8020/foo/a \
hdfs://nn1:8020/foo/b \
hdfs://nn2:8020/bar/foo

または、同等に、-fオプションを使用してファイルから指定することもできます。

bash$ hadoop distcp -f hdfs://nn1:8020/srclist \
hdfs://nn2:8020/bar/foo

ここで、srclistには以下が含まれています。

hdfs://nn1:8020/foo/a
hdfs://nn1:8020/foo/b

複数のソースからコピーする場合、2つのソースが衝突すると、DistCpはエラーメッセージとともにコピーを中止しますが、宛先での衝突は、指定されたオプションに従って解決されます。デフォルトでは、宛先に既に存在するファイルはスキップされます(つまり、ソースファイルで置き換えられません)。各ジョブの最後にスキップされたファイル数が報告されますが、一部のファイルでコピアが失敗したが、後の試行で成功した場合、不正確になる可能性があります。

各NodeManagerがソースファイルシステムと宛先ファイルシステムの両方にアクセスして通信できることが重要です。HDFSの場合、ソースと宛先の両方で同じバージョンのプロトコルを実行するか、下位互換性のあるプロトコルを使用する必要があります。[バージョンの間のコピー](#Copying_Between_Versions_of_HDFS)を参照してください。

コピー後、ソースと宛先のリストを生成して相互にチェックし、コピーが実際に成功したことを確認することをお勧めします。DistCpはMap/ReduceとファイルシステムAPIの両方を使用するため、3つのいずれかまたはそれらの間の問題が、コピーに悪影響を与え、サイレントに影響を与える可能性があります。-updateを有効にして2回目のパスを実行することに成功した人もいますが、ユーザーは試行する前にその意味を理解する必要があります。

別のクライアントがまだソースファイルに書き込んでいる場合、コピーは失敗する可能性があることにも注意してください。宛先に書き込まれているファイルを上書きしようとすると、HDFSでも失敗する可能性があります。ソースファイルがコピーされる前に(再)移動されると、FileNotFoundExceptionでコピーが失敗します。

DistCpで使用可能なすべてのオプションの詳細については、コマンドラインリファレンスを参照してください。

更新と上書き

-updateは、ターゲットに存在しないか、ターゲットバージョンと異なるソースからのファイルをコピーするために使用されます。-overwriteは、ターゲットに存在するターゲットファイルを上書きします。

更新と上書きオプションは、ソースパスの処理方法がデフォルトとは微妙に異なるため、特別な注意が必要です。/source/first//source/second/から/target/へのコピーを検討してください。ソースパスには次の内容が含まれています。

hdfs://nn1:8020/source/first/1
hdfs://nn1:8020/source/first/2
hdfs://nn1:8020/source/second/10
hdfs://nn1:8020/source/second/20

-updateまたは-overwriteを指定せずにDistCpを呼び出すと、DistCpのデフォルトでは、/targetの下にfirst/second/ディレクトリが作成されます。したがって

distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target

は、/targetに次の内容を作成します。

hdfs://nn2:8020/target/first/1
hdfs://nn2:8020/target/first/2
hdfs://nn2:8020/target/second/10
hdfs://nn2:8020/target/second/20

-updateまたは-overwriteを指定すると、ソースディレクトリの**内容**がターゲットにコピーされ、ソースディレクトリ自体はコピーされません。したがって

distcp -update hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target

は、/targetに次の内容を作成します。

hdfs://nn2:8020/target/1
hdfs://nn2:8020/target/2
hdfs://nn2:8020/target/10
hdfs://nn2:8020/target/20

拡張として、両方のソースフォルダに同じ名前のファイル(たとえば、0)が含まれている場合、両方のソースは宛先の/target/0にエントリをマップします。この競合を許可するのではなく、DistCpは中止します。

次に、次のコピー操作を考えてみましょう。

distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target

ソース/サイズ:

hdfs://nn1:8020/source/first/1 32
hdfs://nn1:8020/source/first/2 32
hdfs://nn1:8020/source/second/10 64
hdfs://nn1:8020/source/second/20 32

宛先/サイズ:

hdfs://nn2:8020/target/1 32
hdfs://nn2:8020/target/10 32
hdfs://nn2:8020/target/20 64

は次のようになります。

hdfs://nn2:8020/target/1 32
hdfs://nn2:8020/target/2 32
hdfs://nn2:8020/target/10 64
hdfs://nn2:8020/target/20 32

ファイルの長さと内容が一致するため、1はスキップされます。2はターゲットに存在しないためコピーされます。内容がソースと一致しないため、1020は上書きされます。

-updateを使用する場合、ファイルの長さと内容が一致するため、1はスキップされます。2はターゲットに存在しないためコピーされます。内容がソースと一致しないため、1020は上書きされます。ただし、-appendも追加で使用されると、10のみが上書きされ(ソースの長さが宛先より短い)、20にはファイルの変更が追加されます(ファイルが宛先の元の長さまで一致する場合)。

-overwriteを使用すると、1も上書きされます。

同期

-diffオプションは、スナップショット差分を使用して、ソースクラスタからターゲットクラスタにファイルを同期します。スナップショット差分リスト内のファイルをコピー、名前変更、削除します。

-diffオプションを使用する場合は、-updateオプションを含める必要があります。

現時点では、ほとんどのクラウドプロバイダーは同期と適切に連携しません。

使用方法

hadoop distcp -update -diff <from_snapshot> <to_snapshot> <source> <destination>

hadoop distcp -update -diff snap1 snap2 /src/ /dst/

上記のコマンドは、/src/のsnapshot snap1からsnap2(つまり、snap1からsnap2のスナップショット差分)の変更を/dst/に適用します。明らかに、/src/にはsnap1snap2の両方のスナップショットが必要です。しかし、宛先/dst/にも、この場合はsnap1と同様に<from_snapshot>と同じ名前のスナップショットが必要です。宛先/dst/には、snap1以降に新しいファイル操作(作成、名前変更、削除)があってはなりません。このコマンドが完了すると、宛先/dst/に新しいスナップショットsnap2は**作成されません**。

-diffオプションを使用するには、-updateが必要です。

たとえば、/src/で、snap1の作成後、snap2の作成前に1.txtが追加され、2.txtが削除された場合、上記のコマンドは/src/から/dst/1.txtをコピーし、/dst/から2.txtを削除します。

同期の動作は、以下の実験を使用して詳しく説明します。

実験1:隣接する2つのスナップショットの差分の同期

開始前にいくつかの準備をします。

# Create source and destination directories
hdfs dfs -mkdir /src/ /dst/
# Allow snapshot on source
hdfs dfsadmin -allowSnapshot /src/
# Create a snapshot (empty one)
hdfs dfs -createSnapshot /src/ snap1
# Allow snapshot on destination
hdfs dfsadmin -allowSnapshot /dst/
# Create a from_snapshot with the same name
hdfs dfs -createSnapshot /dst/ snap1

# Put one text file under /src/
echo "This is the 1st text file." > 1.txt
hdfs dfs -put 1.txt /src/
# Create the second snapshot
hdfs dfs -createSnapshot /src/ snap2

# Put another text file under /src/
echo "This is the 2nd text file." > 2.txt
hdfs dfs -put 2.txt /src/
# Create the third snapshot
hdfs dfs -createSnapshot /src/ snap3

次に、distcp同期を実行します。

hadoop distcp -update -diff snap1 snap2 /src/ /dst/

上記のコマンドは成功するはずです。1.txt/src/から/dst/にコピーされます。繰り返しますが、-updateオプションが必要です。

同じコマンドをもう一度実行すると、DistCp sync failed例外が発生します。これは、snap1以降、宛先に新しいファイル1.txtが追加されたためです。つまり、/dst/から1.txtを手動で削除して同期を実行すると、コマンドは成功します。

実験2:隣接しない2つのスナップショット間の差分の同期

まず、実験1のクリーンアップを行います。

hdfs dfs -rm -skipTrash /dst/1.txt

同期コマンドを実行します。実験1のsnap2からsnap3に変更されていることに注意してください。

hadoop distcp -update -diff snap1 snap3 /src/ /dst/

1.txt2.txtの両方が/dst/にコピーされます。

実験3:ファイル削除操作の同期

実験2の終了時からの継続です。

hdfs dfs -rm -skipTrash /dst/2.txt
# Create snap2 at destination, it contains 1.txt
hdfs dfs -createSnapshot /dst/ snap2

# Delete 1.txt from source
hdfs dfs -rm -skipTrash /src/1.txt
# Create snap4 at source, it only contains 2.txt
hdfs dfs -createSnapshot /src/ snap4

ここで同期コマンドを実行します。

hadoop distcp -update -diff snap2 snap4 /src/ /dst/

2.txtがコピーされ、1.txt/dst/から削除されます。

/src//dst/の両方に同じ名前snap2のスナップショットがある場合でも、スナップショットの内容が同じである必要はありません。つまり、/dst/snap21.txtがあっても、内容が異なれば、1.txt/dst/から削除されます。同期コマンドは、削除しようとしているファイルの内容をチェックしません。単に<from_snapshot><to_snapshot>間のスナップショット差分リストに従います。

また、上記の手順で/dst/上でsnap2を作成する前に/dst/から1.txtを削除し、同期コマンドを実行する前に/dst/snap21.txtが存在しないようにした場合でも、コマンドは成功します。存在しない/dst/から1.txtを削除しようとする際に例外は発生しません。

raw名前空間拡張属性の保持

このセクションはHDFSのみに適用されます。

ターゲットとすべてのソースパス名が/.reserved/raw階層内にある場合、「raw」名前空間拡張属性は保持されます。「raw」xattrは、暗号化メタデータなどの内部関数でシステムによって使用されます。それらは、/.reserved/raw階層を介してアクセスした場合にのみユーザーに表示されます。

raw xattrは、/.reserved/rawプレフィックスが提供されているかどうかに基づいてのみ保持されます。-p(保持、下記参照)フラグは、raw xattrの保持に影響しません。

raw xattrの保持を防ぐには、ソースパスとターゲットパスのいずれにも/.reserved/rawプレフィックスを使用しないようにしてください。

ソースパスとターゲットパスのサブセットのみに/.reserved/rawプレフィックスが指定されている場合、エラーが表示され、0以外の終了コードが返されます。

コマンドラインオプション

フラグ 説明 備考
-p[rbugpcaxt] 保持 r: レプリケーション数 b: ブロックサイズ u: ユーザー g: グループ p: 権限 c: チェックサムタイプ a: ACL x: XAttr t: タイムスタンプ -updateが指定されている場合、ファイルサイズも異なる場合(つまり、ファイルが再作成された場合)を除き、ステータス更新は同期されません。-paが指定されている場合、DistCpはACLが権限のスーパーセットであるため、権限も保持します。オプション-prは、ソースディレクトリとターゲットディレクトリの両方がイレージャーコード化されていない場合にのみ有効です。
-i エラーを無視する 付録で説明されているように、このオプションを使用すると、デフォルトの場合よりも正確なコピーに関する統計情報を保持できます。また、失敗したコピーからのログも保持され、デバッグに役立ちます。最後に、失敗したマップは、すべての分割が試行される前にジョブが失敗する原因にはなりません。
-log <logdir> <logdir>にログを書き込む DistCpは、コピーしようとした各ファイルのログをマップ出力として保持します。マップが失敗した場合、再実行された場合はログ出力は保持されません。
-v SKIP/COPYログに追加情報(パス、サイズ)を記録する このオプションは-logオプションと組み合わせてのみ使用できます。
-m <num_maps> 同時コピーの最大数 データをコピーするマップの数を指定します。マップを増やすとスループットが向上するとは限りません。
-overwrite 宛先を上書きする マップが失敗し、-iが指定されていない場合、失敗したファイルだけでなく、分割内のすべてのファイルが再コピーされます。使用方法のドキュメントで説明されているように、宛先パスの生成に関するセマンティクスも変更されるため、ユーザーは慎重に使用する必要があります。
-update ソースと宛先のサイズ、ブロックサイズ、またはチェックサムが異なる場合に上書きする 前述のように、これは「同期」操作ではありません。調査される基準は、ソースファイルと宛先ファイルのサイズ、ブロックサイズ、チェックサムです。それらが異なる場合、ソースファイルが宛先ファイルを置き換えます。使用方法のドキュメントで説明されているように、宛先パスの生成に関するセマンティクスも変更されるため、ユーザーは慎重に使用する必要があります。
-append 名前が同じで長さが異なるファイルの増分コピー ソースファイルの長さが宛先ファイルよりも長い場合、共通の長さの部分のチェックサムが比較されます。チェックサムが一致する場合、読み取りと追加の機能を使用して、差分のみがコピーされます。-appendオプションは、-skipcrccheckなしの-updateと組み合わせてのみ機能します。
-f <urilist_uri> <urilist_uri>のリストをソースリストとして使用する これは、コマンドラインで各ソースをリストすることと同じです。urilist_uriリストは、完全修飾URIである必要があります。
-filters パターン文字列のリストを含むファイルへのパス(1行に1つの文字列)。パターンに一致するパスはコピーから除外されます。 java.util.regex.Patternで指定された正規表現をサポートします。
-filelimit <n> ファイルの総数を≦nに制限する 非推奨!新しいDistCpでは無視されます。
-sizelimit <n> 合計サイズを≦nバイトに制限する 非推奨!新しいDistCpでは無視されます。
-delete 宛先に存在するがソースに存在しないファイルを削除する 削除はFS Shellによって実行されます。そのため、有効になっている場合はゴミ箱が使用されます。削除は、updateまたはoverwriteオプションでのみ適用できます。
-strategy {dynamic|uniformsize} DistCpで使用されるコピー戦略を選択する デフォルトでは、uniformsizeが使用されます。(つまり、マップは各マップによってコピーされたファイルの合計サイズでバランスが取られます。レガシーと同様です)「dynamic」が指定されている場合、代わりにDynamicInputFormatが使用されます。(これは、「InputFormats」の下のアーキテクチャセクションで説明されています。)
-bandwidth マップごとの帯域幅をMB/秒で指定する 各マップは、指定された帯域幅のみを使用するように制限されます。これは常に正確ではありません。マップはコピー中に帯域幅消費量を抑制するため、使用される**ネット**帯域幅は指定値に近づきます。
-atomic {-tmp <tmp_dir>} オプションのテンポラリディレクトリを使用して、アトミックコミットを指定する -atomicは、DistCpにソースデータを一時的なターゲット場所にコピーしてから、一時的なターゲットを最終的な場所にアトミックに移動するように指示します。データは、完全かつ一貫した形式で最終的なターゲットで使用可能になるか、まったく使用可能になりません。オプションとして、-tmpを使用して一時的なターゲットの場所を指定できます。指定されていない場合、デフォルトが選択されます。**注記:**tmp_dirは最終的なターゲットクラスタ上に存在する必要があります。
-async DistCpを非同期で実行する。Hadoopジョブが起動されるとすぐに終了する。 追跡のためにHadoopジョブIDがログに記録されます。
-diff <oldSnapshot> <newSnapshot> 指定された2つのスナップショット間のスナップショット差分レポートを使用して、ソースとターゲットの違いを特定し、その差分をターゲットに適用してソースと同期状態にします。 このオプションは-updateオプションでのみ有効であり、次の条件を満たす必要があります。
  1. ソースファイルシステムとターゲットファイルシステムの両方がDistributedFileSystemである必要があります。
  2. ソースFS上に2つのスナップショット<oldSnapshot><newSnapshot>が作成されており、<oldSnapshot><newSnapshot>よりも古いものです。
  3. ターゲットには、同じスナップショット<oldSnapshot>があります。<oldSnapshot>が作成されてからターゲットに変更は加えられていないため、<oldSnapshot>の内容はターゲットの現在の状態と同じです。ターゲット内のすべてのファイル/ディレクトリは、ソースの<oldSnapshot>と同じです。
-rdiff <newSnapshot> <oldSnapshot> 指定された2つのスナップショット間のスナップショット差分レポートを使用して、ターゲットのスナップショット<oldSnapshot>が作成されてからターゲットで何が変更されたかを特定し、その差分をターゲットに逆方向に適用し、ソースの<oldSnapshot>から変更されたファイルをコピーして、ターゲットを<oldSnapshot>と同じにします。 このオプションは-updateオプションでのみ有効であり、次の条件を満たす必要があります。
  1. ソースファイルシステムとターゲットファイルシステムの両方がDistributedFileSystemである必要があります。ソースとターゲットは、2つの異なるクラスタ/パスにすることも、まったく同じクラスタ/パスにすることもできます。後者の場合、変更されたファイルはターゲットの<oldSnapshot>からターゲットの現在の状態にコピーされます。
  2. ターゲットFS上に2つのスナップショット<newSnapshot><oldSnapshot>が作成されており、<oldSnapshot><newSnapshot>よりも古いものです。ターゲットの<newSnapshot>が作成されてからターゲットに変更は加えられていません。
  3. ソースには、ターゲット上の<oldSnapshot>と同じ内容を持つ同じスナップショット<oldSnapshot>があります。ターゲットの<oldSnapshot>内のすべてのファイル/ディレクトリは、ソースの<oldSnapshot>と同じです。
-numListstatusThreads ファイルリストの作成に使用するスレッド数 最大40スレッド。
-skipcrccheck ソースパスとターゲットパスの間のCRCチェックをスキップするかどうか。
-blocksperchunk <blocksperchunk> チャンクあたりのブロック数。指定すると、ファイルをチャンクに分割して並列にコピーします。 正の値に設定すると、これより多くのブロックを持つファイルは<blocksperchunk>ブロックのチャンクに分割され、並列で転送され、宛先で再構成されます。デフォルトでは、<blocksperchunk>は0であり、ファイルは分割せずに全体として送信されます。このスイッチは、ソースファイルシステムがgetBlockLocationsメソッドを実装し、ターゲットファイルシステムがconcatメソッドを実装する場合にのみ適用されます。
-copybuffersize <copybuffersize> 使用するコピーバッファのサイズ。デフォルトでは、<copybuffersize>は8192Bに設定されます。
-xtrack <path> 欠損しているソースファイルに関する情報を指定されたパスに保存します。 このオプションは-updateオプションでのみ有効です。これは実験的なプロパティであり、-atomicオプションと組み合わせて使用することはできません。
-direct 宛先パスに直接書き込む 宛先がオブジェクトストレージである場合、非常に高価になる可能性のある一時ファイルの名前変更操作を回避するのに役立ちます。
-useiterator リストの作成にシングルスレッドのlistStatusIteratorを使用する クライアント側のメモリ節約に役立ちます。このオプションを使用すると、numListstatusThreadsオプションは無視されます。

DistCpのアーキテクチャ

新しいDistCpのコンポーネントは、次のカテゴリに分類できます。

  • DistCpドライバ
  • コピーリストジェネレータ
  • 入力形式とMapReduceコンポーネント

DistCpドライバ

DistCpドライバコンポーネントは、次の役割を担います。

  • コマンドラインでDistCpコマンドに渡された引数を、

    • OptionsParserを介して、および
    • DistCpOptionsSwitchを介して解析します。
  • コマンド引数を適切なDistCpOptionsオブジェクトに組み立て、DistCpを初期化します。これらの引数には、以下が含まれます。

    • ソースパス
    • ターゲット場所
    • コピーオプション(更新コピー、上書き、保持するファイル属性など)
  • コピー操作のオーケストレーション

    • コピー対象ファイル一覧を生成するためのコピー一覧ジェネレータの呼び出し。
    • コピーを実行するためのHadoop Map-Reduceジョブの設定と起動。
    • オプションに基づき、Hadoop MRジョブへのハンドルをすぐに返すか、完了まで待機するかを選択。

パーサー要素は、コマンドラインからのみ実行されます(またはDistCp::run()が呼び出された場合)。DistCpクラスは、DistCpOptionsオブジェクトを構築し、DistCpオブジェクトを適切に初期化することで、プログラム的にも使用できます。

コピー一覧ジェネレータ

コピー一覧ジェネレータクラスは、ソースからコピーするファイル/ディレクトリのリストを作成する役割を担います。ソースパス(ワイルドカードを含むファイル/ディレクトリ)の内容を調べ、DistCp Hadoopジョブで使用するために、コピーが必要なすべてのパスをSequenceFileに記録します。このモジュールにおける主なクラスには以下が含まれます。

  1. CopyListing:コピー一覧ジェネレータ実装によって実装されるべきインターフェース。具体的なCopyListing実装を選択するためのファクトリメソッドも提供します。
  2. SimpleCopyListing:複数のソースパス(ファイル/ディレクトリ)を受け入れ、それぞれの下にある個々のファイルとディレクトリを再帰的にリストしてコピーするためのCopyListingの実装。
  3. GlobbedCopyListing:ソースパス内のワイルドカードを展開するCopyListingの別の実装。
  4. FileBasedCopyListing:指定されたファイルからソースパスリストを読み取るCopyListingの実装。

DistCpOptionsでソースファイルリストが指定されているかどうかに基づいて、ソースリストは次のいずれかの方法で生成されます。

  1. ソースファイルリストがない場合、GlobbedCopyListingが使用されます。すべてのワイルドカードが展開され、すべての展開がSimpleCopyListingに転送され、SimpleCopyListingは順番に(各パスの再帰的下降によって)リストを構築します。
  2. ソースファイルリストが指定されている場合、FileBasedCopyListingが使用されます。ソースパスは指定されたファイルから読み取られ、GlobbedCopyListingに転送されます。その後、上記のようにリストが構築されます。

CopyListingインターフェースのカスタム実装を提供することで、コピーリストの構築方法をカスタマイズできます。パスのコピーの考慮方法において、DistCpの動作は従来のDistCpとは異なります。

コピーすべきではないファイルのフィルタリングをカスタマイズするには、サポートされているCopyFilterインターフェースの現在の実装を渡すか、新しい実装を作成できます。これは、DistCpOptionsでdistcp.filters.classを設定することで指定できます。

  1. distcp.filters.classを"RegexCopyFilter"に設定します。この実装を使用する場合は、フィルタリングに使用される正規表現を含む"CopyFilter" distcp.filters.fileも渡す必要があります。java.util.regex.Patternで指定された正規表現をサポートします。
  2. distcp.filters.classを"RegexpInConfigurationFilter"に設定します。"DistCpOptions"でdistcp.exclude-file-regexパラメータも使用して正規表現を渡す必要があります。java.util.regex.Patternで指定された正規表現をサポートします。"RegexCopyFilter"と比較して、より動的なアプローチです。
  3. distcp.filters.classを"TrueCopyFilter"に設定します。上記いずれのオプションも指定されていない場合、デフォルトの実装として使用されます。

従来の実装では、ターゲットに確実にコピーする必要があるパスのみがリストされます。たとえば、ファイルが既にターゲットに存在する場合(そして-overwriteが指定されていない場合)、そのファイルはMapReduceコピージョブでは考慮されません。設定時に(つまり、MapReduceジョブの前に)これを決定するには、ファイルサイズとチェックサムの比較が必要であり、時間がかかる可能性があります。

新しいDistCpは、MapReduceジョブまでこのようなチェックを延期するため、設定時間が短縮されます。これらのチェックが複数のマップに並列化されるため、パフォーマンスがさらに向上します。

InputFormatsとMapReduceコンポーネント

InputFormatsとMapReduceコンポーネントは、ファイルとディレクトリのソースパスから宛先パスへの実際のコピーを担当します。コピー一覧生成中に作成された一覧ファイルは、コピーの実行時にこの時点で消費されます。ここで重要なクラスには以下が含まれます。

  • UniformSizeInputFormat:org.apache.hadoop.mapreduce.InputFormatの実装により、マップ間の負荷分散において従来のDistCpと同等の機能を提供します。UniformSizeInputFormatの目的は、各マップがほぼ同じバイト数のファイルをコピーすることです。適切に、一覧ファイルはパスグループに分割され、各InputSplit内のファイルサイズの合計が他のすべてのマップとほぼ等しくなります。分割が常に完璧というわけではありませんが、その単純な実装により、設定時間が短縮されます。

  • DynamicInputFormatとDynamicRecordReader:DynamicInputFormatはorg.apache.hadoop.mapreduce.InputFormatを実装し、DistCpの新機能です。一覧ファイルはいくつかの「チャンクファイル」に分割され、チャンクファイルの正確な数は、Hadoopジョブで要求されたマップ数の倍数になります。各マップタスクは、ジョブが起動される前に、チャンクの1つを(チャンクをタスクのIDに名前変更することで)「割り当て」られます。パスはDynamicRecordReaderを使用して各チャンクから読み取られ、CopyMapperで処理されます。チャンク内のすべてのパスが処理されると、現在のチャンクは削除され、新しいチャンクが取得されます。チャンクがなくなるまで、このプロセスが継続されます。この「動的」なアプローチにより、高速なマップタスクは遅いタスクよりも多くのパスを消費できるため、DistCpジョブ全体の速度が向上します。

  • CopyMapper:このクラスは、物理的なファイルコピーを実装します。入力パスは、(ジョブの設定で指定された)入力オプションに対してチェックされ、ファイルをコピーする必要があるかどうかが決定されます。ファイルは、次のいずれかがtrueの場合にのみコピーされます。

    • 同じ名前のファイルがターゲットに存在しない。
    • 同じ名前のファイルがターゲットに存在するが、ファイルサイズが異なる。
    • 同じ名前のファイルがターゲットに存在するが、チェックサムが異なり、-skipcrccheckが指定されていない。
    • 同じ名前のファイルがターゲットに存在するが、-overwriteが指定されている。
    • 同じ名前のファイルがターゲットに存在するが、ブロックサイズが異なり(そしてブロックサイズを保持する必要がある)。
  • CopyCommitter:このクラスは、DistCpジョブのコミットフェーズを担当し、以下を含みます。

    • ディレクトリパーミッションの保持(オプションで指定されている場合)
    • 一時ファイル、作業ディレクトリなどのクリーンアップ。

付録

マップのサイズ設定

デフォルトでは、DistCpは各マップがほぼ同じバイト数をコピーするように、各マップのサイズを同等にする試みを行います。ファイルは最も細かい粒度であるため、同時コピアー(つまりマップ)の数を増やしても、同時コピーの数や全体のスループットが常に増加するとは限りません。

新しいDistCpは、高速なデータノードが遅いノードよりも多くのバイトをコピーできるように、マップサイズを「動的に」調整する戦略も提供します。-strategy dynamic(アーキテクチャで説明)を使用すると、各マップタスクに固定されたソースファイルセットを割り当てるのではなく、ファイルはいくつかのセットに分割されます。セットの数は、通常、マップ数の2〜3倍になります。各マップは、チャンクにリストされているすべてのファイルを取得してコピーします。チャンクを使い果たすと、新しいチャンクが取得され処理され、チャンクがなくなるまで続きます。

ソースパスを固定されたマップに割り当てないことで、高速なマップタスク(つまりデータノード)はより多くのチャンクを消費し、したがって遅いノードよりも多くのデータをコピーできます。この分散は均一ではありませんが、各マッパーの容量に関して公平です。

動的戦略は、DynamicInputFormatによって実装されます。ほとんどの状況で優れたパフォーマンスを提供します。

長時間実行されるジョブや定期的に実行されるジョブの場合、ソースクラスタと宛先クラスタのサイズ、コピーのサイズ、利用可能な帯域幅に合わせてマップ数を調整することをお勧めします。

HDFSのバージョンの間でのコピー

Hadoopの異なるメジャーバージョン間(例:1.Xと2.Xの間)のコピーを行うには、通常、WebHdfsFileSystemを使用します。以前のHftpFileSystemとは異なり、webhdfsは読み取りと書き込みの両方の操作で使用できるため、DistCpはソースクラスタと宛先クラスタの両方で実行できます。リモートクラスタはwebhdfs://<namenode_hostname>:<http_port>として指定します。Hadoopクラスタの同じメジャーバージョン間(例:2.Xと2.Xの間)のコピーを行う場合は、パフォーマンス向上のためにhdfsプロトコルを使用します。

distcpを使用したワイヤ経由のセキュアコピー

webhdfsがSSLで保護されている場合は、「swebhdfs://」スキームを使用します。詳細については、SWebHDFSのSSL設定を参照してください。

MapReduceおよびその他の副作用

前述のように、マップが入力データの1つのコピーに失敗した場合、いくつかの副作用が発生します。

  • -overwriteが指定されていない限り、再実行時に前のマップによって正常にコピーされたファイルは「スキップ済み」としてマークされます。
  • マップがmapreduce.map.maxattempts回失敗した場合、残りのマップタスクはキルされます(-iが設定されていない限り)。
  • mapreduce.map.speculativeがfinalおよびtrueに設定されている場合、コピーの結果は未定義です。

DistCpとオブジェクトストア

DistCpは、Amazon S3、Azure ABFS、Google GCSなどのオブジェクトストアと連携します。

前提条件

  1. オブジェクトストアの実装を含むJARは、すべての依存関係とともにクラスパス上にあります。
  2. JARがバンドルされたファイルシステムクライアントを自動的に登録しない限り、ファイルシステムスキーマを実装するクラスを指定するように設定を変更する必要がある場合があります。ASF独自のオブジェクトストアクライアントはすべて自己登録されます。
  3. 関連するオブジェクトストアアクセス資格情報は、クラスタ設定で使用可能であるか、またはその他の方法ですべてのクラスタホストで使用可能である必要があります。

DistCpを使用してデータをアップロードできます。

hadoop distcp -direct hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1

データをダウンロードするには

hadoop distcp s3a://bucket/generated/results hdfs://nn1:8020/results

オブジェクトストア間でデータをコピーするには

hadoop distcp s3a://bucket/generated/results \
  wasb://updates@example.blob.core.windows.net

オブジェクトストア内でデータをコピーするには

hadoop distcp wasb://updates@example.blob.core.windows.net/current \
  wasb://updates@example.blob.core.windows.net/old

そして、変更されたファイルのみをコピーするために-updateを使用します。

hadoop distcp -update -numListstatusThreads 20  \
  s3a://history/2016 \
  hdfs://nn1:8020/history/2016

オブジェクトストアはファイルのリスト作成が遅いので、大きなディレクトリツリーで-update操作を実行する場合は、-numListstatusThreadsオプションを設定することを検討してください(制限は40スレッドです)。

オブジェクトストアでDistCp -updateを使用する場合、一般的に、個々のファイルの最終更新時刻と長さのみが比較され、2つのストア間のチェックサムアルゴリズムが異なる場合は、チェックサムは比較されません。

  • 異なるチェックサムアルゴリズムを持つ2つのオブジェクトストア間のdistcp -updateは、ファイルコピーをスキップするかどうかを判断するために、ソースファイルとターゲットファイルの最終更新時刻とファイルサイズを比較します。この動作はプロパティdistcp.update.modification.timeによって制御され、デフォルトでtrueに設定されています。ソースファイルがターゲットファイルよりも最近変更されている場合、コンテンツが変更されたと見なされ、ファイルが更新される必要があります。マシン間にクロックスキューがないことを確認する必要があります。ほとんどのオブジェクトストアにはディレクトリの有効なタイムスタンプがあるという事実は無関係です。ファイルのタイムスタンプのみが比較されます。ただし、クライアントコンピュータのクロックをインフラストラクチャのクロックに近づけることが重要であり、これにより、クライアント/HDFSクラスタとオブジェクトストア間のタイムスタンプが一致するようになります。そうでない場合、変更されたファイルが不足したり、頻繁にコピーされたりする可能性があります。

  • distcp.update.modification.timeは、2つのストアのいずれかにチェックサム検証がない場合、つまり2つのストア間のチェックサム比較が互換性のない場合にのみ使用されます。プロパティがtrueに設定されている場合でも、2つのストア間に有効なチェックサム比較がある場合は使用されません。

最終更新時刻チェックをオフにするには、core-site.xmlでこれ を設定します。

<property>
        <name>distcp.update.modification.time</name>
        <value>false</value>
</property>

備考

  • -atomicオプションは一時データのリネームを引き起こすため、操作終了時のコミット時間が大幅に増加します。さらに、(オプションで)wasb://以外のオブジェクトストアはディレクトリの原子的なリネームを提供しないため、-atomic操作は実際には期待通りの動作をしません。使用を避けてください

  • -appendオプションはサポートされていません。

  • -diffおよびrdiffオプションはサポートされていません。

  • -skipCrcフラグの値に関係なく、CRCチェックは実行されません。

  • パーミッション、ユーザーおよびグループ情報、属性チェックサム、レプリケーションを保持するための-pオプションはすべて、一般的に無視されます。wasb://コネクタは情報を保持しますが、パーミッションを強制適用しません。

  • 一部のオブジェクトストアコネクタ(例:S3Aコネクタ)は、出力のメモリ内バッファリングオプションを提供しています。大規模なファイルをコピーする際にこのようなオプションを使用すると、ヒープオーバーフローやYARNコンテナの終了など、何らかのメモリ不足イベントが発生する可能性があります。これは、クラスタとオブジェクトストア間のネットワーク帯域幅が制限されている場合(リモートオブジェクトストアを使用する場合など)、特に一般的です。このようなオプションを無効化/回避し、ディスクバッファリングに依存することをお勧めします。

  • 単一のオブジェクトストア内でのコピー操作は、オブジェクトストアが内部的により効率的なCOPY操作を実装している場合でも、Hadoopクラスタ内で実行されます。

    つまり、次のような操作では

    hadoop distcp s3a://bucket/datasets/set1 s3a://bucket/datasets/set2

    各バイトがHadoopワーカーノードに転送され、その後バケットに戻されます。これは遅くなるだけでなく、料金が発生する可能性もあります。

  • -directオプションを使用すると、オブジェクトストアのターゲットパスに直接書き込むことができ、それ以外の場合は発生する可能性のある非常にコストのかかる一時ファイルのリネーム操作を回避できます。

よくある質問

  1. なぜ-updateは既存のターゲットディレクトリ下に親ソースディレクトリを作成しないのですか? -update-overwriteの動作については、このドキュメントの使用方法セクションで詳しく説明されています。簡単に言うと、いずれかのオプションを既存の宛先ディレクトリで使用した場合、ソースディレクトリ自体ではなく、各ソースディレクトリの内容がコピーされます。この動作は、従来のDistCp実装とも一貫しています。

  2. 新しいDistCpと従来のDistCpのセマンティクス上の違いは何ですか?

    • 従来のDistCpを使用してコピーされた場合、コピー中にスキップされたファイルのファイル属性(パーミッション、所有者/グループ情報など)も変更されませんでした。これらは、ファイルコピーがスキップされた場合でも、現在更新されます。
    • 従来のDistCpでは、ソースパスの入力にある空のルートディレクトリはターゲットに作成されませんでした。これらは現在作成されます。
  3. なぜ新しいDistCpは従来のDistCpよりも多くのマップを使用するのですか?従来のDistCpは、コピージョブが開始される前に実際にターゲットにコピーする必要があるファイルを確認し、コピーに必要な数のマップを起動することによって動作します。したがって、ほとんどのファイルが(既に存在するためなど)スキップされる必要がある場合、必要なマップは少なくなります。その結果、セットアップ(つまり、M/Rジョブの前)に費やす時間は長くなります。新しいDistCpは、ソースパスのコンテンツのみを計算します。スキップできるファイルをフィルターしようとしません。その決定は、M/Rジョブが実行されるまで延期されます。(実行時間に関して)はるかに高速ですが、起動されるマップの数は-mオプションで指定した数、または指定されていない場合は20(デフォルト)になります。

  4. なぜマップ数を増やしてもDistCpが高速にならないのですか?現在、DistCpの最小作業単位はファイルです。つまり、ファイルは1つのマップによってのみ処理されます。ファイル数を超える値にマップ数を増やしても、パフォーマンス上の利点は得られません。起動されるマップの数はファイル数と等しくなります。

  5. なぜDistCpはメモリ不足になるのですか?ソースパスからコピーされる個々のファイル/ディレクトリの数が非常に多い場合(例:1,000,000パス)、DistCpはコピーするパスのリストを決定している間にメモリ不足になる可能性があります。これは新しいDistCp実装特有のものではありません。これを回避するには、次のように-Xmx JVMヒープサイズパラメータを変更することを検討してください。

     bash$ export HADOOP_CLIENT_OPTS="-Xms64m -Xmx1024m"
     bash$ hadoop distcp /source /target