マニフェストコミッタープロトコル

このドキュメントでは、マニフェストコミッターのコミットプロトコルについて説明します。

背景

用語

用語 意味
コミッター タスクおよびジョブのコミット操作を実行するためにMR Sparkによって呼び出すことができるクラス。
Sparkドライバー 作業をスケジューリングし、コミット操作を調整するSparkプロセス。
ジョブ: MapReduceの場合 アプリケーション全体。Sparkでは、これは一連の作業の単一のステージです。
ジョブ試行 ジョブの1回の試行。MRは、ジョブの部分的な失敗からの回復を伴う複数のジョブ試行をサポートします。Sparkは「最初からやり直す」と言います。
タスク 1つのファイルまたはファイルの一部を処理するなど、ジョブのサブセクション
タスクID このジョブ内で一意のタスクのID。通常は0から始まり、ファイル名(part-0000、part-001など)で使用されます。
タスク試行 (TA) タスクを実行しようとする試み。失敗する可能性があり、その場合、MR/Sparkは別のタスクをスケジュールします。
タスク試行ID タスク試行の一意のID。タスクID + 試行カウンター。
宛先ディレクトリ 作業の最終的な宛先。
ジョブ試行ディレクトリ ジョブ試行で使用される一時ディレクトリ。これは常に宛先ディレクトリの下にあるため、HDFS、他のファイルシステムのストレージボリュームなどと同じ暗号化ゾーンにあることを保証します。
タスク試行ディレクトリ ジョブ試行ディレクトリの下にあるディレクトリで、タスク試行が自身の作業のためのサブディレクトリを作成します
タスク試行作業ディレクトリ ファイルが書き込まれるタスク試行ごとに排他的なディレクトリ
タスクのコミット タスク試行の出力を取得し、その「成功した」タスクの最終的/排他的な結果にする。
ジョブのコミット コミットされたすべてのタスクのすべての出力を集約し、ジョブの最終結果を生成します。

コミッターの目的は、タスクの失敗があっても、ジョブの完全な出力が宛先に確実に到達するようにすることです。

  • 完全: 出力には、成功したすべてのタスクの作業が含まれます。
  • 排他的: 失敗したタスクの出力は存在しません。
  • 同時: 複数のタスクが並行してコミットされた場合、出力はタスクのコミットがシリアル化された場合と同じです。これはジョブコミットの要件ではありません。
  • 中止可能: ジョブとタスクは、ジョブコミットの前に中止される可能性があり、その後、それらの出力は表示されません。
  • 正確性の継続性: ジョブがコミットされると、失敗、中止、または成功しなかったタスクの出力が将来のある時点で表示されない必要があります。

Hiveの従来の階層型ディレクトリ構造のテーブルの場合、ジョブコミットでは、コミットされたすべてのタスクの出力がディレクトリツリー内の正しい場所に配置される必要があります。

hadoop-mapreduce-client-coreモジュールに組み込まれているコミッターは、FileOutputCommitterです。

これにはv1とv2の2つのアルゴリズムがあります。

v1アルゴリズムは、あらゆる形式のタスクの失敗に対して回復力がありますが、新しく作成された各ファイルをテーブル内の正しい場所に1つずつ名前変更するため、最終的な集計出力をコミットするときに遅くなります。

v2アルゴリズムは、出力がジョブコミットまで遅延されるのではなく、個々のタスクがコミットされたときに表示されるため、安全とはみなされません。複数のタスク試行がデータを出力ディレクトリツリーに取得する可能性があり、ジョブが失敗/中止された場合、ジョブがコミットされる前に、その出力が表示されます。

ファイル出力コミッターV1とV2

ファイル出力コミッターV1およびV2のコミットアルゴリズム

タスク試行の実行 (V1およびV2)

$dest/__temporary/$jobAttemptId/のジョブ試行ディレクトリには、進行中のジョブのすべての出力が含まれており、すべてのタスク試行には独自のタスク試行ディレクトリ$dest/__temporary/$jobAttemptId/__temporary/$taskAttemptIdが割り当てられます

