Manifest Committer アーキテクチャ

このドキュメントでは、Manifest Committerのアーキテクチャおよびその他の実装/正確性に関する側面について説明します。

プロトコルとその正確性については、Manifest Committer プロトコルで説明されています。

マニフェストコミッターは、ABFS での「実世界」クエリのパフォーマンス、および GCS でのパフォーマンスと正確性を提供するコミッターです。

このコミッターは、S3A コミッター用に導入された拡張ポイントを使用します。ユーザーは、abfs:// および gcs:// URL 用の新しいコミッターファクトリーを宣言できます。Hadoop MapReduce および Apache Spark で使用できます。

背景

用語

用語 意味
コミッター タスクおよびジョブコミット操作を実行するために MR/Spark によって呼び出すことができるクラス。
Spark ドライバー 作業をスケジュールし、コミット操作を調整する Spark プロセス。
ジョブ MapReduce では、アプリケーション全体。Spark では、これは一連の作業における単一のステージです。
ジョブ試行 ジョブの単一の試行。MR は、部分的なジョブの失敗からの回復により、複数のジョブ試行をサポートします。Spark は「最初からやり直す」と言います。
タスク 1 つのファイル、またはファイルの一部を処理するなど、ジョブのサブセクション
タスク ID このジョブ内で一意のタスクの ID。通常は 0 から始まり、ファイル名 (part-0000, part-001 など) で使用されます。
タスク試行 (TA) タスクを実行する試行。失敗する可能性があり、その場合、MR/spark は別のタスクをスケジュールします。
タスク試行 ID タスク試行の一意の ID。タスク ID + 試行カウンター。
宛先ディレクトリ 作業の最終的な宛先。
ジョブ試行ディレクトリ ジョブ試行で使用される一時ディレクトリ。これは常に宛先ディレクトリの下にあり、HDFS と同じ暗号化ゾーン、他のファイルシステムのストレージボリュームなどにあることを保証します。
タスク試行ディレクトリ (「タスク試行作業ディレクトリ」とも呼ばれます)。ファイルが書き込まれる各タスク試行専用のディレクトリ
タスクコミット タスク試行の出力を取得し、その「成功した」タスクの最終的な/排他的な結果にすること。
ジョブコミット コミットされたすべてのタスクのすべての出力を集約し、ジョブの最終結果を生成すること。

コミッターの目的は、タスクの失敗が発生した場合でも、ジョブの完全な出力が宛先に確実に存在するようにすることです。

  • 完全: 出力には、成功したすべてのタスクの作業が含まれます。
  • 排他的: 失敗したタスクの出力は存在しません。
  • 同時実行: 複数のタスクが並行してコミットされる場合、出力はタスクコミットがシリアル化された場合と同じです。これはジョブコミットの要件ではありません。
  • 中断可能: ジョブとタスクは、ジョブコミットの前に中断される可能性があり、その後、出力は表示されません。
  • 正確性の継続性: ジョブがコミットされると、失敗、中断、または成功しなかったタスクの出力が将来のある時点で表示されてはなりません。

Hive の従来の階層型ディレクトリ構造テーブルの場合、ジョブコミットでは、コミットされたすべてのタスクの出力をディレクトリツリー内の正しい場所に配置する必要があります。

hadoop-mapreduce-client-core モジュールに組み込まれているコミッターは、FileOutputCommitter です。

Manifest Committer: Azure および Google ストレージ上の Spark 用の高性能コミッター。

Manifest Committer は、多くのタスクを通じて深いディレクトリツリー全体にファイルを作成するジョブ用の、ABFS および GCS ストレージ用の高性能コミッターです。

hdfs:// および実際には file:// URL でも機能しますが、クラウドストレージでのリストと名前変更のパフォーマンスおよびスロットリングの問題に対処するために最適化されています。

マニフェストファイルをコミットするためにアトミックな名前変更-上書きなし操作に依存しているため、S3 では正しく機能しません。また、生成されたすべてのデータを移動するのではなくコピーするというパフォーマンスの問題もあります。

MapReduce で動作しますが、以前の失敗した試行からの回復を伴う複数のジョブ試行の処理はありません。

マニフェスト

マニフェストファイルは、(IOStatistics やその他のものとともに) 次のものを含むように設計されています。

  1. 存在しない場合に作成する必要がある宛先ディレクトリのリスト。
  2. 名前変更するファイルのリスト。絶対ソース、絶対宛先、ファイルサイズのエントリとして記録されます。

タスクコミット

タスク試行は、次のようにコミットされます。

  1. タスク試行の作業ディレクトリを再帰的にリストして構築
  2. ファイルの名前が変更されるディレクトリのリスト。
  3. 名前変更するファイルのリスト: ソース、宛先、サイズ、およびオプションで etag。
  4. この情報を、タスク ID から派生したファイル名を持つジョブ試行ディレクトリ内のマニフェストファイルに保存します。注: マニフェストの作成がアトミックであることを保証するために、一時ファイルに書き込んでから、最終パスに名前を変更します。

