このドキュメントでは、マニフェストコミッターのコミットプロトコルについて説明します。
用語 | 意味 |
---|---|
コミッター | タスクおよびジョブのコミット操作を実行するためにMR Sparkによって呼び出すことができるクラス。 |
Sparkドライバー | 作業をスケジューリングし、コミット操作を調整するSparkプロセス。 |
ジョブ: MapReduceの場合 | アプリケーション全体。Sparkでは、これは一連の作業の単一のステージです。 |
ジョブ試行 | ジョブの1回の試行。MRは、ジョブの部分的な失敗からの回復を伴う複数のジョブ試行をサポートします。Sparkは「最初からやり直す」と言います。 |
タスク | 1つのファイルまたはファイルの一部を処理するなど、ジョブのサブセクション |
タスクID | このジョブ内で一意のタスクのID。通常は0から始まり、ファイル名(part-0000、part-001など)で使用されます。 |
タスク試行 (TA) | タスクを実行しようとする試み。失敗する可能性があり、その場合、MR/Sparkは別のタスクをスケジュールします。 |
タスク試行ID | タスク試行の一意のID。タスクID + 試行カウンター。 |
宛先ディレクトリ | 作業の最終的な宛先。 |
ジョブ試行ディレクトリ | ジョブ試行で使用される一時ディレクトリ。これは常に宛先ディレクトリの下にあるため、HDFS、他のファイルシステムのストレージボリュームなどと同じ暗号化ゾーンにあることを保証します。 |
タスク試行ディレクトリ | ジョブ試行ディレクトリの下にあるディレクトリで、タスク試行が自身の作業のためのサブディレクトリを作成します |
タスク試行作業ディレクトリ | ファイルが書き込まれるタスク試行ごとに排他的なディレクトリ |
タスクのコミット | タスク試行の出力を取得し、その「成功した」タスクの最終的/排他的な結果にする。 |
ジョブのコミット | コミットされたすべてのタスクのすべての出力を集約し、ジョブの最終結果を生成します。 |
コミッターの目的は、タスクの失敗があっても、ジョブの完全な出力が宛先に確実に到達するようにすることです。
Hiveの従来の階層型ディレクトリ構造のテーブルの場合、ジョブコミットでは、コミットされたすべてのタスクの出力がディレクトリツリー内の正しい場所に配置される必要があります。
hadoop-mapreduce-client-core
モジュールに組み込まれているコミッターは、FileOutputCommitter
です。
これにはv1とv2の2つのアルゴリズムがあります。
v1アルゴリズムは、あらゆる形式のタスクの失敗に対して回復力がありますが、新しく作成された各ファイルをテーブル内の正しい場所に1つずつ名前変更するため、最終的な集計出力をコミットするときに遅くなります。
v2アルゴリズムは、出力がジョブコミットまで遅延されるのではなく、個々のタスクがコミットされたときに表示されるため、安全とはみなされません。複数のタスク試行がデータを出力ディレクトリツリーに取得する可能性があり、ジョブが失敗/中止された場合、ジョブがコミットされる前に、その出力が表示されます。
$dest/__temporary/$jobAttemptId/
のジョブ試行ディレクトリには、進行中のジョブのすべての出力が含まれており、すべてのタスク試行には独自のタスク試行ディレクトリ$dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId
が割り当てられます
タスクのすべての作業は、タスク試行ディレクトリの下に書き込まれます。出力がルートにファイルを持つ深いツリーである場合、タスク試行ディレクトリは、生成されたファイルとそれらの上のディレクトリを含む、同様の構造になります。
タスク試行ディレクトリは、ジョブ試行ディレクトリの直下に名前が変更されます
rename( $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId $dest/__temporary/$jobAttemptId/$taskId)
コミットされたタスクごとに、下のすべてのファイルは宛先ディレクトリに名前が変更され、タスクのベースディレクトリからの相対的なファイル名が宛先ディレクトリのファイル名に再マッピングされます。
つまり、$dest/__temporary/$jobAttemptId/$taskId
の下にあるすべてのものが、$dest
の下のパスに変換されます。
再マップするパスは、再帰的なツリーウォークによって各TAディレクトリ内で識別されます。タスクディレクトリツリーに、宛先の下に存在しないサブディレクトリが含まれている場合は、いくつかの最適化があります。この場合、ディレクトリ全体の名前を変更できます。ディレクトリがすでに存在する場合は、ファイルごとのマージがそのディレクトリに対して行われ、サブディレクトリのアクションは再び宛先の存在に依存します。
その結果、各タスクの出力が別の最終ディレクトリに移動する場合(たとえば、最終パーティションが単一のタスクに固有の場合)、名前変更は子に関係なく、ディレクトリに対してO(1)になります。出力が他のタスクと同じディレクトリに移動する場合(または既存のディレクトリを更新する場合)、名前変更のパフォーマンスはO(ファイル数)になります。
最後に、mapreduce.fileoutputcommitter.marksuccessfuljobs
がtrueの場合のみ、0バイトの_SUCCESS
ファイルが書き込まれます。
タスク試行ディレクトリ内のファイルは、1つずつ宛先ディレクトリに名前が変更されます。ディレクトリの名前変更を最適化する試みはありません。他のタスクが同時に作業をコミットしている可能性があるためです。したがって、O(ファイル数)
+ ディレクトリツリーのリスト作成コストとなります。再度述べますが、再帰的なツリーウォークで実行され、HDFSや(ここでは関係ありませんが)S3でより高速になるであろう、深いlistFiles(path, recursive=true)
APIは使用されません。
mapreduce.fileoutputcommitter.marksuccessfuljobs
がtrueの場合のみ、0バイトの_SUCCESS
ファイルが書き込まれます。
タスクT1について、タスク試行1(T1A1)がコミット前に失敗した場合、ドライバーは新しい試行「T1A2」をスケジュールし、それをコミットします。すべてうまくいきます。
しかし、T1A1がコミット許可を与えられ、コミットプロセス中に失敗した場合、その出力の一部が宛先ディレクトリに書き込まれている可能性があります。
もし試行T1A2がコミットするように指示された場合、その出力が正確なファイル名のセットを持っている場合にのみ、すでに名前が変更されたファイルが上書きされます。異なるファイル名が生成された場合、出力にはT1A1とT1A2のファイルが含まれることになります。
T1A1がコミットプロセス中にパーティション分割された場合、ジョブコミッターは別の試行をスケジュールし、その作業をコミットします。しかし、もしT1A1がまだファイルシステムに接続している場合、ファイルの改名を続けている可能性があります。たとえ同じファイル名が使用されていたとしても、2つのタスクの出力が混ざり合う可能性があります。
論文、ゼロ名前変更コミッター、Loughran et. al.は、これらのコミッターについて解説しています。
この論文では、コミット問題、正確さの定義、v1およびv2コミッターのアルゴリズム、S3Aコミッター、IBM Stocatorコミッター、そしてEMRのSparkコミッターについて私たちが知っていることも解説しています。
hadoop-aws
JARには、「ステージング」と「マジック」という2つのコミッターが含まれています。これらは両方とも、同じ問題の実装です。すなわち、作業を安全かつ迅速にS3オブジェクトストアにコミットすることです。
コミッターは、S3がファイルを作成するアトミックな方法(PUTリクエスト)を提供しているという事実を利用しています。
ファイルは存在するか、存在しないかのどちらかです。ファイルは宛先に直接アップロードでき、アップロードが完了したときにのみファイルがマニフェスト(既存のコピーを上書き)されます。
大きなファイルの場合、マルチパートアップロードにより、このアップロード操作を一連のPOSTリクエストに分割できます。
1. initiate-upload (パス -> アップロードID)
1. upload part(パス, アップロードID, data[]) -> チェックサム
。これは並列化できます。1つのオブジェクトに最大10,000個のパーツをアップロードできます。最後のパーツを除くすべては、>= 5MBである必要があります。 1. complete-upload (パス, アップロードID, List<チェックサム>)
これにより、チェックサムの順序によって定義されたブロックのシーケンスでパーツからファイルを構築し、ファイルをマニフェストします。
S3Aコミッターの秘訣は、ファイルはタスク試行の実行/コミット中にアップロードされますが、最後のPOSTリクエストはジョブコミットフェーズまで遅延できることです。タスク試行は、各ファイルの最終的な宛先を決定し、マルチパート操作の一部としてデータをアップロードし、アップロードを完了するために必要な情報をファイルに保存する必要があります。このファイルは後でジョブコミッターによって読み取られ、POSTリクエストで使用されます。
ステージングコミッターは、NetflixのRyan Blueによる貢献に基づいています。これは、.pendingset
ファイルを伝播するための整合性のあるストアとしてHDFSに依存しています。
各タスク試行の作業ディレクトリは、ローカルファイルシステムの「ステージングディレクトリ」にあります。アップロードを完了するために必要な情報は、クラスターHDFSファイルシステムと連携するv1 FileOutputCommitterを使用して、タスク試行からジョブコミッターに渡されます。これにより、コミッターがv1アルゴリズムと同じ正確さの保証を持つことが保証されます。
マジックコミッターは、純粋なS3Aであり、ファイルシステムクライアント自体内で変更を行うことができたという事実を利用しています。
「マジック」パスは、書き込み用に開かれたときに、最終的な宛先ディレクトリへのマルチパーティアップロードを開始するように定義されています。出力ストリームがclose()
されると、ゼロバイトのマーカーファイルがマジックパスに書き込まれ、アップロードを完了するために必要なすべての情報を含むJSONの.pendingファイルが保存されます。
タスクコミット:1. 各タスク試行のマジックディレクトリにあるすべての.pending
ファイルをリストします。 1. .pendingset
ファイルに集約します。 1.タスクIDとともにジョブ試行ディレクトリに保存します。
ジョブコミット
.pendingset
ファイルをリストします。マジックコミッターは、整合性のあるS3ストア(当初はS3Guardを使用)を絶対に必要とします。現在、S3は整合性が保たれているため、生のS3を使用できます。HDFSやrename()
を使用する他のファイルシステムは必要ありません。
S3Aコミッターは、次の理由で正しいと見なされています。
修正された重要な問題には、以下が含まれます。
pendingset
が既に存在すると失敗します。spark.sql.sources.writeJobUUID
で一意のジョブIDを取得する必要があります。スケール/パフォーマンス/UXではなく、正確性に影響を与えた問題のうち、HADOOP-17258は、TA1タスクコミットが完了した後(ただし、レポートには失敗していた)の障害からの回復に関係していました。SPARK-33402、SPARK-33230、およびHADOOP-17318はすべて関連しています。もし2つのSparkジョブ/ステージが同じ秒に開始された場合、同じジョブIDを持っていました。これにより、ステージングコミッターで使用されるHDFSディレクトリが混ざり合ってしまいました。
特筆すべきは、これらはすべて、最小限の統合テストスイートでは発見されなかった問題であるということです。
良いニュースは、これらの問題について私たちが知っており、再びそれらを複製することを避けることができるようになったことです。そして、何をテストするためのテストを書くかを知っています。
ABFSでは、V1コミッターのパフォーマンスが低い理由は、次のとおりです。
V2コミッターは、リストと名前変更プロセスをタスクコミットで実行するため、ジョブコミットがはるかに高速です。これは、非アトミックであるため、使用するのが危険であると考えられている理由です。V2タスクコミットアルゴリズムが示すのは、ファイルごとの名前変更のみを使用することで、異なるタスクの出力のコミットを並列化できるということです。
V1コミッターは、タスクコミット操作(ディレクトリの名前変更)でさえも非アトミックなO(ファイル数)
操作であるため、GCSでのパフォーマンスが低くなります。これはまた、安全でないことを意味します。
タスク試行がパーティション分割され、Sparkドライバーが別のTAをスケジュール/コミットした場合、タスクディレクトリには最初の試行のファイルが1つ以上含まれている可能性があります。
このコミッターでサポートされているストア/ファイルシステムは、以下を満たす必要があります。
O(1)
ファイルの名前変更操作が必要です。このコミッターでサポートされているストア/ファイルシステムは、以下を満たす必要があります。
EtagSource
インターフェースを実装します。これは、ABFSの名前変更回復、および最終出力のオプションの検証に使用されます。このコミッターでサポートされているストア/ファイルシステムは、以下の場合があります。
このコミッターでサポートされているストア/ファイルシステムは、以下を満たさない場合があります。
O(1)
ディレクトリ削除をサポートします。CleanupJobStage
はそうではないと仮定しているため、タスク試行ディレクトリを並行して削除します。create(Path, overwrite=false)
操作をサポートします。マニフェストは、タスク試行IDを含むパスに書き込んでから、最終的なパスに名前を変更することでコミットされます。listFiles(path, recursive=true)
呼び出しをサポートします。このAPI呼び出しは使用されません。FileOutputCommitter
と比較して、削除された要件は次のとおりです。
O(1)
のディレクトリ削除。HDFS はこれらの要件をすべて満たしているため、このコミッターから大きな恩恵を受けることはありませんが、それでも動作します。
S3 ストアは、一貫性が保たれるようになった現在でも、このコミッターのリネーム要件を満たしていません。このコミッターは S3 での使用は安全ではありません。
すべてのジョブは一意の ID を持つ必要があります。
実装では、これを保証するための関連パッチが Spark ランタイムに適用されていることを想定しています。
ジョブ ID は、_temporary/0/
のような従来のインクリメントする自然数スキームの代わりに、一時ディレクトリの名前付けに使用されます。このスキームは MapReduce に由来し、attempt ID > 1 のジョブ試行は先行ジョブによってコミットされたタスクを探し、その結果を組み込みます。
このコミッターは、リカバリーを試みない Spark をターゲットにしています。パスにジョブ ID を使用することにより、ジョブがジョブのクリーンアップ/アボート時に _temporary
をすべて削除しないように構成されている場合、複数のジョブが同じテーブルを宛先として使用して実行される可能性があります。
タスク ID とタスク試行 ID は、通常どおりジョブ ID から派生します。
書き込まれたファイルのファイル名は一意であることが期待されます。これは Spark で ORC ファイルと Parquet ファイルに対して行われ、デフォルトでは宛先ファイルのチェックを省略できます。
宛先ディレクトリが destDir: Path
である場合
ID が jobID: String
で試行回数が jobAttemptNumber:int
のジョブは、ディレクトリを使用します
$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/
その作業のために使用します (注意: 実際には最後のサブディレクトリは %02d
でフォーマットされます)。
これは *ジョブ試行ディレクトリ* と呼ばれます。
ジョブ試行ディレクトリの下に、サブディレクトリ tasks
が作成されます。これは *タスク試行ディレクトリ* と呼ばれます。すべてのタスク試行には、独自のサブディレクトリがあり、その作業が保存されます。
ジョブ試行ディレクトリの下に、サブディレクトリ manifests
が作成されます。これは *y* と呼ばれます。
コミットされたすべてのタスクのマニフェストは、$taskId-manifest.json
というファイル名でこのディレクトリに保存されます。
フルパス
$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/manifests/$taskId-manifest.json
は、コミットされたタスクによって作成されたすべてのファイルのマニフェストの最終的な場所です。これは *コミットされたタスクのマニフェストパス* と呼ばれます。
タスク試行は、一時ファイル名 $taskAttemptId-manifest.json.tmp
でマニフェストをこのディレクトリに保存します。
これは *タスク試行のマニフェストの一時パス* と呼ばれます。
ジョブとタスクの操作については、次のパスが定義されます。
let jobDirectory = "$destDir/_temporary/manifest_$jobID/" let jobAttemptDirectory = jobDirectory + "$jobAttemptNumber/" let manifestDirectory = jobAttemptDirectory + "manifests/" let taskAttemptDirectory = jobAttemptDirectory + "tasks/"
また、各タスク試行については、次のパスも定義されます。
let taskAttemptWorkingDirectory = taskAttemptDirectory + "$taskAttemptId" let taskManifestPath = manifestDirectory + "$taskId-manifest.json" let taskAttemptTemporaryManifestPath = manifestDirectory + "$taskAttemptId-manifest.json"
これは、(IOStatistics といくつかの診断情報に加えて) 次のものを含むように設計された JSON ファイルです。
mkdir(jobAttemptDirectory) mkdir(manifestDirectory) mkdir(taskAttemptDirectory)
mkdir(taskAttemptWorkingDirectory)
タスク試行は次のようにコミットされます。
この時点ではリネームは行われません。ファイルは、ジョブのコミットでリネームされるまで元の場所に残されます。
let (renames, directories) = scan(taskAttemptWorkingDirectory) let manifest = new Manifest(renames, directories) manifest.save(taskAttemptTemporaryManifestPath) rename(taskAttemptTemporaryManifestPath, taskManifestPath)
delete(taskAttemptWorkingDirectory)
ジョブのコミットは、次の構成になります。
_SUCCESS
ファイルを保存します (テスト用; アトミックな保存には書き込みとリネームを使用します)。ジョブのコミットフェーズは、多数のタスクとタスクごとの多数のファイルに対して並列化をサポートしています。具体的には、並列ストア IO 用のスレッドプールがあります。
let manifestPaths = list("$manifestDirectory/*-manifest.json") let manifests = manifestPaths.map(p -> loadManifest(p)) let directoriesToCreate = merge(manifests.directories) let filesToRename = concat(manifests.files) directoriesToCreate.map(p -> mkdirs(p)) filesToRename.map((src, dest, etag) -> rename(src, dest, etag)) if mapreduce.fileoutputcommitter.marksuccessfuljobs then success.save("$destDir/_SUCCESS")
実装メモ
デバッグと開発を容易にするために、サマリーは同じ *または異なる* ファイルシステムの場所に保存できます。中間マニフェストは、ターゲットファイルシステムの場所にリネームできます。
if summary.report.directory != "" then success.save("${summary.report.directory}/$jobID.json") if diagnostics.manifest.directory != null then rename($manifestDirectory, "${diagnostics.manifest.directory}/$jobID")
サマリーレポートは、ジョブのコミットが何らかの理由で失敗した場合でも保存されます。
ジョブのクリーンアップは、名目上はジョブディレクトリの削除です。
delete(jobDirectory)
オブジェクトストアのスケーリングの問題に対処するために、これは、すべてのタスク試行作業ディレクトリの (並列化された) 削除が先行する必要があります。
let taskAttemptWorkingDirectories = list("taskAttemptDirectory") taskAttemptWorkingDirectories.map(p -> delete(p))