タスクのすべての作業は、タスク試行ディレクトリの下に書き込まれます。出力がルートにファイルを持つ深いツリーである場合、タスク試行ディレクトリは、生成されたファイルとそれらの上のディレクトリを含む、同様の構造になります。

MapReduce V1アルゴリズム

v1タスクのコミット

タスク試行ディレクトリは、ジョブ試行ディレクトリの直下に名前が変更されます

rename(
  $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId
  $dest/__temporary/$jobAttemptId/$taskId)

V1ジョブコミット

コミットされたタスクごとに、下のすべてのファイルは宛先ディレクトリに名前が変更され、タスクのベースディレクトリからの相対的なファイル名が宛先ディレクトリのファイル名に再マッピングされます。

つまり、$dest/__temporary/$jobAttemptId/$taskIdの下にあるすべてのものが、$destの下のパスに変換されます。

再マップするパスは、再帰的なツリーウォークによって各TAディレクトリ内で識別されます。タスクディレクトリツリーに、宛先の下に存在しないサブディレクトリが含まれている場合は、いくつかの最適化があります。この場合、ディレクトリ全体の名前を変更できます。ディレクトリがすでに存在する場合は、ファイルごとのマージがそのディレクトリに対して行われ、サブディレクトリのアクションは再び宛先の存在に依存します。

その結果、各タスクの出力が別の最終ディレクトリに移動する場合(たとえば、最終パーティションが単一のタスクに固有の場合)、名前変更は子に関係なく、ディレクトリに対してO(1)になります。出力が他のタスクと同じディレクトリに移動する場合(または既存のディレクトリを更新する場合)、名前変更のパフォーマンスはO(ファイル数)になります。

最後に、mapreduce.fileoutputcommitter.marksuccessfuljobsがtrueの場合のみ、0バイトの_SUCCESSファイルが書き込まれます。

MapReduce V2アルゴリズム

V2タスクのコミット

タスク試行ディレクトリ内のファイルは、1つずつ宛先ディレクトリに名前が変更されます。ディレクトリの名前変更を最適化する試みはありません。他のタスクが同時に作業をコミットしている可能性があるためです。したがって、O(ファイル数) + ディレクトリツリーのリスト作成コストとなります。再度述べますが、再帰的なツリーウォークで実行され、HDFSや(ここでは関係ありませんが)S3でより高速になるであろう、深いlistFiles(path, recursive=true)APIは使用されません。

V2 ジョブコミット

mapreduce.fileoutputcommitter.marksuccessfuljobsがtrueの場合のみ、0バイトの_SUCCESSファイルが書き込まれます。

なぜV2コミッターは正しくない/安全でないのか

タスクT1について、タスク試行1(T1A1)がコミット前に失敗した場合、ドライバーは新しい試行「T1A2」をスケジュールし、それをコミットします。すべてうまくいきます。

しかし、T1A1がコミット許可を与えられ、コミットプロセス中に失敗した場合、その出力の一部が宛先ディレクトリに書き込まれている可能性があります。

もし試行T1A2がコミットするように指示された場合、その出力が正確なファイル名のセットを持っている場合にのみ、すでに名前が変更されたファイルが上書きされます。異なるファイル名が生成された場合、出力にはT1A1とT1A2のファイルが含まれることになります。

T1A1がコミットプロセス中にパーティション分割された場合、ジョブコミッターは別の試行をスケジュールし、その作業をコミットします。しかし、もしT1A1がまだファイルシステムに接続している場合、ファイルの改名を続けている可能性があります。たとえ同じファイル名が使用されていたとしても、2つのタスクの出力が混ざり合う可能性があります。

背景:S3Aコミッター

論文、ゼロ名前変更コミッター、Loughran et. al.は、これらのコミッターについて解説しています。

この論文では、コミット問題、正確さの定義、v1およびv2コミッターのアルゴリズム、S3Aコミッター、IBM Stocatorコミッター、そしてEMRのSparkコミッターについて私たちが知っていることも解説しています。