名前変更は行われません。ファイルは元の場所に残されます。

ディレクトリツリーウォークはシングルスレッドであり、O(ディレクトリ)であり、各ディレクトリリストには 1 つ以上のページングされた LIST 呼び出しが使用されます。

これは単純であり、ほとんどのタスクでは、スキャンはジョブのクリティカルパスから外れています。

統計分析により、将来的に並列スキャンに移行することが正当化される可能性があります。

ジョブコミット

ジョブコミットは、次のもので構成されます。

  1. ジョブ試行ディレクトリ内のすべてのマニフェストファイルをリストします。
  2. 各マニフェストファイルをロードし、まだ存在しないディレクトリを作成し、次に名前変更リスト内の各ファイルの名前を変更します。
  3. S3A コミッターと同じ形式の JSON _SUCCESS ファイルを保存します (テスト用、アトミック保存には書き込みと名前変更を使用します)。

ジョブコミットフェーズは、多くのタスクとタスクごとの多くのファイルに対して並列化をサポートします。具体的には、

  1. マニフェストタスクはロードされ、「マニフェストプロセッサー」スレッドのプールで処理されます。
  2. ディレクトリ作成およびファイルの名前変更操作は、それぞれ「エグゼキューター」スレッドのプールで処理されます。多くの名前変更は、最小限のネットワーク IO を使用するため、並行して実行できます。
  3. ジョブのクリーンアップでは、タスク試行ディレクトリの削除を並列化できます。ディレクトリの削除は Google クラウドストレージで O(ファイル)であり、OAuth 認証を使用する場合は ABFS でも同様であるため、これは重要です。

上位ディレクトリの準備

必要に応じて、すべての上位をスキャンします...いずれかがファイルの場合は、削除します。

親ディレクトリの作成

  1. ディレクトリが存在するかどうかを共有ディレクトリマップでプローブします。見つかった場合: 操作は完了です。
  2. マップが空の場合、パスで getFileStatus() を呼び出します。見つからない場合: ディレクトリを作成し、エントリとすべての親パスのエントリを追加します。見つかってディレクトリの場合: エントリとすべての親パスのエントリを追加します。見つかってファイルの場合: 削除します。次に、前と同じように作成します。

ディレクトリの同時作成 (または削除+作成) を効率的に処理することは、問題のある箇所になります。作成するディレクトリのセットを構築するために、ある程度の労力が費やされています。

ファイル名の変更

ファイルは並行してリネームされます。

そのパスに何かが存在するかどうかの事前リネームチェック(および削除)はオプションになります。Sparkが各ファイルに新しいUUIDを作成するため、これは起こることはなく、HTTPリクエストを節約できます。

検証

コミットされたすべてのファイルのオプションのスキャンを行い、長さと、既知の場合はetagを検証します。テストと診断用です。

利点

  • ソースツリーリスト操作をタスクコミットフェーズにプッシュします。これは一般的に実行のクリティカルパスから外れています。
  • ディレクトリのリネームがアトミックであるとは限らないため、GCSへのアトミックなタスクコミットを提供します。
  • ワーカーからマニフェストにIOStatisticsを渡すことが可能です。
  • S3Aの「パーティション分割されたステージングコミッター」と同様に、リネーム前の操作をいくつか許可します。これにより、作成予定のディレクトリ内の既存のエントリをすべて削除するか、これらのパーティションが空でない場合は失敗するように構成できます。パーティション分割されたステージングコミッターを参照してください。
  • オプションのプレフライト検証チェックを許可します(異なるタスクによって作成された重複ファイルがないことを確認)。
  • マニフェストは、開発/デバッグ中に表示したり、出力サイズを決定したりできます。

欠点

  • 新しいマニフェストファイル形式が必要です。
  • タスクコミットがより複雑になる可能性があります。

このソリューションはGCSに必要であり、ABFSでもタスクコミッターでリストのオーバーヘッドが支払われるため有益であるはずです。

実装の詳細

制約

重要な目標は、マニフェストコミッターを隔離し、既存のコミッターコードやhadoopコードベースの他の部分に触れないようにすることです。

S3Aコミッター用に既に実装されている変更以外に、MRおよびSparkに直接プラグインする必要があります。

  • 自己完結型:hadoop-commonなどの変更は必要ありません。
  • 隔離:既存のコミッターに変更を加えないでください。
  • 統合:PathOutputCommitterFactoryを介してバインドする必要があります。

この結果、org.apache.hadoop.util.functional.TaskPoolはS3ACommitterのorg.apache.hadoop.fs.s3a.commit.Tasksに基づいているなど、他の場所から少しコピーアンドペーストされています。

