このドキュメントでは、Manifest Committerのアーキテクチャおよびその他の実装/正確性に関する側面について説明します。
プロトコルとその正確性については、Manifest Committer プロトコルで説明されています。
マニフェストコミッターは、ABFS での「実世界」クエリのパフォーマンス、および GCS でのパフォーマンスと正確性を提供するコミッターです。
このコミッターは、S3A コミッター用に導入された拡張ポイントを使用します。ユーザーは、abfs://
および gcs://
URL 用の新しいコミッターファクトリーを宣言できます。Hadoop MapReduce および Apache Spark で使用できます。
用語 | 意味 |
---|---|
コミッター | タスクおよびジョブコミット操作を実行するために MR/Spark によって呼び出すことができるクラス。 |
Spark ドライバー | 作業をスケジュールし、コミット操作を調整する Spark プロセス。 |
ジョブ | MapReduce では、アプリケーション全体。Spark では、これは一連の作業における単一のステージです。 |
ジョブ試行 | ジョブの単一の試行。MR は、部分的なジョブの失敗からの回復により、複数のジョブ試行をサポートします。Spark は「最初からやり直す」と言います。 |
タスク | 1 つのファイル、またはファイルの一部を処理するなど、ジョブのサブセクション |
タスク ID | このジョブ内で一意のタスクの ID。通常は 0 から始まり、ファイル名 (part-0000, part-001 など) で使用されます。 |
タスク試行 (TA) | タスクを実行する試行。失敗する可能性があり、その場合、MR/spark は別のタスクをスケジュールします。 |
タスク試行 ID | タスク試行の一意の ID。タスク ID + 試行カウンター。 |
宛先ディレクトリ | 作業の最終的な宛先。 |
ジョブ試行ディレクトリ | ジョブ試行で使用される一時ディレクトリ。これは常に宛先ディレクトリの下にあり、HDFS と同じ暗号化ゾーン、他のファイルシステムのストレージボリュームなどにあることを保証します。 |
タスク試行ディレクトリ | (「タスク試行作業ディレクトリ」とも呼ばれます)。ファイルが書き込まれる各タスク試行専用のディレクトリ |
タスクコミット | タスク試行の出力を取得し、その「成功した」タスクの最終的な/排他的な結果にすること。 |
ジョブコミット | コミットされたすべてのタスクのすべての出力を集約し、ジョブの最終結果を生成すること。 |
コミッターの目的は、タスクの失敗が発生した場合でも、ジョブの完全な出力が宛先に確実に存在するようにすることです。
Hive の従来の階層型ディレクトリ構造テーブルの場合、ジョブコミットでは、コミットされたすべてのタスクの出力をディレクトリツリー内の正しい場所に配置する必要があります。
hadoop-mapreduce-client-core
モジュールに組み込まれているコミッターは、FileOutputCommitter
です。
Manifest Committer は、多くのタスクを通じて深いディレクトリツリー全体にファイルを作成するジョブ用の、ABFS および GCS ストレージ用の高性能コミッターです。
hdfs://
および実際には file://
URL でも機能しますが、クラウドストレージでのリストと名前変更のパフォーマンスおよびスロットリングの問題に対処するために最適化されています。
マニフェストファイルをコミットするためにアトミックな名前変更-上書きなし操作に依存しているため、S3 では正しく機能しません。また、生成されたすべてのデータを移動するのではなくコピーするというパフォーマンスの問題もあります。
MapReduce で動作しますが、以前の失敗した試行からの回復を伴う複数のジョブ試行の処理はありません。
マニフェストファイルは、(IOStatistics やその他のものとともに) 次のものを含むように設計されています。
タスク試行は、次のようにコミットされます。
名前変更は行われません。ファイルは元の場所に残されます。
ディレクトリツリーウォークはシングルスレッドであり、O(ディレクトリ)
であり、各ディレクトリリストには 1 つ以上のページングされた LIST 呼び出しが使用されます。
これは単純であり、ほとんどのタスクでは、スキャンはジョブのクリティカルパスから外れています。
統計分析により、将来的に並列スキャンに移行することが正当化される可能性があります。
ジョブコミットは、次のもので構成されます。
_SUCCESS
ファイルを保存します (テスト用、アトミック保存には書き込みと名前変更を使用します)。ジョブコミットフェーズは、多くのタスクとタスクごとの多くのファイルに対して並列化をサポートします。具体的には、
O(ファイル)
であり、OAuth 認証を使用する場合は ABFS でも同様であるため、これは重要です。必要に応じて、すべての上位をスキャンします...いずれかがファイルの場合は、削除します。
getFileStatus()
を呼び出します。見つからない場合: ディレクトリを作成し、エントリとすべての親パスのエントリを追加します。見つかってディレクトリの場合: エントリとすべての親パスのエントリを追加します。見つかってファイルの場合: 削除します。次に、前と同じように作成します。ディレクトリの同時作成 (または削除+作成) を効率的に処理することは、問題のある箇所になります。作成するディレクトリのセットを構築するために、ある程度の労力が費やされています。
ファイルは並行してリネームされます。
そのパスに何かが存在するかどうかの事前リネームチェック(および削除)はオプションになります。Sparkが各ファイルに新しいUUIDを作成するため、これは起こることはなく、HTTPリクエストを節約できます。
コミットされたすべてのファイルのオプションのスキャンを行い、長さと、既知の場合はetagを検証します。テストと診断用です。
このソリューションはGCSに必要であり、ABFSでもタスクコミッターでリストのオーバーヘッドが支払われるため有益であるはずです。
重要な目標は、マニフェストコミッターを隔離し、既存のコミッターコードやhadoopコードベースの他の部分に触れないようにすることです。
S3Aコミッター用に既に実装されている変更以外に、MRおよびSparkに直接プラグインする必要があります。
PathOutputCommitterFactory
を介してバインドする必要があります。この結果、org.apache.hadoop.util.functional.TaskPool
はS3ACommitterのorg.apache.hadoop.fs.s3a.commit.Tasks
に基づいているなど、他の場所から少しコピーアンドペーストされています。
_SUCCESS
ファイルは、S3A JSONファイルと互換性がある必要があります。これは、S3Aコミッターの出力を検証する既存のテストスイートが、変更なしにマニフェストコミッターによって実行されるジョブを再ターゲットできるようにするためです。
いつ?提案:リネームが最終的に完了するまでハートビートします。
ジョブコミット全体を停止する必要があります。各タスクコミッターのスレッドがディレクトリを反復処理する際(または各ファイルを処理する際)に、アトミックなブール値「ジョブの中断」をチェックする必要があります。リストまたはリネームの失敗は、ジョブコミット全体の停止にエスカレーションする必要があります。これは、非同期リネーム操作またはタスクコミッタースレッドで発生したIOEが、次のことを行う必要があることを意味します。
commitJob()
呼び出しの最後に再スローされるジョブコミットステージがタスクごとの操作(ファイルのロードなど)にスレッドプールを使用している場合、同じスレッドプールをタスクごとのステージ内の並列操作に使用してはなりません。
すべてのJobStage
はタスクまたはジョブコミット内で順次実行されるため、ステージ間で同じスレッドプールを共有しても安全です。
現在の実装では、ジョブコミットにおいてファイルを実際にロードする以外に、並列の「マニフェストごと」の操作はありません。ディレクトリを作成し、ファイルの名前を変更する操作は、実際には個々のマニフェストの並列処理を実行せずに実行されます。
ディレクトリ準備:すべてのマニフェストのディレクトリリストをマージし、一意のディレクトリの(うまくいけば非常に小さい)セットの作成をキューに入れます。
リネーム:すべてのマニフェストを反復処理し、リネームのためにそれらのリネームをプールに入れます。
スレッドプールの寿命は、ステージ構成の寿命に制限されます。これは、セットアップ、コミット、中断、クリーンアップの各PathOutputCommitter
メソッド内に制限されます。
これにより、S3Aコミッターのスレッドプールライフサイクルの問題が回避されます。
これは、多くのタスクがそれぞれ多くのファイルを生成したテラソートでの失敗でした。コミットするファイルの完全なリスト(およびすべてのブロックのetag)がメモリ内に構築され、実行前に検証されました。
マニフェストコミッターは、メモリに保存されるデータ量が少ないと想定しています。これは、コミットされるすべてのファイルのすべてのブロックのetagを保存する必要がなくなったためです。
作成するディレクトリのすべてのリストを結合し、重複を削除します。
実装アーキテクチャは、S3Aコネクターからの教訓を反映しています。
コミッターは、ファイルシステムに対して実行/呼び出すすべての操作の実行時間の統計を収集します。*タスクコミット中に収集されたものは、マニフェストに保存されます(そのファイルの保存とリネームにかかる時間を除く)。*これらのマニフェストがジョブコミット中にロードされると、これらの統計がマージされて、ジョブ全体の集計統計が形成されます。*これは_SUCCESS
ファイルに保存されます。*また、mapreduce.manifest.committer.summary.report.directory
で指定されたディレクトリにあるそのファイルのコピーにも保存されます(設定されている場合)。保存されます。*クラスorg.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter
は、これらをロードして印刷できます。
クエリで使用されるファイルシステム、入力ストリーム、および出力ストリームからのIO統計は収集されません。
PathOutputCommitter
APIを介してManifestCommitter
を呼び出すと、次の属性がアクティブな(スレッド)コンテキストに追加されます。
キー | 値 |
---|---|
ji |
ジョブID |
tai |
タスク試行 ID |
st |
ステージ |
これらは、ステージの実行の一部として作業を実行するすべてのヘルパースレッドでもすべて設定されます。
監査をサポートするストア/FSは、このデータを収集してログに含めることができます。
バックポートを容易にするために、すべての監査統合は、単一のクラスorg.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration
にあります。