hadoop-aws JARには、「ステージング」と「マジック」という2つのコミッターが含まれています。これらは両方とも、同じ問題の実装です。すなわち、作業を安全かつ迅速にS3オブジェクトストアにコミットすることです。

コミッターは、S3がファイルを作成するアトミックな方法(PUTリクエスト)を提供しているという事実を利用しています。

ファイルは存在するか、存在しないかのどちらかです。ファイルは宛先に直接アップロードでき、アップロードが完了したときにのみファイルがマニフェスト(既存のコピーを上書き)されます。

大きなファイルの場合、マルチパートアップロードにより、このアップロード操作を一連のPOSTリクエストに分割できます。

1. initiate-upload (パス -> アップロードID) 1. upload part(パス, アップロードID, data[]) -> チェックサム。これは並列化できます。1つのオブジェクトに最大10,000個のパーツをアップロードできます。最後のパーツを除くすべては、>= 5MBである必要があります。 1. complete-upload (パス, アップロードID, List<チェックサム>) これにより、チェックサムの順序によって定義されたブロックのシーケンスでパーツからファイルを構築し、ファイルをマニフェストします。

S3Aコミッターの秘訣は、ファイルはタスク試行の実行/コミット中にアップロードされますが、最後のPOSTリクエストはジョブコミットフェーズまで遅延できることです。タスク試行は、各ファイルの最終的な宛先を決定し、マルチパート操作の一部としてデータをアップロードし、アップロードを完了するために必要な情報をファイルに保存する必要があります。このファイルは後でジョブコミッターによって読み取られ、POSTリクエストで使用されます。

ステージングコミッター

ステージングコミッターは、NetflixのRyan Blueによる貢献に基づいています。これは、.pendingsetファイルを伝播するための整合性のあるストアとしてHDFSに依存しています。

各タスク試行の作業ディレクトリは、ローカルファイルシステムの「ステージングディレクトリ」にあります。アップロードを完了するために必要な情報は、クラスターHDFSファイルシステムと連携するv1 FileOutputCommitterを使用して、タスク試行からジョブコミッターに渡されます。これにより、コミッターがv1アルゴリズムと同じ正確さの保証を持つことが保証されます。

  1. タスクコミットは、ローカルファイルシステムのタスク試行作業ディレクトリにあるすべてのファイルを、最終的な宛先パスにアップロードすることで構成されます。ただし、最終的なマニフェストPOSTは保留されます。
  2. タスク試行内のすべてのファイルのアップロードを完了するために必要なすべての情報を含むJSONファイルが、HDFSと連携するラップされたコミッターのジョブ試行ディレクトリに書き込まれます。
  3. ジョブコミット:HDFSジョブ試行ディレクトリ内のすべてのマニフェストファイルをロードし、アップロードを完了するためのPOSTリクエストを発行します。これらは並列化されます。

マジックコミッター

マジックコミッターは、純粋なS3Aであり、ファイルシステムクライアント自体内で変更を行うことができたという事実を利用しています。

「マジック」パスは、書き込み用に開かれたときに、最終的な宛先ディレクトリへのマルチパーティアップロードを開始するように定義されています。出力ストリームがclose()されると、ゼロバイトのマーカーファイルがマジックパスに書き込まれ、アップロードを完了するために必要なすべての情報を含むJSONの.pendingファイルが保存されます。

タスクコミット:1. 各タスク試行のマジックディレクトリにあるすべての.pendingファイルをリストします。 1. .pendingsetファイルに集約します。 1.タスクIDとともにジョブ試行ディレクトリに保存します。

ジョブコミット

  1. ジョブ試行ディレクトリにある.pendingsetファイルをリストします。
  2. POSTリクエストでアップロードを完了します。

マジックコミッターは、整合性のあるS3ストア(当初はS3Guardを使用)を絶対に必要とします。現在、S3は整合性が保たれているため、生のS3を使用できます。HDFSやrename()を使用する他のファイルシステムは必要ありません。

正確さ