_SUCCESSファイルは、S3A JSONファイルと互換性がある必要があります。これは、S3Aコミッターの出力を検証する既存のテストスイートが、変更なしにマニフェストコミッターによって実行されるジョブを再ターゲットできるようにするためです。

ジョブコミットにおける進捗コールバック。

いつ?提案:リネームが最終的に完了するまでハートビートします。

ジョブコミットにおけるエラー処理と中断。

ジョブコミット全体を停止する必要があります。各タスクコミッターのスレッドがディレクトリを反復処理する際(または各ファイルを処理する際)に、アトミックなブール値「ジョブの中断」をチェックする必要があります。リストまたはリネームの失敗は、ジョブコミット全体の停止にエスカレーションする必要があります。これは、非同期リネーム操作またはタスクコミッタースレッドで発生したIOEが、次のことを行う必要があることを意味します。

  1. キャッチされる
  2. 共有フィールド/変数に格納される
  3. 中断をトリガーする
  4. commitJob()呼び出しの最後に再スローされる

デッドロックの回避

ジョブコミットステージがタスクごとの操作(ファイルのロードなど)にスレッドプールを使用している場合、同じスレッドプールをタスクごとのステージ内の並列操作に使用してはなりません。

すべてのJobStageはタスクまたはジョブコミット内で順次実行されるため、ステージ間で同じスレッドプールを共有しても安全です。

現在の実装では、ジョブコミットにおいてファイルを実際にロードする以外に、並列の「マニフェストごと」の操作はありません。ディレクトリを作成し、ファイルの名前を変更する操作は、実際には個々のマニフェストの並列処理を実行せずに実行されます。

ディレクトリ準備:すべてのマニフェストのディレクトリリストをマージし、一意のディレクトリの(うまくいけば非常に小さい)セットの作成をキューに入れます。

リネーム:すべてのマニフェストを反復処理し、リネームのためにそれらのリネームをプールに入れます。

スレッドプールのライフサイクル

スレッドプールの寿命は、ステージ構成の寿命に制限されます。これは、セットアップ、コミット、中断、クリーンアップの各PathOutputCommitterメソッド内に制限されます。

これにより、S3Aコミッターのスレッドプールライフサイクルの問題が回避されます。

S3A HADOOP-16570と同様のスケール問題。

これは、多くのタスクがそれぞれ多くのファイルを生成したテラソートでの失敗でした。コミットするファイルの完全なリスト(およびすべてのブロックのetag)がメモリ内に構築され、実行前に検証されました。

マニフェストコミッターは、メモリに保存されるデータ量が少ないと想定しています。これは、コミットされるすべてのファイルのすべてのブロックのetagを保存する必要がなくなったためです。

宛先ディレクトリ内のディレクトリの重複作成

作成するディレクトリのすべてのリストを結合し、重複を削除します。

実装アーキテクチャ

実装アーキテクチャは、S3Aコネクターからの教訓を反映しています。

  • コミットステージをMRコミットクラスから分離します。これはライフサイクルが複雑です。
  • 代わりに、分離してテストでき、最終プロトコルを提供するために連鎖できる一連のステージに分割します。
  • MRデータ型(taskIDなど)をステージに渡さないでください。一般的な型(stringなど)の構成を渡します。
  • また、ストア操作用のコールバックを渡して、フェイクストアの実装を容易にします。
  • 各ステージについて:前提条件と事後条件、失敗モードを定義します。分離してテストします。

統計

コミッターは、ファイルシステムに対して実行/呼び出すすべての操作の実行時間の統計を収集します。*タスクコミット中に収集されたものは、マニフェストに保存されます(そのファイルの保存とリネームにかかる時間を除く)。*これらのマニフェストがジョブコミット中にロードされると、これらの統計がマージされて、ジョブ全体の集計統計が形成されます。*これは_SUCCESSファイルに保存されます。*また、mapreduce.manifest.committer.summary.report.directoryで指定されたディレクトリにあるそのファイルのコピーにも保存されます(設定されている場合)。保存されます。*クラスorg.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinterは、これらをロードして印刷できます。

クエリで使用されるファイルシステム、入力ストリーム、および出力ストリームからのIO統計は収集されません。

監査

PathOutputCommitter APIを介してManifestCommitterを呼び出すと、次の属性がアクティブな(スレッド)コンテキストに追加されます。

キー
ji ジョブID
tai タスク試行 ID
st ステージ

これらは、ステージの実行の一部として作業を実行するすべてのヘルパースレッドでもすべて設定されます。

監査をサポートするストア/FSは、このデータを収集してログに含めることができます。

バックポートを容易にするために、すべての監査統合は、単一のクラスorg.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegrationにあります。