このドキュメントでは、S3A コミッターのアーキテクチャと実装の詳細について説明します。
コミッターの使用に関する情報は、S3A コミッターを参照してください。
S3が完全に整合性を持つようになったため、ディレクトリ一覧の不整合に関連する問題は解消されました。ただし、名前変更の問題は依然として存在します。ディレクトリの名前を変更して作業をコミットすることは、安全ではないだけでなく、非常に遅くなります。
このアーキテクチャドキュメントとコミッターは、S3が不整合だった時代に記述されました。2つのコミッターは、この問題に異なるアプローチで対処しました。
S3が整合性を持つようになったため、マジックコミッターは任意のS3バケットで安全に使用できます。したがって、どちらを使用するかは実験によって決定されます。
このアーキテクチャドキュメントは2017年に記述されたもので、S3Guardなどの追加の一貫性レイヤーを使用する場合にのみS3が整合性を持っていた時代のものでした。ドキュメントでは、当時存在していた要件/制約のうち、現在廃止されているものを示しています。
標準のコミットアルゴリズム(FileOutputCommitter
とそのv1およびv2アルゴリズム)は、ディレクトリの名前変更がO(1)
の原子操作であることに依存しています。呼び出し元は、作業を宛先ファイルシステムの一時ディレクトリに出力し、その後、これらのディレクトリを最終的な宛先に名前変更することで作業をコミットします。これは、一貫性のあるリスト操作があり、FileSystem.rename()
コマンドが原子的なO(1)
操作であるファイルシステムに対して作業をコミットするための完璧なソリューションです。
名前変更を使用すると、個々のタスクは一時ディレクトリで作業でき、名前変更という原子操作を使用して、タスクと最終的にはジョブ全体を明示的にコミットできます。名前変更のコストが低いので、タスクとジョブのコミット中に最小限の遅延で実行できます。HDFSは名前変更操作中にnamenodeメタデータをロックするため、すべてのrename()呼び出しはシリアル化されます。ただし、2つのディレクトリエントリのメタデータのみを更新するため、ロックの持続時間は短いです。
「実際の」ファイルシステムとは対照的に、AmazonのS3Aオブジェクトストアは、ほとんどの他のオブジェクトストアと同様に、rename()
をまったくサポートしていません。ファイル名のハッシュ操作によってデータの場所が決定されます。変更する個別のメタデータはありません。名前変更を模倣するために、Hadoop S3Aクライアントはデータを宛先ファイル名を持つ新しいオブジェクトにコピーし、元のエントリを削除する必要があります。このコピーはサーバー側で実行できますが、クラスタ内でのコピーが完了するまで完了しないため、データの量に比例した時間がかかります。
名前変更のオーバーヘッドは最も顕著な問題ですが、最も危険な問題ではありません。それは、2020年末まで、パスのリストに整合性保証がなく、ファイルの追加または削除に遅延があった可能性があるという事実です。ファイルがリストされていない場合、コミット操作はそれらをコピーせず、最終出力に表示されません。
この問題に対する解決策は、S3プロトコル自体と密接に関連しています。マルチパートPUT操作の完了の遅延です。
つまり、タスクはすべてのデータをマルチパートアップロードとして書き込みますが、最終的な単一のジョブコミットアクションまで最終的なコミットアクションを遅延させます。ジョブコミットアクションでコミットされたデータのみが表示されます。推測的実行と失敗したタスクからの作業はインスタンス化されません。名前変更がないため、データが一時ディレクトリから最終ディレクトリにコピーされる間に遅延はありません。コミットの持続時間は、構築するコミット操作を決定し、それらを実行するために必要な時間になります。
ジョブの出力が、より大きな操作シーケンスまたは他のアプリケーションの他のステージに表示されるのは、ジョブが正常に完了した場合のみです。
ジョブドライバ。ここで使用する用語がはっきりしません。タスクの実行をスケジュールし、成功/失敗を追跡し、すべての作業が処理された時点を判断し、出力をコミットするプロセスです。ジョブが失敗し、回復できないと判断することもあり、その場合はジョブが中止されます。MRとTezでは、これはYARNアプリケーションマスター内にあります。Sparkでは、ドライバであり、AM、YARNクライアント、または他の場所(例:Livy?)で実行できます。
最終ディレクトリ:ジョブの出力が配置され、表示されるディレクトリ。
タスク:単一のプロセスでのジョブ内の単一の操作で、1つ以上のファイルが生成されます。ジョブが正常に完了した後、データは最終ディレクトリに表示されなければなりません。タスクは、何らかの方法で失敗することなく(処理エラー、ネットワーク/プロセス障害)、期待どおりに出力をすべて生成した場合に正常に完了します。
ジョブコンテキスト:org.apache.hadoop.mapreduce.JobContext
クラスのインスタンスで、ジョブの読み取り専用ビューをジョブドライバとタスクに提供します。
タスク試行コンテキスト:org.apache.hadoop.mapreduce.TaskAttemptContext extends JobContext, Progressable
クラスのインスタンスで、ステータスの取得と設定、進捗状況、カウンタ値などのタスクの操作を提供します。
タスク作業ディレクトリ:コミットされていない作業を配置できる、単一のタスクによって排他的にアクセスできるディレクトリ。
タスクコミット:タスク作業ディレクトリにあるタスクの出力を取得し、最終ディレクトリに表示する行為。これは従来、FileSystem.rename()
呼び出しによって実装されていました。
タスク側コミット(タスクのプロセスでその作業後に実行される操作)とドライバ側タスクコミット(ジョブドライバがコミット操作を実行する)を区別することが役立ちます。タスク側コミット作業はクラスタ全体で実行され、ジョブ実行の重要な部分から実行される可能性があります。ただし、コミットプロトコルがすべてのタスクにジョブドライバからのシグナルを待機することを要求しない限り、タスク側コミットは最終ディレクトリに出力をインスタンス化できません。これらは、推測的実行と失敗に対処するために、成功したタスクの出力をジョブコミットの準備ができた状態に昇格するために使用できます。
ジョブコミット:ジョブの正常に完了したすべてのタスクを取得し、それらをコミットする行為。このプロセスは一般的に非原子的なものであり、ジョブの最後にシリアル化された操作であることが多いため、そのパフォーマンスはボトルネックになる可能性があります。
タスク中止:そのデータがコミットされないようにタスクをキャンセルすること。
ジョブ中止:ジョブ内のすべての作業をキャンセルすること。どのタスクの作業もコミットされません。
推測的タスク実行/「推測」:同じ入力データセットに対して複数のタスクを並列で実行し、最初に完了したタスクが成功したと見なされるもの。その出力はコミットされなければなりません。他のタスクは中止されなければなりません。タスクを並列で実行でき、ジョブがコミットされるまでタスクの出力が表示されないようにする必要があります。各タスクでの出力は同じであることが期待されますが、必ずしもそうとは限りません。重要なのは、推測的タスクのインスタンスがコミットされている場合、出力は有効と見なされなければならないことです。
ジョブドライバとタスクが通信できることが期待されています。タスクがタスクコミットフェーズ中にそれ自体で操作を実行する場合、ジョブドライバの指示がある場合にのみ実行する必要があります。同様に、タスクが最終的なステータスをジョブドライバに伝えることができない場合、その作業をコミットしてはなりません。これはS3を使用する場合に非常に重要です。一部のネットワークパーティションがジョブドライバからタスクを分離する可能性があるのに対し、タスクはS3へのアクセスを維持する可能性があるためです。
セットアップ:
ドライバ
:JobContext
が作成/設定されます。JobContext
を使用してコミッターインスタンスがインスタンス化され、setupJob()
が呼び出されます。FileOutputCommitter
標準的なコミットプロトコルは、`org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter` に実装されています。
2つのアルゴリズムがあります。「v1」はMapReduceアプリケーションマスターの障害と再開に対処するために設計されています。V2アルゴリズムは、ジョブ全体を再実行する以外に障害から回復できません。ただし、タスクコミット時にすべての作業を出力ディレクトリに伝播します。`rename()` をコピーと削除で模倣するオブジェクトストアを使用する場合、リスト、コピー、削除の回数が削減されるため、より効率的です。また、これらがタスクコミットで実行されるため、すべての作業の最後に発生する最終的な `O(data)` の停止時間がなくなります。依然として非常に非効率ですが、実行における大きな停止時間として非効率性が目立たなくなります。
備考
「v1」コミットアルゴリズムはHadoop 2.xのデフォルトのコミットアルゴリズムです。これは MAPREDUCE-2702 の一部として実装されました。
このアルゴリズムは、ジョブドライバの障害と再起動を処理するように設計されており、再起動されたジョブドライバは未完了のタスクのみを再実行します。完了したタスクの出力は、再起動されたジョブが完了したときにコミットのために復旧されます。
コストがあります。すべてのコミットされたタスクディレクトリ内のすべてのファイルを再帰的にリストし、名前を変更するコミット時間です。
これは順次実行されるため、コミット時間は `O(files)` であり、一般的には `O(tasks)` です。
#Job Attempt Path is `$dest/_temporary/$appAttemptId/` jobAttemptPath = '$dest/_temporary/$appAttemptId/' # Task Attempt Path is `$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID` taskAttemptPath = '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID' #Task committed path is `$dest/_temporary/$appAttemptId/$taskAttemptID` taskCommittedPath = '$dest/_temporary/$appAttemptId/$taskAttemptID'
タスクはタスク試行パス内/下に書き込みます。
fs.mkdir(jobAttemptPath)
なし:ディレクトリは必要に応じて作成されます。
タスク試行パスをタスクコミットパスに名前変更します。
def needsTaskCommit(fs, jobAttemptPath, taskAttemptPath, dest): return fs.exists(taskAttemptPath) def commitTask(fs, jobAttemptPath, taskAttemptPath, dest): if fs.exists(taskAttemptPath) : fs.delete(taskCommittedPath, recursive=True) fs.rename(taskAttemptPath, taskCommittedPath)
実際のファイルシステムでは、これは `O(1)` のディレクトリの名前変更です。
模倣された名前変更を行うオブジェクトストアでは、コピーに `O(data)` 、すべてのファイルのリストと削除のオーバーヘッド(S3の場合、`(1 + files/500)` のリストと、同じ数の削除呼び出し)がかかります。
タスク試行パスを削除します。
def abortTask(fs, jobAttemptPath, taskAttemptPath, dest): fs.delete(taskAttemptPath, recursive=True)
実際のファイルシステムでは、これは `O(1)` の操作です。オブジェクトストアでは、ファイルのリストと削除にかかる時間に比例し、通常はバッチ処理されます。
すべてのタスクコミットパス内のすべてのファイル/ディレクトリを最終的な宛先パスにマージします。オプションで、宛先パスに0バイトの `_SUCCESS` ファイルを作成します。
def commitJob(fs, jobAttemptDir, dest): for committedTask in fs.listFiles(jobAttemptDir): mergePathsV1(fs, committedTask, dest) fs.touch("$dest/_SUCCESS")
(`mergePaths()` の詳細については以下を参照してください)
ジョブコミット中の障害は、クエリ全体を再実行する以外に回復できません。
def isCommitJobRepeatable() : return False
したがって、これはプロトコルにおける障害ポイントです。ファイル数が少なく、名前変更/リストアルゴリズムが高速な場合、脆弱性の期間は短くなります。大規模になると、脆弱性が増加します。コミットされたタスクの名前変更を並列実行することで、実際には削減できます。
ジョブ試行パス下のすべてのデータを削除します。
def abortJob(fs, jobAttemptDir, dest): fs.delete(jobAttemptDir, recursive = True)
def cleanupJob(fs, dest): fs.delete('$dest/_temporary', recursive = True)
すべてのコミッターについて、回復プロセスはアプリケーションマスターで行われます。1. 前回の試行のジョブ履歴ファイルがロードされ、成功したと記録されたタスクが判別されます。2. 成功した各タスクについて、ジョブコミッターは、前回の試行の詳細から構築された `TaskAttemptContext` を使用して、その `recoverTask()` メソッドが呼び出されます。3. メソッドが例外を発生させない場合、回復したと見なされ、再実行されません。4. その他のすべてのタスクは実行のためにキューに入れられます。
v1コミッターの場合、タスクの回復は簡単です。前回の試行からのコミットされたタスクのディレクトリは、現在のアプリケーション試行のディレクトリの下に移動されます。
def recoverTask(tac): oldAttemptId = appAttemptId - 1 fs.rename('$dest/_temporary/oldAttemptId/${tac.taskId}', '$dest/_temporary/appAttemptId/${tac.taskId}')
これにより、ジョブドライバ(ここではMR AM)の障害からの回復時間が大幅に向上します。失われた作業は、データが生成されたがまだコミットされていない進行中のタスクの作業のみです。
ジョブドライバの障害のみがジョブの再起動を必要とし、個々のタスクは必要ありません。したがって、これは並列実行されるタスクの数とは無関係に、クエリの期間によって発生する確率です。
タスクが長いほど、障害のリスクが高くなり、進行中の作業を回復する価値が高まります。
高速クエリは障害のリスクが低いだけでなく、ジョブ全体を再実行するだけで障害から回復できます。これは、進行中のジョブを回復しようとしないSparkにおける暗黙的な戦略です。クエリが高速であるほど、回復戦略はシンプルになります。
これは不可能です。ジョブコミット中の障害により、宛先ディレクトリをクリーンアップした後、ジョブ全体を再実行する必要があります。
`mergePaths()` はデータをマージするためのコアアルゴリズムです。実装では、`mergePaths()` と `renameOrMerge()` の2つの相互再帰ルーチンにわたって両方のアルゴリズムの戦略が混在しているため、やや分かりにくくなっています。
ここでは、2つのアルゴリズムが分割され、相互再帰メソッドの1つがインライン化されています。
def mergePathsV1(fs, src, dest) : if fs.exists(dest) : toStat = fs.getFileStatus(dest) else: toStat = None if src.isFile : if not toStat is None : fs.delete(dest, recursive = True) fs.rename(src.getPath, dest) else : # src is directory, choose action on dest type if not toStat is None : if not toStat.isDirectory : # Destination exists and is not a directory fs.delete(dest) fs.rename(src.getPath(), dest) else : # Destination exists and is a directory # merge all children under destination directory for child in fs.listStatus(src.getPath) : mergePathsV1(fs, child, dest + child.getName) else : # destination does not exist fs.rename(src.getPath(), dest)
v2アルゴリズムは、タスク出力を宛先ディレクトリに直接コミットします。これは、Hadoop 1.xコミットアルゴリズムの実装です。
以下に示すように、`mergePaths` を異なる方法で実装しています。
def mergePathsV2(fs, src, dest) : if fs.exists(dest) : toStat = fs.getFileStatus(dest) else: toStat = None if src.isFile : if not toStat is None : fs.delete(dest, recursive = True) fs.rename(src.getPath, dest) else : # destination is directory, choose action on source type if src.isDirectory : if not toStat is None : if not toStat.isDirectory : # Destination exists and is not a directory fs.delete(dest) fs.mkdirs(dest) # for child in fs.listStatus(src.getPath) : # HERE mergePathsV2(fs, child, dest + child.getName) # else : # Destination exists and is a directory # merge all children under destination directory for child in fs.listStatus(src.getPath) : mergePathsV2(fs, child, dest + child.getName) else : # destination does not exist fs.mkdirs(dest) # for child in fs.listStatus(src.getPath) : # HERE mergePathsV2(fs, child, dest + child.getName) #
どちらもソースディレクトリツリーを再帰的に下り、ファイルの名前変更によって単一ファイルをコミットします。
重要な違いは、v1アルゴリズムがディレクトリの名前変更によってソースディレクトリをコミットすることです。これは従来、`O(1)` 操作です。
対照的に、v2アルゴリズムはソースディレクトリのすべての直接の子をリストし、それらに対して `mergePath()` を再帰的に呼び出し、最終的に個々のファイルの名前を変更します。そのため、実行する名前変更の数はソースの *ファイル* 数に等しく、ソースの *ディレクトリ* 数ではありません。ディレクトリのリスト数は `O(depth(src))` であり、`depth(path)` は指定されたパス下のディレクトリの深さを返す関数です。
通常のファイルシステムでは、v2マージアルゴリズムはv1アルゴリズムよりも高価になる可能性があります。ただし、マージはタスクコミットでのみ行われるため、実行プロセス全体でのボトルネックになる可能性は低くなります。
オブジェクトストアでは、`rename()` が `O(1)` 操作であるという期待、および再帰的なツリーウォークがデータのツリーを列挙して操作する効率的な方法であるという期待から、最適ではありません。アルゴリズムを `FileSystem.listFiles(path, recursive)` を使用して、パス下のすべての子を列挙するための単一の呼び出しに変更した場合、少なくとも深く広いツリーでは、リスト操作が大幅に高速になります。ただし、現実的なデータセットでは、出力ファイルのサイズが遅延の主な原因となる可能性が高いです。つまり、`mergePathsV2` のコストが `O(depth(src)) + O(data))` である場合、一般的に `O(data)` の値は `depth(src)` よりも重要になります。
1つの重要な例外があります。少量のデータで動作するテストですが、現実的な出力ディレクトリ構造を生成しようとします。これらのテストでは、ディレクトリのリストと `getFileStatus()` の呼び出しのコストがコピー呼び出しのコストを超える可能性があります。これが、オブジェクトストアに対するコミットアルゴリズムの小規模なテストを非常に誤解を招くものと見なす必要がある理由です。
タスク試行パスをタスクコミットパスに名前変更します。
def needsTaskCommit(fs, jobAttemptPath, taskAttemptPath, dest): return fs.exists(taskAttemptPath) def commitTask(fs, jobAttemptPath, taskAttemptPath, dest): if fs.exists(taskAttemptPath) : mergePathsV2(fs, taskAttemptPath, dest)
タスク試行パスを削除します。
def abortTask(fs, jobAttemptPath, taskAttemptPath, dest): fs.delete(taskAttemptPath, recursive=True)
コスト:通常のファイルシステムでは `O(1)` 、オブジェクトストアでは `O(files)`。
すべてのタスク出力が既に完了しているため、必要なのは `_SUCCESS` マーカーにアクセスすることだけです。
def commitJob(fs, jobAttemptDir, dest): fs.touch("$dest/_SUCCESS")
コスト:`O(1)`
ジョブ中止中の障害は暗黙的に繰り返すことができます。
def isCommitJobRepeatable() : return True
ジョブ試行パス下のすべてのデータを削除します。
def abortJob(fs, jobAttemptDir, dest): fs.delete(jobAttemptDir, recursive=True)
コスト:通常のファイルシステムでは `O(1)` 、オブジェクトストアでは `O(files)`。
データは宛先ディレクトリに書き込まれないため、タスク試行ディレクトリを削除することでタスクをクリーンアップできます。
データは宛先ディレクトリに名前変更されているため、コミット済みとして記録されたすべてのタスクには、まったく回復は必要ありません。
def recoverTask(tac):
アクティブなタスクとキューに入れられたタスクはすべて実行のためにスケジュールされます。
`commitTask()` 中の障害と同じ弱点があります。そのコミット操作中に失敗したタスクを繰り返すのは、生成されたすべてのファイルの名前がすべてのタスク試行で一定の場合にのみ安全です。
タスク試行がコミットするように指示され、そのコミットが完了したと記録されていない間にジョブAMが失敗した場合、進行中のタスクの状態は不明です…実際には、この時点でジョブを回復することは安全ではありません。
これは簡単です。`commitJob()` が再呼び出されます。
MapReduceは、独自のコンテナで各MapperまたはReducerを実行します。これは独自のプロセスを取得します。暗黙的に、これは各タスクがすべてのファイルシステムの独自のインスタンスを取得することも意味します。
作業は `org.apache.hadoop.mapred.Task` で調整されます。これには、Map(`MapTask`)とReduce(`ReduceTask`)の特定のサブクラスがあります。クリーンアップタスク(役割はシンプル:クリーンアップ)と、`OutputCommittre.setupJob` を実行するジョブ設定タスクもあります。つまり、ジョブ設定フェーズもタスクで実行されます。
MapTaskはコミッターを使用して、Reducerの準備が整うように、すべてのMapperの出力をファイルシステムに書き込みます。各パーティションは、データを `MapFile` として書き込みます。これは実際にはディレクトリ内の2つの `SequenceFile` ファイルです。すべてのキーと値の出力の `data` ファイルと、ファイル内のいくつかのキーのインデックスを含む `index` です。
これらはすべてローカルファイルシステムに書き込まれます。
`ReduceTask` は、宛先ファイルシステムへの最終的な書き込みを行います。
Mapフェーズはコミッタを使用して中間作業をコミットするため、拡張メカニズムを通じてプロセスに提供されるプラグインコミッタは、必ずマッパーによって生成された出力で動作する必要があります。ステージングコミッタは、一意のファイル名が無効になっている場合にのみ機能しますが、それらとマジックコミッタは作業の最終出力にのみ使用されることを意図しているため、やや無意味です。重要なのは、Mapフェーズと最終的なReduceフェーズで異なるコミッタを使用できることです。異なるファイルシステムに対して個別のコミッタが定義されている場合、これは暗黙的です。新しいコミッタは最終的な出力先FSに対して定義でき、file://
はデフォルトのFileOutputCommitter
を保持できます。
Task.initialize()
:構成を読み込み、現在のジョブとタスクにバインドされたJobContextImpl
とTaskAttemptContextImpl
インスタンスをインスタンス化します。
作業が完了すると、Task.done()
が呼び出されます。これは基本的に次のコードパスです。
if (committer.needsTaskCommit(taskContext)) { // get permission to commit from AM int retries = MAX_RETRIES; while(true) { try { umbilical.commitPending(taskId, taskStatus); break; } catch(IOException ie) { if (--retries == 0) { // FAIL WITHOUT CALLING ABORT System.exit(67); } } } // commit the work try { committer.commitTask(taskContext); } catch (IOException iee) { // failure: abort try { committer.abortTask(taskContext); } catch (IOException ioe) { LOG.warn("Failure cleaning up: " + StringUtils.stringifyException(ioe)); } throw iee; } }
つまり、書き込むデータがある場合に限り、タスクはAMからそうするためのクリアランスを要求します。これにより、別のタスクが書き込んだ場合に投機的作業がコミットされないこと、そして、タスクがAMとの連絡を失った場合でも、投機的ではない作業がコミットされないことが保証されます。
つまり、ジョブのAMが失敗した場合、またはタスクから別のネットワークパーティションにある場合、タスクの出力はコミットされません。
許可が与えられると、コミット呼び出しが呼び出されます。これが機能しない場合、abortTask()
が呼び出されます。そこで発生したすべてのエラーはログに記録され、無視されます。
このメソッドは実際には、committer.abortTask()
が常に呼び出されることを保証しないという点で制限されているようです。umbilical.commitPending()
呼び出しが繰り返し失敗し、タスクプロセスが中止された場合、committer.abortTask()
は呼び出されません。これがネットワークパーティションによるものである場合、AMがcommitter.abortTask()
を呼び出し、AMの障害である場合は、再起動されたAMが以前の試行をクリーンアップできると期待できます。従来のFileOutputCommitterの場合、以前の試行のデータのリスト表示と削除は簡単です。ただし、マルチパートアップロードをコミットされていないデータをアップロードする手段として使用しているS3コミッタの場合、保留中のアップロードが常に中止されるようにすることが重要です。これは、次のように行うことができます。
Task.done()
内のすべてのタスク側エラーブランチがcommitter.abortTask()
を呼び出すようにします。AMはコミッタを使用してジョブを設定およびコミットします。AMの障害と回復をサポートするために、OutputCommitter.isRecoverySupported()
を使用して、成功したタスクのすべての出力が最終ジョブで使用できるか、またはジョブ全体をリセットして繰り返す必要があるかを宣言します。OutputCommitter.isCommitJobRepeatable()
は、もう1つの質問に対処します。コミッタはコミットプロセス自体の失敗から回復できますか。
コミッタはApplication Masterで作成され、org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler
のインスタンスに渡され、その後、ジョブレベルのライフサイクルを管理します。
MRv1 APIが使用されている場合、コミッタは"mapred.output.committer.class"
の値から選択されます。MRv2 APIでは、出力形式がインスタンス化され、タスクとタスク試行ID 0を使用してコミッタが要求されます。コミッタはCommitter.getOutputCommitter(taskContext)
への呼び出しを介して出力形式から取得されます。(ジョブ、0、0)タスクと試行IDを持つタスク試行コンテキストが再度使用されます。つまり、ジョブコミッタの場合でも、コミッタを要求する際に、常にタスクコンテキストが出力形式に渡されます。OutputCommitter.abortTask()
のすべての実装について、コンテナとタスクを実行しているホストではなく、AMから実行できることが重要です。さらに、中止に必要なすべての情報(パス、ファイルシステムインスタンスなど)は、コンストラクタに渡されたコンテキストから初期化されたフィールドに依存するのではなく、メソッドに渡されたTaskAttemptContext
から取得する必要があります。
OutputCommitter.setupJob()
これはorg.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.StartTransition
で開始されます。サービスの開始時に呼び出されるorg.apache.hadoop.mapreduce.v2.app.MRAppMaster.startJobs()
で非同期実行のためにキューに入れられます。したがって、ジョブはAMの開始時に設定されます。
OutputCommitter.commitJob()
"デフォルト"/クラスタファイルシステムでは、CommitterEventHandler
は、yarn.app.mapreduce.am.staging-dir
(デフォルト/tmp/hadoop-yarn/staging/$user/.staging
)で定義されたステージング領域のデータを使用し、ジョブIDから名前が付けられたサブディレクトリを使用します。
コミットプロセスの状態を追跡するためにファイルシステムを使用するための3つのパスが構築され、サポートされている場合はコミット操作を回復可能にすることを目標としています。
名前 | 役割 |
---|---|
COMMIT_STARTED |
ジョブコミットの開始をマークします |
COMMIT_SUCCESS |
ジョブコミットの成功した完了をマークします |
COMMIT_FAIL |
ジョブコミットの失敗をマークします |
これらのマーカーは、ジョブコミット中にジョブの再起動/失敗が発生した場合に、ジョブの再起動/失敗を管理するために使用されます。
AMが起動すると、これらのファイルを探してステージング領域を確認します。ジョブの以前の状態を判断する方法として。COMMIT_
マーカーファイルがない場合、ジョブはまだコミットを試みていないと見なされます。
COMMIT_SUCCESS
またはCOMMIT_FAIL
の存在は、前のジョブが成功または失敗して完了したという証拠として受け取られます。その後、AMはジョブの再実行を試みずに、成功/失敗のエラーコードで完了します。
COMMIT_STARTED
が存在するが、完了マーカーのいずれも存在しない場合、コミッタがそのジョブコミット操作を繰り返すことができると宣言している場合(Committer.isCommitJobRepeatable(jobContext) == true
)、ジョブの再コミットが試みられ、COMMIT_STARTED
が削除され、コミットプロセスが再び開始されます。
これらのCOMMIT_STARTED
ファイルは単なる0バイトファイルですが、ジョブコミットが繰り返せるものと見なされる場合にのみ、上書きビットがtrueに設定されて作成されます。
private void touchz(Path p, boolean overwrite) throws IOException { fs.create(p, overwrite).close(); }
つまり、クラスタファイルシステムでのcreate(path, overwrite=false)
の原子性が、1つのプロセスだけが特定のジョブのコミットを試行することを保証するために使用されます。
boolean commitJobIsRepeatable = committer.isCommitJobRepeatable( event.getJobContext()); try { touchz(startCommitFile, commitJobIsRepeatable); waitForValidCommitWindow(); committer.commitJob(event.getJobContext()); touchz(endCommitSuccessFile, commitJobIsRepeatable); } catch (Exception e) { touchz(endCommitFailureFile, commitJobIsRepeatable); }
waitForValidCommitWindow()
操作は重要です。これは、yarn.app.mapreduce.am.job.committer.commit-window
ミリ秒(デフォルト:10,000)以内に関与するYARNリソースマネージャとの通信がない限り、コミッタはコミットしてはならないことを宣言しています。これは、受信した次のハートビートを待つことによって行われます。ここに潜在的なバグがあります。間隔が小さすぎると、スレッドがウィンドウ内でコールバックを永久に待機してスピンする可能性があります。それを無視すると、このアルゴリズムは次のことを保証します。
overwrite=false
を使用してファイルを作成できる呼び出しは1つだけであるため、繰り返すことができないジョブをコミットしようとするプロセスの試行は1つだけです。
構成されたウィンドウ内でYARNと通信しているプロセスだけがジョブをコミットできます。
AMがネットワークの残りの部分からパーティション化されている場合、その時計がクラスタの残りの部分と同じ速度で単調増加していることを前提とすると、クラスタの残りの部分は、AMがYARN RMに正常にハートビートを送信してからyarn.app.mapreduce.am.ob.committer.commit-window
ミリ秒後、このジョブ試行の出力が決してコミットされないことを確信できます。これにより、1つのジョブを同時に実行できますが、それらの1つだけがコミットを試行することを保証するのに役立ちます。
YARNのハートビートは、COMMIT_STARTED
ファイルを正常に作成したAMにのみ送信されることを前提として、コミット操作を開始します。
このアルゴリズムに関する潜在的な問題
COMMIT_STARTED
ファイルは、ハートビートを取得するのを待つ前に作成されます。このAMがYARNとの連絡を失っている可能性がありますが、まだそれを知りません。YARNの生存可能性プロトコルが最終的にタイムアウトすると、AMは正しく終了しますが、この時点でCOMMIT_STARTED
ファイルが作成されているため、起動された他のAMはコミットできません。
2つのコミッタが繰り返すことができないコミットでCOMMIT_STARTED
ファイルの作成を試行した場合、1つは成功し、ハートビートを待ってから(おそらく遅い)コミットを試行します。2番目のコミッタは失敗し、すぐにCOMMIT_FAILED
ファイルを作成します。その結果、ステージング領域の状態はコミットが失敗したことを示唆していますが、実際には進行中で、2番目のプロセスだけが失敗したことを意味します。
1. COMMIT_STARTED
ファイルを作成する前にハートビートを待つこと。1.おそらく:COMMIT_STARTED
ファイルの作成中に失敗した場合、COMMIT_FAILED
ファイルを作成しないこと。つまり、COMMIT_STARTED
ファイルを正常に作成したプロセスだけが、コミットが失敗したことを示すことができます。これにより、より安全に対処できると思われます。
コミットを実行するスレッドが中断されます。CommitterEventHandler
はそれが終了するのを待ちます。(ミリ秒単位でyarn.app.mapreduce.am.job.committer.cancel-timeout
に設定されています)。
AMは、コンテナの失敗/損失を処理するときに、タスク試行コンテキストを使用してOutputCommitter.taskAbort()
を呼び出す場合があります。つまり、コンテナの失敗時に、AMのコミッタを使用してAMでタスクの中断操作が実行されます。これにより、新しいコンテナを作成する必要がなくなり、「ベストエフォート」タスクの中断はコンテナの失敗にも対応できるようになります。
AMとタスクコンテナ間のパーティションは、パーティション化されたコンテナ内のタスクがまだ実行されている間に、このAMによって実行されるタスクの中断が発生する可能性があることを意味します。中断操作の後に出力の書き込み操作が失敗しない限り、パーティション化されたタスクは、talkToAMTGetPermissionToCommit()
のTask.done()
での独自のタスクコミットシーケンスまで、パーティションに気付かない可能性があります。
S3Aコミッタの設計要件は次のとおりです。
中心的な問題は、オブジェクトストレージはファイルシステムではないということです。S3Aクライアントでrename()
がどのようにエミュレートされているかによって、既存のMRコミッターアルゴリズムの両方に大きなパフォーマンスの問題が生じています。
create()
およびrename()
でそのようなチェックを実行して、宛先パスの状態、つまり操作が許可されるかどうかを確認します。O(data)
であり、失敗した場合、ファイルシステムが不明な状態になる可能性があります。Hadoop S3Aファイルシステムクライアントは、データをアップロードするためのPUTとマルチパートPUTをサポートしており、構成パラメータfs.s3a.multipart.size
(デフォルト:100MB)で設定された閾値を超えると、HADOOP-13560のS3ABlockOutputStream
が書き込まれたデータをマルチパートPUTの一部としてアップロードします。
S3Guardは、共有DynamoDBテーブルをメタデータの信頼できるストアとして使用して、ファイルシステムの一貫したビューをすべてのプロセスに追加するオプションを追加しました。提案されたアルゴリズムは、DynamoDBテーブルを必要とせずに、そのようなオブジェクトストレージで動作するように設計されました。AWS S3が2020年に一貫性を持つようになったため、これらはストアと直接連携します。
DirectOutputCommitter
検討すべき実装の1つは、Spark 1.6のDirectOutputCommitter
です。
これは、ParquetOutputCommitter
をサブクラス化し、
作業ディレクトリを宛先ディレクトリとして使用すると、成功したコミット時にタスク出力を移動/リネームする必要がありません。ただし、欠陥があります。「コミット」または「アボート」という概念がなく、投機的実行や障害を処理する機能がないためです。これが、コミッターがSpark 2から削除された理由です。SPARK-10063
進行中のデータが表示されるという問題もあります。これは問題になる場合とそうでない場合があります。
IBMのStocatorは、V1/V2コミッターの非直接書き込みを宛先ディレクトリへの直接書き込みに変換できます。
どのようにこれを行うのでしょうか?これは、_temporary
パスへの書き込みを認識し、それらを基本ディレクトリへの書き込みに変換する特別なHadoopFileSystem
実装です。書き込み操作を変換するだけでなく、元のパスでgetFileStatus()
呼び出しをサポートし、最終的な宛先にあるファイルの詳細を返します。これにより、コミットするアプリケーションは、書き込まれたファイルの作成/存在/サイズを確認できます(後述のマジックコミッターとは対照的です)。
このFSはOpenstack Swiftをターゲットとしていますが、他のオブジェクトストレージも異なるバックエンドを介してサポート可能です。
このソリューションは、Spark Direct OutputCommitterと同じセマンティクス(および障害モード)を提供するようですが、SparkやHadoopコミッターの変更を必要としないという点で革新的です。対照的に、ここで提案されているコミッターは、プラグインの容易さのためにHadoop MRコミッターを変更することを組み合わせ、S3専用の新しいコミッターを提供します。これは、S3Aファイルシステムに強く依存し、緊密に統合されています。
Stocatorコミッターのシンプルさは評価に値します。
S3 REST APIでは、マルチパートアップロードにより、クライアントはファイルの「パート」を複数アップロードしてから、最終的な呼び出しでアップロードをコミットできます。
呼び出し元は、宛先バケット、キー、メタデータを含むマルチパートリクエストを開始します。
POST bucket.s3.aws.com/path?uploads
UploadId
が返されます。
呼び出し元は1つ以上の部分をアップロードします。
PUT bucket.s3.aws.com/path?partNumber=PartNumber&uploadId=UploadId
パート番号はPUTの順序を宣言するために使用されます。これらはパラレルに、そして順不同でアップロードできます。最終部分を除くすべてのパートは5MB以上である必要があります。すべてのアップロードは、返されるetagで完了します。
呼び出し元は操作を完了します。
POST /ObjectName?uploadId=UploadId <CompleteMultipartUpload> <Part><PartNumber>(number)<PartNumber><ETag>(Tag)</ETag></Part> ... </CompleteMultipartUpload>
この最終的な呼び出しは、アップロードされたすべてのパートのetagと、オブジェクト内のパートの実際の順序をリストします。
完了操作は明らかにO(1)
です。おそらくPUTリクエストはすでにデータをサーバーにアップロードしており、最終的なパスに対してデータを配信するサーバーになります。アップロードを完了するために必要なのは、サーバーのローカルファイルシステム内のファイルをリンクしてオブジェクトを作成し、オブジェクトストアのインデックステーブルのエントリを更新することだけです。
S3Aクライアントでは、シーケンス内のすべてのPUT呼び出しと最終的なコミットは、同じプロセスによって開始されます。これは必ずしもそうである必要はありません。アップロードの異なる部分を異なるプロセスが実行できるという事実は、このアルゴリズムを有効にします。
NetflixのRyan Blueは、いくつかの魅力的な機能を持つ代替コミッターを提出しました。
最後の点は過小評価できません。整合性レイヤーも必要ありません。* 全体的によりシンプルな設計です。特に、発生する可能性のあるさまざまな障害モードに対して回復力を持つ必要があることを考えると。
コミッターは、ローカルFSの一時ディレクトリにタスク出力を書き込みます。タスク出力は、getTaskAttemptPath
とgetWorkPath
によってローカルFSに向けられます。タスクコミット時に、コミッターはタスク試行ディレクトリ内のファイル(隠しファイルは無視)を列挙します。各ファイルは、マルチパートアップロードAPIを使用してS3にアップロードされます。
アップロードをコミットするために必要な情報はHDFSに保存され、そのプロトコルを介してコミットされます。ジョブがコミットされると、成功したタスクの保留中のアップロードはすべてコミットされます。
コアアルゴリズムは次のとおりです。
FileOutputFormat
とそのサブクラス)は、ローカルfile://
参照です。FileOutputCommitter
(アルゴリズム1)を使用して、これらのファイルのコミット/アボートを管理します。つまり、成功したタスクからのコミットするファイルのリストのみを(一時的な)ジョブコミットディレクトリにコピーします。FileOutputCommmitter
を使用してコミットするファイルのリストの伝播を管理することにより、既存のコミットアルゴリズムは、ジョブの最後にコミットされるファイルを暗黙的に定義するアルゴリズムになります。
Netflixの貢献には、S3のHadoopOutputCommitter
実装があります。
主なクラスは3つあります。* S3MultipartOutputCommitter
は、コミットロジックを処理する基本コミッタークラスです。これは直接使用しないでください。* S3DirectoryOutputCommitter
は、競合解決を使用してS3に非パーティションデータを書き込みます。* S3PartitionedOutputCommitter
は、競合解決を使用してS3にパーティションデータを書き込みます。
呼び出し元は、単一ディレクトリ出力にはS3DirectoryOutputCommitter
を、パーティションデータにはS3PartitionedOutputCommitter
を使用する必要があります。
これらのS3コミッターは、ローカルFSの一時ディレクトリにタスク出力を書き込むことによって機能します。タスク出力は、getTaskAttemptPath
とgetWorkPath
によってローカルFSに向けられます。
単一ディレクトリとパーティションコミッターは、データをアップロードする前に、S3にターゲットパスが存在するかどうかを確認することで競合解決を処理します。fs.s3a.committer.staging.conflict-mode
を設定することで制御される3つの競合解決モードがあります。
fail
:出力ディレクトリまたはパーティションが既に存在する場合は、タスクを失敗させます。(デフォルト)append
:ディレクトリまたはパーティションが既に存在するかどうかを確認せずに、データファイルをアップロードします。replace
:出力ディレクトリが存在する場合は、削除して新しいデータで現在のコンテンツを置き換えます。パーティションコミッターは、ジョブの実行前ではなく、出力データとの競合が検出された場合に競合モードを適用します。競合解決は出力モードとは異なるため、競合がない場合はモードを適用しません。たとえば、パーティションを上書きする場合は、新しい出力が作成されるかどうかに関係なく、含まれるすべてのサブパーティションとデータを削除する必要があります。競合解決は、出力データを持つパーティションのみを置き換えます。
競合モードがreplace
の場合、競合するディレクトリはジョブコミット中に削除されます。データは、すべてのタスクが正常に完了した場合にのみ削除されます。
書き込みを識別するUUIDが、S3にアップロードされるファイル名に追加されます。これにより、ジョブコミット中に失敗したジョブからのデータのロールバックが可能になり(下記の失敗例を参照)、データを追加で既存のディレクトリに追加する場合のファイルレベルの競合が回避されます。
注記 存在のチェックは、宛先パスのS3AFileSystem.getFileStatus()
リクエストを介して行われます。S3ストアのビューが一貫性がない限り、新しく削除されたオブジェクトがプローブでまだ検出される可能性があり、実際の競合がなくてもコミットが失敗する可能性があります。
これまでの提案(以降「マジック」コミッター)と比較して、このコミッター(「ステージング」コミッター)は、各タスクの最後に各ファイルをアップロードするという追加のオーバーヘッドを追加します。これはO(data)
操作です。並列化できますが、コンピュートノードからS3への帯域幅と、S3の宛先シャードの書き込み/IOP容量によって制限されます。多くのタスクがほぼ同時に完了する場合、帯域幅のピーク負荷によりアップロードが遅くなる可能性があります。
コミット時間は同じになり、Netflixコミッターは既にここで並列化ロジックを実装しているため、O(files/threads)
の時間になります。
タスクとジョブのコミットには、既に多くの障害処理コードが存在します。
コミットのいずれかの障害は、タスク/ジョブのコミットアクションのベストエフォートによる中止/ロールバックをトリガーします。
タスクコミットは、FileOutputCommitter
に委譲して、1 つのタスクの出力だけがジョブコミットに到達するようにします。
同様に、タスクが中止された場合、ローカルFSの一時出力は削除されます。
コミッターの実行中にタスクが停止した場合、データがローカルFSに残ったり、S3に未完成の部分として残ったりする可能性があります。S3の未完成のアップロード部分はテーブルリーダーには表示されず、ターゲットバケットのライフサイクルポリシーのルールに従ってクリーンアップされます。
ジョブコミット中の障害は、既に完了しているファイルの削除と残りのアップロードの中止によって処理されます。アップロードは個別に完了するため、削除されたファイルはリーダーに表示されていました。
ジョブコミッターの実行中にプロセスが停止した場合、2 つの可能性のある障害が発生します。
ジョブコミット中にプロセスが停止した場合、クリーンアップは手動プロセスです。ファイル名には書き込みごとにUUIDが含まれているため、ファイルの識別と削除が可能です。
タスク実行中の障害
すべてのデータはローカルの一時ファイルに書き込まれます。これらをクリーンアップする必要があります。
ジョブは、ローカル(保留中)データをパージする必要があります。
タスクコミット中の障害
アップロードプロセス中のプロセス障害により、保留中のマルチパートPUTのリストがクラスタファイルシステムに永続化されません。この期間はタスク実行全体よりも短くなりますが、特に大規模なアップロードでは、依然として無視できない可能性があります。
ファイルごとの永続化、またはアップロードリストの増分上書きによって、ここでの問題は軽減される可能性がありますが、それでも保留中のマルチパートアップロードが記録されないという小さなリスクが残ります。
タスクコミット前の明示的なタスク中止
タスクはすべてのローカルデータを削除します。アップロードは開始されません。
データアップロード中のS3との通信失敗
アップロードに失敗した場合、タスクは
明示的なジョブ中止
実行中のすべてのタスクが中止され、クリーンアップされます。完了したすべてのタスクの保留中のコミットデータを読み込み、保留中のマルチパートPUTリクエストを中止できます。
これは、以下によって行われます。
アクション#2のため、アクション#1は冗長です。これは、アクション#2を構成可能なオプションにするオプションを残しておくため、(同時に実行される>1つのパーティション化されたコミットの使用例を処理するために必要になります)に残されています。
ジョブコミット前のジョブドライバ障害
ローカルデータはv1コミットアルゴリズムで管理されているため、ジョブの2回目の試行では、最初の試行の保留中のコミットデータすべてが復旧されます。これらのタスクは再実行されません。
これにより、ジョブの中止時に、個々のタスクの.pendingsetファイルを読み取り、それらのアップロードの中止を開始するために使用できることも保証されます。つまり、復旧されたジョブは、前のジョブの保留中の書き込みをクリーンアップできます。
クエリエンジンが複数のジョブ試行をサポートしていない場合、保留中のコミットデータは復旧されません。明示的な中止操作を開始する必要があります(これにはCLIコマンドを追加します)、または保留中のリクエストを自動的に削除するようにS3バケットを構成する必要があります。
ジョブコミット中のジョブドライバ障害
失敗したジョブコミットによって既に実行されたアップロードは永続化されます。実行されていないアップロードは保留されたままになります。
コミッターは現在、回復不可能であると宣言していますが、実際にはそうではない可能性があります。回復プロセスは次のいずれかになる可能性があります。
ここでの主な問題は、ジョブコミット時間競合解決プロセスの進捗状況です。プリコミットフェーズが完了したことを知っている場合にのみ、宛先ディレクトリのすべてのファイルが保持されることを確信できるため、それが完了したかどうかを判断する方法です。これは、「プリコミットが完了するまでアップロードはコミットされない」というルールに基づいて暗黙的に決定できます。1つ以上のアップロードが完了したことが判明した場合、プリコミットが完了したと推測できるため、ジョブを繰り返すことができます。
これは危険な領域です。現時点では、コミッターは回復不可能であると宣言しています。
タスクコミット前のアプリケーション全体の障害
データは、一時ディレクトリのローカルシステムに残ります。これはクリーンアップされない可能性があります。
1つ以上のタスクコミット後、ジョブコミット前のアプリケーション全体の障害
>1つのタスク障害後のジョブ完了/中止
問題:宛先ディレクトリはどうなりますか?上書きするかしないか?マージポリシーによって異なる可能性があります。
PUTコミットのクリーンアップと、コミットされていない書き込みのスケジュールされたGCのために、以前の不完全な操作からのものであるという前提で、ジョブセットアップが宛先ディレクトリへの保留中のすべてのコミットをリストしてキャンセルすることを検討する必要があります。
パス下の保留中のリクエストを調査、リスト、中止する「コミット」コマンドをS3guard CLIに追加する必要があります。例:--has-pending <path>
、--list-pending <path>
、--abort-pending <path>
。
このコミッターの開発は、Netflixがコミッターを寄付する前に開始されました。
S3AFileSystem
とS3ABlockOutputStream
を変更することにより、このコミッターは、特別な(「マジック」)ディレクトリに書き込まれたすべてのファイルの書き込みの完了を延期します。書き込みの最終的な宛先は、最終的なジョブの宛先に変更されます。ジョブがコミットされると、保留中の書き込みがインスタンス化されます。
Netflix Stagingコミッターの追加により、実際のコミッターコードは、永続的なメタデータと作業の並列コミットのための共有ルーチン(Netflixの経験に基づくすべてのエラー処理を含む)で共通の形式を共有するようになりました。
S3に直接データをストリーミングする(ステージングはありません)という点と、保留中のコミットのリストもS3に保存するという点で異なります。一貫性のあるS3ストアが必要です。
このアルゴリズムは、変更されたS3ABlockOutputStream
出力ストリームを使用します。これは、最終的なclose()
操作でアクティブなマルチパートアップロードをコミットする代わりに、独立したプロセスがアップロードを完了または中止できるように、S3リポジトリに十分な情報を保存します。
元々は、OutputStream.close()
で、単一のPUTを実行するか、進行中のマルチパート書き込みを完了するかを選択していました。
マルチパートPUTが進行中の場合、ストリームは進行中のアップロードが完了するまで待ちます(送信された最終ブロックを含む)。その後、最終的なマルチパートコミット操作を作成してPUTします。パーツのリスト(とその順序)は、opt中に作成されています。
対照的に、遅延コミットファイルに書き込む場合
小さな書き込みでも、マルチパート書き込みを必ず開始する必要があります。この書き込みは、ストリームの作成中に開始される可能性があります。
close()
呼び出しで書き込みをコミットする代わりに、操作をコミットするために必要なすべての情報を含むS3リポジトリ内のパスにPUTを実行します。つまり、最終パス、マルチパートアップロードID、およびアップロードされたパーツの順序付きETagリストです。
ファイルが「特殊」であることを認識することは問題です。通常のcreate(Path, Boolean)
呼び出しは、作成されているファイルが遅延コミットファイルであることを認識する必要があります。そのため、特別な新しいストリームを返します。
これは、「マジック」一時ディレクトリ名__magic
を使用して行われ、このパス下に作成されたすべてのファイルは、ストリーム書き込みプロセス中に完了されないことを示します。パス下に作成されたディレクトリは引き続き作成されます。これにより、個々のジョブとタスクの試行ごとに、ジョブ固有およびタスク固有のディレクトリを作成できます。
たとえば、パターン__magic/${jobID}/${taskId}
を使用して、その特定のタスクの最終ディレクトリへの保留中のコミットを保存できます。そのタスクがコミットされると、そのパスに保存されている保留中のすべてのコミットファイルが読み込まれ、最終的なアップロードのコミットに使用されます。
最終ディレクトリが/results/latest
であるジョブを考えてみましょう。
ジョブjob_400_1
のタスク01試行01の中間ディレクトリは次のようになります。
/results/latest/__magic/job_400_1/_task_01_01
これは一時ディレクトリとして返されます。
クライアントがファイル/results/latest/__magic/job_400_1/task_01_01/latest.orc.lzo
の作成を試行すると、S3A FSは/results/latest/latest.orc.lzo
の最終宛先でマルチパートリクエストを開始します。
データが出力ストリームに書き込まれると、個々のマルチパートPUT操作として増分的にアップロードされます。
close()
時に、サマリーデータがファイル/results/latest/__magic/job400_1/task_01_01/latest.orc.lzo.pending
に書き込まれます。これには、アップロードIDとアップロードされたデータのすべての部分とETagが含まれます。
マーカーファイルも作成されるため、新しく作成されたファイルが存在することを確認するコードが失敗しません。1.これらのマーカーファイルの長さはゼロバイトです。1.HTTPヘッダーx-hadoop-s3a-magic-data-length
で最終ファイルの全長を宣言します。1.getXAttr("header.x-hadoop-s3a-magic-data-length")
を呼び出すと、アップロードされたデータのバイト数が含まれる文字列が返されます。
これは、Sparkの書き込み追跡コードが作成されたデータ量を報告できるようにするために必要です。
タスクをコミットするために必要な情報は、タスク試行からジョブ試行に移動されます。
.pending
ファイルをリストします。Pendingset
構造にマージされます。.pendingset
ファイルとして保存されます。/results/latest/__magic/job400_1/task_01_01.pendingset
になります。保留中の単一アップロードファイルのいずれかの読み込みに失敗した場合(つまり、ファイルを読み込めなかったか、無効と見なされた場合)、タスクは失敗したと見なされます。正常に読み込まれた保留中のコミットはすべて中止され、次に障害が報告されます。
同様に、.pendingset
ファイルの保存に失敗すると、保留中のすべてのアップロードの中止がトリガーされます。
ジョブコミッターは、ジョブ試行ディレクトリ内のすべての.pendingset
ファイルをロードします。
これらのファイルのいずれかの読み込みに失敗した場合、ジョブの失敗と見なされます。読み込むことができた保留中のpendingsetはすべて中止されます。
全てのpendingsetがロードされた場合、ジョブ内の保留中のコミットは全てコミットされます。これらのコミットのいずれか1つが失敗した場合、成功したコミットは全て、宛先ファイルを削除することでロールバックされます。
タスクがサブディレクトリにデータを生成できるようにするために、最終パスに関する追加の手掛かりを提供する特別なファイル名__base
が使用されます。出力パス/results/latest/__magic/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo.pending
を最終的な宛先パスにマッピングする場合、パスは/results/latest/2017/2017-01-01.orc.lzo
になります。つまり、__magic
と__base
の間にある全てのディレクトリ(両端を含む)は無視されます。
問題点
Q. タスク試行ディレクトリに.pending
以外のファイルが存在する場合、どうなりますか?
A. これは、「マジックパス」機能が有効になっていないS3Aクライアントでマジックコミッタが使用されている場合にのみ発生します。これは、ジョブとタスクのコミッタ初期化時にチェックされます。
ジョブ/タスクコミットプロトコルは、ジョブドライバが指示するまでタスクが作業をコミットすることによってこれを処理することが期待されています。ネットワークパーティションは、タスクコミッタによる作業のキャンセルをトリガーするはずです(これはコミッタの上位のプロトコルです)。
ジョブは再開されます。完了すると、自身でコミットしていない宛先ディレクトリへの未処理のリクエストを全て削除します。
タスクは再開されます。タスクの保留中の作業はコミットされません。ジョブドライバがクリーンアップを行うと、ディレクトリ下の保留中の書き込みがキャンセルされます。
これは、状態を不確定にします。
保留中のアップロードは残りますが、変更は表示されません。
.pendingset
ファイルがジョブ試行ディレクトリに保存されている場合、タスクは事実上コミットされていますが、コントローラへのレポートに失敗しています。これは、ジョブコミット中に、同じファイルをコミットする2つのタスクPendingSetが存在する場合、またはファイルをコミットする際に問題を引き起こします。
提案:pendingsetにタスクIDを記録し、ロード時に重複を認識し、一方のセットをキャンセルしてもう一方をコミットすることで対応する(または失敗する?)。
宛先は不明な状態になります。
コミッタ、あるいはこれらのコミッタを使用するアプリケーションにおいて、アボートプロセスの失敗は適切に処理されていません。アボート操作が失敗した場合、何ができますか?
プロトコルによるタスクのアボート(例:投機的ジョブのアボート)のユースケースではやや仮説的ですが、タスク/ジョブのアボート呼び出しは、コミット失敗時の例外処理ロジックの一部として行われる可能性があります。そのため、呼び出し元はアボートが失敗しないと想定する可能性があります。失敗した場合、新しくスローされた例外によって元の問題が隠される可能性があります。
2つの選択肢があります。
abort()
内の失敗をキャッチ、ログに記録し、無視する。呼び出し側のコードを修正することが最善の戦略のように思われます。これにより、コミッタに隠されるのではなく、コミットプロトコルで失敗を明示的に処理できます。::OpenFile
プリエンプションは、クラスタスケジューラの指示による作業の明示的な終了です。これは失敗ですが、特別な失敗です。プリエンプションされたタスクは、限定された数のトラッカーのみを許可するコードでは失敗としてカウントしてはならず、ジョブドライバはタスクが正常に終了したと仮定できます。
ジョブドライバ自体もプリエンプションされる可能性があります。
1つの失敗ケースは、実行フレームワーク全体が失敗した場合です。新しいプロセスは、保留中の作業のある未処理のジョブを特定し、それらをアボートしてから、適切な__magic
ディレクトリを削除する必要があります。
これは、ディレクトリツリーをスキャンして__magic
ディレクトリを探し、その下をスキャンするか、listMultipartUploads()
呼び出しを使用してパスの下のマルチパートアップロードをリストし、キャンセルすることによって実行できます。最も効率的な解決策は、listMultipartUploads
を使用して未処理のリクエストを全て特定し、それを使用してキャンセルするリクエストと__magic
ディレクトリをスキャンする場所を特定することです。この戦略は、数百万のオブジェクトを持つリポジトリを扱う際のスケーラビリティの問題に対処するはずです—/__magic/**/*.pending
を名前のなかに持つキーを全てリストするのではなく、アクティブなアップロードからデータのあるディレクトリにさかのぼって作業します。
S3 CLIに完全なツリーのスキャンと保留中のアイテムのパージを行うクリーンアップ操作を追加することも検討する必要があります。見つかったものに関する統計情報を提供します。これにより、コストを抑え、クリーンアップに関連する問題を特定できます。
アップロード時間は、今日のブロックアップロード(s3a.fast.upload=true
)の出力ストリームのものです。書き込みを通して継続し、close()
操作では、保留中のデータをアップロードし、保留中のアップロードが全て完了するのを待つ遅延があります。最終的な完了リクエストのオーバーヘッドはありません。まだデータがアップロードされていない場合、close()
時間は、マルチパートリクエストの開始と最終的なputの時間になります。これは、ストリーム作成時に常にマルチパートIDを要求することで簡素化できるかもしれません。
各タスクをコミットする時間は`O(files)`です。タスク試行ディレクトリ内およびその下の全ての.pending
ファイルがリストされ、その内容が読み取られ、集約された.pendingset
ファイルがジョブ試行ディレクトリにPUTされます。その後、.pending
ファイルは削除されます。
ジョブをコミットする時間は`O(files/threads)`になります。
ジョブ試行ディレクトリの全ての.pendingset
ファイルを読み込む必要があり、ファイルにリストされている不完全なアップロードごとにPUTリクエストを発行する必要があります。
[廃止] 全ての子要素の一括リスト作成で完全な整合性が要求されることに注意してください。代わりに、コミットするファイルのリストを、Sparkコミットプロトコルが許可するように、タスクからジョブコミッタに返すことができれば、不整合なオブジェクトストアにデータをコミットできます。
不完全なマルチパートアップロードにおけるコミットされていないデータは、S3バケットのストレージコストで課金されます。コストを抑えるために、失敗したジョブからの未処理のデータを削除する必要があります。これは、S3バケットのライフサイクルポリシー、または作成する必要があるコマンドツールを通じて行うことができます。
ファイルは存在しなくなるため、close()
呼び出し後には表示されません。保留中のコミットファイルを認識していたコードは失敗します。
タスクとジョブの失敗により、未処理のマルチパートアップロードが残ります。これらはガベージコレクションする必要があります。S3は現在自動クリーンアップをサポートしています。S3Aは起動時に実行するオプションがあり、hadoop s3
コマンドで呼び出し元が明示的に実行できるようにする予定です。タスクが書き込みの開始時に書き込みのアップロードIDを明示的に書き込む場合、ジョブコミッタによるクリーンアップが可能になります。
非常に小さなファイルを書き込む時間は、PUTとCOPYよりも長くなる可能性があります。これは本番環境では関連がないため、この問題は無視しています。小さなファイル操作を最適化しようとする試みは、開発、保守、テストを複雑にするだけです。
一時情報を含むファイルは、実際のデータと間違われる可能性があります。
問題の原因の診断が難しくなる可能性があります。特に、一時ディレクトリのディレクトリ構造のデバッグレベルのリスト作成によって、多くのログが役立ちます。
保留中のPUTリクエストを確実にリストするには、リストの一貫性が必要です。S3エンドポイントの一貫性を確実に特定する方法がない場合、データの損失を伴う最終的に一貫性のあるストアを依然として使用する場合があります。
同時に同じ宛先ディレクトリに書き込むジョブが複数ある場合、出力が混乱する可能性があります。これは、現在のコミットアルゴリズムでは今日でも当てはまります。
同じS3Aクライアント/タスク内で、シーケンシャルまたは並列のいずれかで、同じ宛先ファイルに書き込むクライアントを複数作成できます。
[廃止] 一貫性のあるメタデータストアであっても、ジョブが既存のファイルを上書きする場合、更新がデータの全てのレプリカに伝播するまで、古いデータはデータを読み取るクライアントに表示される可能性があります。
操作がディレクトリのコンテンツを完全に上書きしようとしている場合、機能しません。既存のデータはクリーンアップされません。クリーンアップ操作をジョブコミットに含め、上書きされていない宛先ディレクトリ内の全てのファイルを削除する必要があります。
保留中のコミットデータの保存以外には使用できない__magic
などのパス要素が必要です。
全てのFS操作にコードを追加しない限り、__magic
ツリー下のファイルを操作することは依然として可能です。それは悪いことではありませんが、混乱する可能性があります。
書き込まれたデータはコミットされるまでマテリアライズされないため、どのプロセスも、作成したばかりのファイルを読み取ったり、操作したりすることはできません。
S3ABlockOutputStream
への変更S3ABlockOutputStream
をコピーアンドペーストする必要を避けるために、アップロードの即時/遅延完了を管理するPutTracker
クラスを使用して構築されるように変更されました。適切なポイントで呼び出されます。
close()
に入力しました。全てのブロックが完了しました—保留中のマルチパートをコミットする必要があるかどうかを示すマーカーを返します。abort()
呼び出しでのマルチパートアボート(おそらく:コアロジックを他の場所に移動する)。基本実装は、close()
呼び出しでMPUを実行する必要があることを宣言すること以外は何もしません。
S3Aコミッタバージョンは、1. 初期化中に常に開始する 1. close()
操作で、後でコミットするために必要なデータを全て保存します。
遍在するFileOutputFormat
とそのサブクラスをサポートするために、S3Aコミッタは、出力コミッタがFileOutputCommitter
であることを明示的に期待するクラスによって、何らかの方法で有効なコミッタとして受け入れられる必要があります。
public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException{ PathOutputCommitter committer = (PathOutputCommitter) getOutputCommitter(context); return new Path(committer.getWorkPath(), getUniqueFile(context, getOutputName(context), extension)); }
検討、調査、そして放棄されたいくつかのオプションを以下に示します。
FileOutputCommitter
インスタンスを作成するためのファクトリメカニズムをさらに追加する。S3A出力のためにこれをサブクラス化し、返す。FileOutputCommitter
の複雑さと、より動的な構築をサポートすることの複雑さにより、実装と保守の観点から危険です。
実際にはコミッタの構成されたクラス名を読み込み、インスタンス化し、コンテキスト情報を渡してコミット操作を中継する新しいコミットアルゴリズム「3」を追加します。この新しいコミッタインターフェースには、メソッドと属性のためのメソッドが追加されます。これは実行可能ですが、高メンテナンスになる可能性のある方法で既存のコミッタコードを変更します。
FileOutputFormat
クラスが、getWorkPath()
メソッドを実装した任意のタスク/ジョブコンテキストコミッタを受け入れるようにします—それがFileOutputCommitter
から必要な唯一の特定の機能です。
オプション3、FileOutputFormat
により汎用的なコミッタをサポートさせることが、現在の設計です。これは、FileOutputFormat
がFileOutputCommitter
から使用する唯一の特定のメソッドがgetWorkPath()
であるという事実を基にしています。
これは、FileOutputCommitter
とS3ACommitter
が実装できる新しい抽象クラスPathOutputCommitter
に引き上げることができます。
public abstract class PathOutputCommitter extends OutputCommitter { /** * Get the directory that the task should write results into. * @return the work directory */ public abstract Path getWorkPath() throws IOException; }
FileOutputFormat
に必要な唯一の変更は、コンテキストコミッタをキャストするものを変更することです。
PathOutputCommitter committer = (PathOutputCommitter) getOutputCommitter(context);
getWorkPath()
がFileOutputFormat
が使用する唯一のメソッドのままの場合、これらの変更により、コードベースへの変更を最小限に抑えて、S3AコミッタでFileOutputCommitter
を置き換えることができます。
更新:これにはコストがあります。MRv1 APIのサポートが失われます。
org.apache.hadoop.mapred.FileOutputFormat
によるMRv1サポートFileOutputCommitter
をサブクラス化しないことのデメリットは、MRv1 API呼び出しのプロトコルをFileOutputCommitter
にラップして中継するために使用されるコードが機能しなくなることです。新しいコミッターは検出されません。
これはSparkで確認できます。Sparkでは、V1 APIはRDD
クラス(RDD.saveAsHadoopFile()
)からエクスポートされています。後継コードであるPairRDDFunctions.saveAsNewAPIHadoopFile()
は機能します。*オブジェクトストレージで高性能なコミットを実現するには、MRv2コミットプロトコルを使用する必要があります。つまり、V2クラスを使用する必要があるということです。
マジックコミッター:ディレクトリ名
この設計では、ディレクトリ名に__magic
を提案しています。HDFSおよび様々なスキャンルーチンは、常に_
で始まるファイルおよびディレクトリを一時データ/除外データとして扱います。
マジックコミッター:サブディレクトリ
タスク作業ディレクトリ内にサブディレクトリを作成することは有効であり、そのディレクトリツリーを保持したまま、それらのサブディレクトリは宛先ディレクトリに移動されます。
つまり、タスク作業ディレクトリがdest/__magic/app1/task1/
の場合、dest/__magic/app1/task1/part-0000/
の下にあるすべてのファイルはdest/part-0000/
パス下に配置される必要があります。
この動作は、MRジョブにおける中間マップデータの書き込みに依存しています。
これは、__magic
の下のすべての要素を単純に削除するだけでは不十分であり、ベースパスを特定することが重要であることを意味します。
提案:コミットのベース要素のマーカーとして、特別な名前__base
を使用します。タスク試行の下では、__base
ディレクトリが作成され、作業ディレクトリになります。このパス下に作成されたすべてのファイルは、ベースディレクトリに対する相対パスを使用して宛先にコミットされます。
より正式には、パスにおける最後の親要素が__base
である場合、その下に作成された相対パスのベースが設定されます。
コミッターは、S3互換オブジェクトストレージに対してのみテストできます。
コミッターにはいくつかの単体テストと、org.apache.hadoop.mapreduce.lib.output.TestFileOutputCommitter
から持ち上げたプロトコル統合テストに基づいた統合テストがあり、コミットメカニズムの様々な状態遷移をテストするために、ステージングコミッターのバリアントをサポートするように拡張されています。
AbstractITCommitMRJob
という抽象的な統合テストがあり、MiniYARNクラスタとMiniHDFSクラスタを結合し、関連するコミッターを使用して単純なMRジョブを送信します。これにより、コミッターが実際に機能するかどうかが検証されます。「プロトコルに従っているように見える」だけではありません。
このテスト中に追加された機能の1つは、保存された_SUCCESS
マーカーファイルが0バイトファイルではなく、org.apache.hadoop.fs.s3a.commit.files.SuccessData
に実装されているJSONマニフェストファイルであるということです。このファイルには、使用されたコミッター、コミットを実行したホスト名、タイムスタンプデータ、およびコミットされたパスのリストが含まれています。
SuccessData{ committer='PartitionedStagingCommitter', hostname='devbox.local', description='Task committer attempt_1493832493956_0001_m_000000_0', date='Wed May 03 18:28:41 BST 2017', filenames=[/test/testMRJob/part-m-00000, /test/testMRJob/part-m-00002, /test/testMRJob/part-m-00001] }
これは、フォークされたプロセスで正しいコミッターが実際に呼び出されたことを検証する手段として役立ちました。0バイトの_SUCCESS
マーカーは、従来のFileOutputCommitter
が使用されたことを意味していました。読み取ることができれば、コミット操作に関する詳細情報が提供され、テストスイートのアサーションで使用されます。
その後、メトリクスやその他の値を収集するように拡張され、Spark統合テストでも同様に役立っています。
Sparkはコミットプロトコルorg.apache.spark.internal.io.FileCommitProtocol
を定義しており、HadoopMapReduceCommitProtocol
、基となるHadoopコミッタークラスの設定可能な宣言をサポートするサブクラスSQLHadoopMapReduceCommitProtocol
、およびStructured Streaming用のManifestFileCommitProtocol
に実装しています。後者は「複雑さ」として最もよく定義されますが、これをサポートしないと、S3をストリームチェックポイントの信頼できる宛先として使用できません。
Sparkコミットプロトコルの1つの側面は、Hadoopファイルコミッターとともに、コミット操作のターゲットとして絶対パスを要求するAPI、newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
が存在することです。各タスクの一時ファイルと絶対ファイルのマッピングは、タスクがローカルコミット操作を実行した後(実行者からのコミット許可の要求を含む)、返されるTaskCommitMessage
でSparkドライバに渡されます。これらのテンポラリパスは、最終的な絶対パスに名前変更され、FileCommitProtocol.commitJob()
で名前変更されます。これは現在、他のすべての作業の最後にシリアル化された名前変更シーケンスです。この絶対パスの使用は、ディレクトリ名がパーティション名(年、月など)によって決定される宛先ディレクトリツリーにデータを作成する場合に使用されます。
その機能をサポートするのは困難です。パーティションツリー内の各ディレクトリに、保留中のPUT操作を文書化する独自のステージングディレクトリを許可するか、(より良い方法として)ベースパスからステージングディレクトリツリーを作成し、保留中のすべてのコミットを対応するディレクトリツリーで追跡します。
あるいは、Sparkタスクが完了時にジョブコミッターにデータを提供するという事実を考えると、保留中のPUTコマンドのリストを作成し、コミット操作をFileCommitProtocol
のS3A固有の実装で実行することができます。
[廃止] 前述のように、これにより、一貫性のあるリスト操作の必要性を回避できる可能性があります。失敗したタスクでの作業の中断を支援するために、書き込まれたもののリストをリストすることが依然として重要ですが、成功したタスクによって作成されたファイルのリストをタスクからコミッターに直接渡すことで、潜在的に矛盾するリストを回避できます。
Sparkのorg.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
Parquet出力形式は、org.apache.parquet.hadoop.ParquetOutputCommitter
のサブクラスを必要とし、そのオプションは構成キーspark.sql.parquet.output.committer.class
でクラス名によって定義されます。その後、spark.sql.sources.outputCommitterClass
の値にパッチが適用され、SQLHadoopMapReduceCommitProtocol
によって取得され、作業のコミッターとしてインスタンス化されます。
これは、おそらくユーザーがオプション"parquet.enable.summary-metadata"
を設定することでメタデータサマリーファイルの要求を可能にするために行われます。サマリーファイルの作成には、ジョブコミット時に宛先ディレクトリのすべてのファイルをスキャンする必要があるため、非常にコストが高く、S3を使用する場合は推奨しません。
S3Aコミッターを使用するには、Parquetコミッターとしても識別する必要があります。インスタンスが動的にインスタンス化されるという事実により、プロセスがやや複雑になります。
初期テストでは、パス出力コミッターのファクトリを設定する以外に、Sparkコードや構成を変更せずに、ORC出力のコミッターを切り替えることができます。Parquetサポートの場合、特定のコミッターのクラス名(つまり、ファクトリではない)を宣言するだけで十分な可能性があります。
これは残念なことです。宛先ファイルシステムの種類またはバケットごとの構成に基づいてコミッタープロトコルを動的に選択することが複雑になるためです。
初期プロトタイプで実装されているソリューションは、2つの要素で構成されています。
PathOutputCommitProtocol extends HadoopMapReduceCommitProtocol
クラスは、常にPathOutputCommitterFactory
メカニズムを使用してコミッターを作成します。これにより、出力形式独自のコミッターが出力ファクトリメカニズムに置き換えられます。
org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter
クラスは、直接インスタンス化可能な出力コミッターであり、ファクトリメカニズムを介してコミッターを作成し、すべての操作をそのコミッターに委任します。これにより、コミッタークラスを受け取る構成オプションでコミッターを宣言できますが、その下でファクトリメカニズムを使用できます。
任意の出力コミッターをParquetFileOutput
に使用できるSpark 2.3 [SPARK-21762]へのパッチを追加します。
全体的に、実装、ドキュメント化、および使用がやや複雑です。ユーザーは、3つのspark.hadoopオプションに加えて、2つのSpark SQLオプションを宣言する必要があります。
spark.sql.sources.commitProtocolClass=com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol spark.sql.parquet.output.committer.class=org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory spark.hadoop.fs.s3a.committer.name=magic spark.hadoop.fs.s3a.committer.magic.enabled=true
既存のFileOutputFormat
に新しいアルゴリズム「3」を追加することで、実際にはこれを簡素化できます。これにより、FileOutputFormat
自体がファクトリを使用してコミッターを作成するように指示し、ファクトリが自動的に取得されます。この状況でのループの回避は「困難」です。ファクトリを介してインスタンス化された場合、ファイルコミッターはファクトリ自体を使用しようとしないでください。
カバーする必要がある明白なセキュリティリスク、およびコードレビューでチェックする必要があるものは何ですか?
ステージングされたデータは、ローカルFS、fs.s3a.buffer.dir
でリストされているディレクトリにあり、hadoop.tmp.dir
のディレクトリにフォールバックします。これらは、S3へのPUT/POSTの前に、出力ストリームに書き込まれるデータのブロックをバッファリングするために既に使用されているディレクトリと同じです。したがって、以前よりもリスクが高くなります。テンポラリディレクトリのセキュリティ側面を明確に文書化して、これを確実にします。
タスクによって書き込まれたすべてのファイルは、タスクコミットまでアップロードされないため、より多くのローカルストレージが必要です。1つのシステムで作業を実行する悪意のあるユーザーは、テンポラリディスク領域を潜在的に使い果たす可能性があります。軽減策:ローカルFSでのストレージクォータ、ルートとは異なるマウントされたFSへのテンポラリディレクトリの保持。
中間.pendingset
ファイルは、fs.s3a.committer.staging.tmp.path
のディレクトリの下のHDFSに保存されます。デフォルトは/tmp
です。このデータはワークフローを公開する可能性があり(宛先パスと生成されたデータの量が含まれています)、削除するとジョブが中断されます。悪意のあるコードが、たとえば、順序付けられたetagリストの順序を変更することでファイルを編集した場合、生成されたデータは順序外にコミットされ、無効なファイルが作成されます。これは(通常は一時的な)クラスタFSであるため、クラスタ内の任意のユーザーがこれを行う可能性があります。
悪意のあるクラスタユーザーがワークフローと出力を損傷するのを防ぐには、テンポラリディレクトリのロックダウンが不可欠です。
ジョブがメモリ内でブロックをバッファリングするように構成されていない限り、fs.s3a.buffer.dir
で定義されたディレクトリは、アップロード前にブロックをバッファリングするために使用されます。これは以前と同じです。増分リスクはありません。ブロックはアップロード後にファイルシステムから削除されるため、必要なストレージ量は、データ生成帯域幅とデータアップロード帯域幅によって決まります。
クラスタファイルシステムは使用されません。そこにリスクはありません。
__magic
ディレクトリへの書き込みアクセス権を持つ悪意のあるユーザーは、保留中のアップロードのメタデータを操作または削除したり、新しい作業をコミットに挿入したりする可能性があります。__magic
ディレクトリへのアクセス権を持つことは、親宛先ディレクトリへの書き込みアクセス権を意味します。悪意のあるユーザーは、コミッターの中間ファイルを攻撃する必要なく、最終出力を簡単に操作できます。
内部メタデータサマリーファイル(.pending
および .pendingset
)は悪意のある者によって改ざんされる可能性があり、信頼できるアカウントによって解析または解釈された場合、信頼できないコードを実行したり、失敗したりする可能性があります。これらのファイルの形式はすべてJSONで、Jacksonを使用して解析されます。無効なJSONは構文解析エラーとなり、ジョブを失敗させると想定しています。ジョブの中断は、読み込みまたは解析できないファイルを無視して保留中のファイルをロードしようとする最善の努力を引き起こし、ロードされたファイルによって識別された保留中のアップロードをすべて中止します。
解析されたデータセット内の文字列は、いずれのインタープリター(例:SQLクエリ、シェルコマンドなど)にも渡されません。そのため、(SQL)インジェクション攻撃は不可能です。
一部のデータは、プロセス実行中に生成されるログファイルに出力される可能性があります。例として、コミットID、宛先キーなどがあります。これらはすべてプレーンテキストとして扱われ、ログのブラウザホストビューではそのように表示される必要があります。
一部のデータは、ロードされたクラスのtoString()
値で返されます。これはログに記録されたり、IDEで観察されたりする可能性もあります。
シリアル化されたデータの内容は、Web UIには表示されません。ここで懸念される脆弱性は、将来的に何らかのダウンストリームアプリケーションでこれが発生した場合です。JavaScriptをテキストフィールドに注入し、それがXSS攻撃で実行される可能性があります。ロード時にこのデータをサニタイズすることを検討する必要があるかもしれません。
改ざんされたデータ内のパスは、既存のファイルにアップロードをコミットしようとしたり、MPU IDを変更して異なるアップロードを早期にコミットしようとしたりするために変更される可能性があります。これらの試みは成功しません。アップロードの宛先パスは、MPUを開始する最初のPOSTで宣言されており、MPUに関連付けられた操作もパスを宣言する必要があるためです。パスとIDが一致しない場合、操作は失敗します。有効な(パス、MPU)タプルが改ざんされたファイルに挿入された場合、コミットまたは中止によってアップロードが早期に完了または中止される可能性があります。悪意のある当事者は、この情報を取得するためにターゲットのS3ストアに十分なアクセス権を持っている必要があるため、このような操作を実行するためにJSONファイルを改ざんする必要はほとんどありません。
パス下の保留中のアップロードを一覧表示および削除するためのCLIツールも提供します。
AWSドキュメントMPUの権限。特別な権限s3:ListBucketMultipartUploads
、s3:ListMultipartUploadParts
、s3:AbortMultipartUpload
があります。デフォルトでは、バケット所有者はこれらの権限をすべて持ち、MPUイニシエーターには、アップロードの一覧表示、パーツの一覧表示、および独自のアップロードの中止の権限が付与されます。バケット所有者は、いずれかのユーザーに権限を付与/拒否できます(これにより、ユーザーはMPUを開始および完了することはできますが、削除および中止することはできない場合があります)。
fs.s3a.buffer.dir
を、読み取りおよび書き込みアクセスがアクティブユーザーに制限された/tmp
の下の場所に設定します。fs.s3a.committer.staging.tmp.path
は、アクティブな各ユーザーに隔離する必要があります。提案:デフォルトを修飾されていないパスtmp/staging
にし、現在のユーザーを基準にして絶対パスにします。ユーザーのホームディレクトリ下のアクセスが制限されているファイルシステムでは、この最終的な絶対パスは、信頼できないアカウントからは表示されません。おそらく:テキスト文字列の有効な文字を定義し、検証のための正規表現(例:[a-zA-Z0-9 \.\,\(\) \-\+]+
)を定義し、ロード時と保存時にフリーテキストJSONフィールドを検証します。