S3Aコミッターは、次の理由で正しいと見なされています。

  1. ジョブコミットまで何もマテリアライズされません。
  2. 1つのタスク試行のマニフェストのみをジョブ試行ディレクトリに保存できます。したがって、同じタスクIDのTAのファイルのみが排他的にコミットされます。
  3. ステージングコミッターがTAからジョブコミッターにマニフェストを渡すためにHDFSを使用することで、S3の最終的な整合性によってマニフェストが見逃されることがなくなります。
  4. S3に整合性があるまでは、マジックコミッターはタスクコミットとジョブコミットの両方で必要なリストの整合性を提供するためにS3Guardに依存していました。
  5. 著者およびより広いコミュニティは、本番環境で表面化したコミッターに関連するすべての問題を修正しました。

修正された重要な問題には、以下が含まれます。

  • HADOOP-15961。S3Aコミッター:定期的にprogress()呼び出しがあることを確認します。
  • HADOOP-16570。S3Aコミッターはスケールに関する問題に遭遇します。
  • HADOOP-16798。S3Aコミッターのスレッドプールのシャットダウンの問題。
  • HADOOP-17112。S3Aコミッターはパス内の空白を処理できません。
  • HADOOP-17318。同じアプリ試行IDで同時S3Aコミットジョブをサポートします。
  • HADOOP-17258。MagicS3GuardCommitterがpendingsetが既に存在すると失敗します。
  • HADOOP-17414。マジックコミッターのファイルには、Sparkによって収集された書き込まれたバイト数がありません。
  • SPARK-33230。Hadoopコミッターは、spark.sql.sources.writeJobUUIDで一意のジョブIDを取得する必要があります。
  • SPARK-33402。同じ秒に起動されたジョブは、重複したMapReduce JobIDを持っています。
  • SPARK-33739。S3Aマジックコミッターを介してコミットされたジョブは、書き込まれたバイト数を報告しません(HADOOP-17414に依存)。

スケール/パフォーマンス/UXではなく、正確性に影響を与えた問題のうち、HADOOP-17258は、TA1タスクコミットが完了した後(ただし、レポートには失敗していた)の障害からの回復に関係していました。SPARK-33402、SPARK-33230、およびHADOOP-17318はすべて関連しています。もし2つのSparkジョブ/ステージが同じ秒に開始された場合、同じジョブIDを持っていました。これにより、ステージングコミッターで使用されるHDFSディレクトリが混ざり合ってしまいました。

特筆すべきは、これらはすべて、最小限の統合テストスイートでは発見されなかった問題であるということです。

良いニュースは、これらの問題について私たちが知っており、再びそれらを複製することを避けることができるようになったことです。そして、何をテストするためのテストを書くかを知っています。

V1コミッター:Azureでは遅く、GCSでは遅くて安全ではありません。

ABFSでは、V1コミッターのパフォーマンスが低い理由は、次のとおりです。

  1. ディレクトリのリスト表示とファイルの名前変更は、HDFSの場合よりもABFSの方がやや遅いです。
  2. v1コミッターは、コミットされた各タスクの出力のリストを作成し、宛先に存在しない場合にディレクトリを移動し、既存のディレクトリにファイルをマージすることによって、各タスクの出力を順番にコミットします。

V2コミッターは、リストと名前変更プロセスをタスクコミットで実行するため、ジョブコミットがはるかに高速です。これは、非アトミックであるため、使用するのが危険であると考えられている理由です。V2タスクコミットアルゴリズムが示すのは、ファイルごとの名前変更のみを使用することで、異なるタスクの出力のコミットを並列化できるということです。

V1コミッターは、タスクコミット操作(ディレクトリの名前変更)でさえも非アトミックなO(ファイル数)操作であるため、GCSでのパフォーマンスが低くなります。これはまた、安全でないことを意味します。

タスク試行がパーティション分割され、Sparkドライバーが別のTAをスケジュール/コミットした場合、タスクディレクトリには最初の試行のファイルが1つ以上含まれている可能性があります。


マニフェストコミッタープロトコル

ストアの要件

このコミッターでサポートされているストア/ファイルシステムは、以下を満たす必要があります。

  • 一貫性のあるリスト表示が必要です。
  • アトミックなO(1)ファイルの名前変更操作が必要です。

このコミッターでサポートされているストア/ファイルシステムは、以下を満たす必要があります。

  • 負荷がかかっていても、ファイルの名前を正常に変更します。ABFSはこれを行わないため、特別な回復が提供されています。
  • HADOOP-17979のEtagSourceインターフェースを実装します。これは、ABFSの名前変更回復、および最終出力のオプションの検証に使用されます。

このコミッターでサポートされているストア/ファイルシステムは、以下の場合があります。

  • レイテンシーの高いリスト操作がある。
  • スロットリング応答で負荷がかかっているときに呼び出しを拒否します。これは、ファイルシステムコネクタで処理する必要があります。

このコミッターでサポートされているストア/ファイルシステムは、以下を満たさない場合があります。

  • アトミックなディレクトリの名前変更をサポートします。これは、オプションのクリーンアップ以外では使用されません。
  • O(1)ディレクトリ削除をサポートします。CleanupJobStageはそうではないと仮定しているため、タスク試行ディレクトリを並行して削除します。
  • アトミックなcreate(Path, overwrite=false)操作をサポートします。マニフェストは、タスク試行IDを含むパスに書き込んでから、最終的なパスに名前を変更することでコミットされます。
  • 高速なlistFiles(path, recursive=true)呼び出しをサポートします。このAPI呼び出しは使用されません。

FileOutputCommitterと比較して、削除された要件は次のとおりです。

  • アトミックなディレクトリリネーム。
  • O(1) のディレクトリ削除。
  • 高速なディレクトリリスト表示。
  • スロットリング動作の暗黙的な不在。

HDFS はこれらの要件をすべて満たしているため、このコミッターから大きな恩恵を受けることはありませんが、それでも動作します。

S3 ストアは、一貫性が保たれるようになった現在でも、このコミッターのリネーム要件を満たしていません。このコミッターは S3 での使用は安全ではありません。

タスク ID とジョブ ID

すべてのジョブは一意の ID を持つ必要があります。

実装では、これを保証するための関連パッチが Spark ランタイムに適用されていることを想定しています。

ジョブ ID は、_temporary/0/ のような従来のインクリメントする自然数スキームの代わりに、一時ディレクトリの名前付けに使用されます。このスキームは MapReduce に由来し、attempt ID > 1 のジョブ試行は先行ジョブによってコミットされたタスクを探し、その結果を組み込みます。

このコミッターは、リカバリーを試みない Spark をターゲットにしています。パスにジョブ ID を使用することにより、ジョブがジョブのクリーンアップ/アボート時に _temporary をすべて削除しないように構成されている場合、複数のジョブが同じテーブルを宛先として使用して実行される可能性があります。

タスク ID とタスク試行 ID は、通常どおりジョブ ID から派生します。

書き込まれたファイルのファイル名は一意であることが期待されます。これは Spark で ORC ファイルと Parquet ファイルに対して行われ、デフォルトでは宛先ファイルのチェックを省略できます。

ディレクトリ構造

宛先ディレクトリが destDir: Path である場合

ID が jobID: String で試行回数が jobAttemptNumber:int のジョブは、ディレクトリを使用します

$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/

その作業のために使用します (注意: 実際には最後のサブディレクトリは %02d でフォーマットされます)。

これは *ジョブ試行ディレクトリ* と呼ばれます。

ジョブ試行ディレクトリの下に、サブディレクトリ tasks が作成されます。これは *タスク試行ディレクトリ* と呼ばれます。すべてのタスク試行には、独自のサブディレクトリがあり、その作業が保存されます。

ジョブ試行ディレクトリの下に、サブディレクトリ manifests が作成されます。これは *y* と呼ばれます。

コミットされたすべてのタスクのマニフェストは、$taskId-manifest.json というファイル名でこのディレクトリに保存されます。

フルパス

$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/manifests/$taskId-manifest.json

は、コミットされたタスクによって作成されたすべてのファイルのマニフェストの最終的な場所です。これは *コミットされたタスクのマニフェストパス* と呼ばれます。

タスク試行は、一時ファイル名 $taskAttemptId-manifest.json.tmp でマニフェストをこのディレクトリに保存します。

これは *タスク試行のマニフェストの一時パス* と呼ばれます。

ジョブとタスクの操作については、次のパスが定義されます。

let jobDirectory = "$destDir/_temporary/manifest_$jobID/"
let jobAttemptDirectory = jobDirectory + "$jobAttemptNumber/"
let manifestDirectory = jobAttemptDirectory + "manifests/"
let taskAttemptDirectory = jobAttemptDirectory + "tasks/"

また、各タスク試行については、次のパスも定義されます。

let taskAttemptWorkingDirectory = taskAttemptDirectory + "$taskAttemptId"
let taskManifestPath = manifestDirectory + "$taskId-manifest.json"
let taskAttemptTemporaryManifestPath = manifestDirectory + "$taskAttemptId-manifest.json"

プロトコルのコアアルゴリズム

  1. 各タスク試行は、すべてのファイルをタスク試行ディレクトリの下にある一意のディレクトリツリーに書き込みます。
  2. タスクのコミットは、そのタスク試行のディレクトリを再帰的にスキャンし、ディレクトリのリストとファイルのリストを作成することで構成されます。
  3. これらのリストは JSON マニフェストファイルとして保存されます。
  4. ジョブのコミットは、すべての JSON マニフェストファイルのリストを作成し、その内容をロードし、宛先ディレクトリの集約セットを作成し、すべてのファイルを最終的な宛先にリネームすることで構成されます。

中間マニフェスト

これは、(IOStatistics といくつかの診断情報に加えて) 次のものを含むように設計された JSON ファイルです。

  1. 存在しない場合に作成する必要がある宛先ディレクトリのリスト。
  2. (絶対ソース、絶対宛先、ファイルサイズ) エントリとしてリネームするファイルのリスト。

ジョブのセットアップ

mkdir(jobAttemptDirectory)
mkdir(manifestDirectory)
mkdir(taskAttemptDirectory)

タスクのセットアップ

mkdir(taskAttemptWorkingDirectory)

タスクのコミット

タスク試行は次のようにコミットされます。

  1. タスク試行の作業ディレクトリを再帰的にリストして構築します
  2. ファイルがリネームされる宛先ディレクトリのリストと、そのステータス (exists, not_found, file)
  3. リネームするファイルのリスト: ソース、宛先、サイズ、およびオプションで etag。
  4. これらのリストは、*中間マニフェスト* である JSON ファイルに格納されます。
  5. タスク試行は、このファイルを *タスク試行のマニフェストの一時パス* に保存します。
  6. タスク試行は、*コミットされたタスクのマニフェストパス* を削除し、独自のマニフェストファイルをそのパスにリネームします。
  7. リネームが成功した場合、タスクのコミットは成功と見なされます。

この時点ではリネームは行われません。ファイルは、ジョブのコミットでリネームされるまで元の場所に残されます。

let (renames, directories) = scan(taskAttemptWorkingDirectory)
let manifest = new Manifest(renames, directories)

manifest.save(taskAttemptTemporaryManifestPath)
rename(taskAttemptTemporaryManifestPath, taskManifestPath)

タスクのアボート/クリーンアップ

delete(taskAttemptWorkingDirectory)

ジョブのコミット

ジョブのコミットは、次の構成になります。

  1. ジョブ試行ディレクトリ内のすべてのマニフェストファイルをリストします。
  2. 各マニフェストファイルをロードし、まだ存在しないディレクトリを作成し、リネームリスト内の各ファイルをリネームします。
  3. オプションで、S3A コミッターと同じ形式の JSON _SUCCESS ファイルを保存します (テスト用; アトミックな保存には書き込みとリネームを使用します)。

ジョブのコミットフェーズは、多数のタスクとタスクごとの多数のファイルに対して並列化をサポートしています。具体的には、並列ストア IO 用のスレッドプールがあります。

  1. マニフェストタスクはロードされ、並行して処理されます。
  2. ディレクトリを作成しようとしている場所でのファイルの削除。
  3. リーフディレクトリの作成。
  4. ファイルのリネーム。
  5. クリーンアップとアボートの場合: タスク試行ディレクトリの削除
  6. テスト/デバッグのために出力の検証が有効になっている場合は、getFileStatus を呼び出してファイル長を比較し、可能であれば etag も比較します。
let manifestPaths = list("$manifestDirectory/*-manifest.json")
let manifests = manifestPaths.map(p -> loadManifest(p))
let directoriesToCreate = merge(manifests.directories)
let filesToRename = concat(manifests.files)

directoriesToCreate.map(p -> mkdirs(p))
filesToRename.map((src, dest, etag) -> rename(src, dest, etag))

if mapreduce.fileoutputcommitter.marksuccessfuljobs then
  success.save("$destDir/_SUCCESS")

実装メモ

デバッグと開発を容易にするために、サマリーは同じ *または異なる* ファイルシステムの場所に保存できます。中間マニフェストは、ターゲットファイルシステムの場所にリネームできます。

if summary.report.directory != "" then
  success.save("${summary.report.directory}/$jobID.json")
if diagnostics.manifest.directory != null then
  rename($manifestDirectory, "${diagnostics.manifest.directory}/$jobID")

サマリーレポートは、ジョブのコミットが何らかの理由で失敗した場合でも保存されます。

ジョブのアボート/クリーンアップ

ジョブのクリーンアップは、名目上はジョブディレクトリの削除です。

delete(jobDirectory)

オブジェクトストアのスケーリングの問題に対処するために、これは、すべてのタスク試行作業ディレクトリの (並列化された) 削除が先行する必要があります。

let taskAttemptWorkingDirectories = list("taskAttemptDirectory")
taskAttemptWorkingDirectories.map(p -> delete(p))

新しいプロトコルの利点

  • ソースツリーリスト操作を、一般に実行のクリティカルパスから外れているタスクのコミットフェーズにプッシュします。
  • プローブ/作成されるディレクトリの数を、すべての重複が排除された出力ディレクトリの集約セットに削減します。
  • ファイルのリネームは並列化でき、制限は構成されたスレッドプールサイズやレート制限の制約となります。
  • ディレクトリのリネームがアトミックであるという期待がないため、GCS へのアトミックなタスクコミットを提供します。
  • マニフェストを介して、タスク試行からジョブコミッターに IOStatistics を渡すことを許可します。
  • S3A「パーティション分割されたステージングコミッター」と同様に、ジョブコミッターで一部のリネーム前の操作を許可します。これは、作成がスケジュールされているディレクトリ内の既存のエントリをすべて削除するように構成するか、それらのパーティションが空でない場合に失敗するように構成できます。「パーティション分割されたステージングコミッター」を参照してください。
  • オプションの事前検証チェック (異なるタスクによって作成された重複ファイルがないことを確認) を許可します。
  • マニフェストは、開発/デバッグ中に表示したり、出力サイズを決定したりできます。

v1 アルゴリズムと比較した新しいプロトコルの欠点

  • 新しいマニフェストファイル形式が必要です。
  • タスクが多数のファイルやサブディレクトリを作成した場合、または etag が収集され、これらのタグの長さが重要になる場合は、マニフェストが大きくなる可能性があります。HTTP プロトコルでは各 etag が 8 KiB に制限されているため、コストはファイルあたり 8 KiB になる可能性があります。
  • タスクのコミットを v1 アルゴリズムよりも複雑にします。
  • 個々のタスクが一意の出力ディレクトリを作成するジョブでは、ディレクトリのリネームがディレクトリのコミットに使用されないため、最適ではない可能性があります。