AzureおよびGoogle Cloud Storage用Manifest Committer

このドキュメントでは、Manifest Committerの使用方法について説明します。

Manifestコミッタは、実世界のクエリでABFSのパフォーマンスを提供し、GCSのパフォーマンスと正確性を提供するワーク用のコミッタです。HDFSを含む他のファイルシステムでも機能しますが、リスト操作が遅く高価なオブジェクトストレージ用に最適化された設計です。

コミッタのアーキテクチャと実装については、Manifest Committerアーキテクチャを参照してください。

プロトコルとその正確性については、Manifest Committerプロトコルを参照してください。

2022年3月に追加され、初期リリースでは不安定とみなされる可能性があります。

問題

SparkからAzure ADLS Gen 2「abfs://」ストレージへのワークの唯一安全に使用できるコミッタは、「v1ファイルコミッタ」です。

これは、タスク試行が失敗した場合、その出力は最終出力に含まれないことが保証されているという意味で「正しい」ものです。「v2」コミットアルゴリズムは、その保証を満たすことができないため、デフォルトではなくなりました。

しかし、出力の深いディレクトリツリーが使用されるジョブでは特に遅いです。なぜ遅いのでしょうか?特に、FileOutputCommitterにインストルメンテーションがないため、特定の原因を指摘するのは困難です。実行中のジョブのスタックトレースには一般的にrename()が表示されますが、リスト操作も表示されます。

Google GCSでは、v1アルゴリズムとv2アルゴリズムのどちらも安全ではありません。Googleファイルシステムには、v1アルゴリズムに必要なアトミックディレクトリのリネームがないためです。

さらに、AzureとGCSストレージの両方で、多くの子孫を持つディレクトリを削除するときにスケール問題が発生する可能性があります。FileOutputCommitterはジョブの後処理がdelete("_temporary", true)への高速呼び出しであると想定しているため、タイムアウトが発生する可能性があります。

解決策。

中間マニフェストコミッタは、ABFSの実世界のクエリでパフォーマンスを提供し、GCSのパフォーマンスと正確性を提供する新しいワーク用のコミッタです。

このコミッタは、S3Aコミッタ用に導入された拡張ポイントを使用します。ユーザーは、abfs://gcs:// URLに対して新しいコミッタファクトリを宣言できます。適切に構成されたSparkデプロイメントは、新しいコミッタを検出します。

ジョブクリーンアップにおけるディレクトリのパフォーマンスの問題は、2つのオプションによって解決できます。1. コミッタは、_temporaryディレクトリを削除する前に、タスク試行ディレクトリの削除を並列化します。1. クリーンアップを無効にできます。

このコミッタは、「リアル」なfile rename()操作を持つファイルシステムクライアントで使用できます。リストとファイルプローブが高価なリモートオブジェクトストレージ用に最適化されています。設計上、HDFSではそれほど大きな速度向上は期待できませんが、並列リネーム操作により、従来のv1アルゴリズムと比較してジョブが高速化されます。

動作方法

詳細はManifest Committerアーキテクチャを参照してください。

コミッタの使用

S3Aコミッタをサポートするために導入されたフックは、すべてのファイルシステムスキーマが独自のコミッタを提供できるように設計されています。S3Aコミッタへの切り替えを参照してください。

abfsスキーマのファクトリはmapreduce.outputcommitter.factory.scheme.abfsで定義され、gcsについても同様です。

特にParquetバインディングについては、対応するSpark設定の変更が必要になります。mapred-default.xml JARで定義されていない場合は、core-site.xmlで行うことができます。

<property>
  <name>mapreduce.outputcommitter.factory.scheme.abfs</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value>
</property>
<property>
  <name>mapreduce.outputcommitter.factory.scheme.gs</name>
  <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory</value>
</property>

Sparkでのmanifestコミッタへのバインド。

Apache Sparkでは、コマンドラインオプション(「–conf」の後)またはspark-defaults.confファイルを使用して設定を行うことができます。以下は、内部的にファクトリメカニズムを使用するParquetコミッタのサブクラスを含むParquetの設定例を示したspark-defaults.confの使用例です。

spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory
spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

Cloudstore committerinfoコマンドを使用してコミッタバインドを調査する。

Hadoopコミッタの設定は、cloudstoreの最近のビルドとそのcommitterinfoコマンドで検証できます。このコマンドは、MRおよびSparkジョブで使用されるものと同じファクトリメカニズムを使用してそのパスに対するコミッタをインスタンス化し、そのtoString値を出力します。

hadoop jar cloudstore-1.0.jar committerinfo abfs://testing@ukwest.dfs.core.windows.net/

2021-09-16 19:42:59,731 [main] INFO  commands.CommitterInfo (StoreDurationInfo.java:<init>(53)) - Starting: Create committer
Committer factory for path abfs://testing@ukwest.dfs.core.windows.net/ is
 org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory@3315d2d7
  (classname org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory)
2021-09-16 19:43:00,897 [main] INFO  manifest.ManifestCommitter (ManifestCommitter.java:<init>(144)) - Created ManifestCommitter with
   JobID job__0000, Task Attempt attempt__0000_r_000000_1 and destination abfs://testing@ukwest.dfs.core.windows.net/
Created committer of class org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter:
 ManifestCommitter{ManifestCommitterConfig{destinationDir=abfs://testing@ukwest.dfs.core.windows.net/,
   role='task committer',
   taskAttemptDir=abfs://testing@ukwest.dfs.core.windows.net/_temporary/manifest_job__0000/0/_temporary/attempt__0000_r_000000_1,
   createJobMarker=true,
   jobUniqueId='job__0000',
   jobUniqueIdSource='JobID',
   jobAttemptNumber=0,
   jobAttemptId='job__0000_0',
   taskId='task__0000_r_000000',
   taskAttemptId='attempt__0000_r_000000_1'},
   iostatistics=counters=();

gauges=();

minimums=();

maximums=();

means=();
}

コミッタが使用されたことを検証する

新しいコミッタは、統計情報を含む操作のサマリーをJSON形式で_SUCCESSファイルに書き込みます。

このファイルが存在し、サイズがゼロバイトの場合、従来のFileOutputCommitterが使用されました。

このファイルが存在し、サイズがゼロバイトより大きい場合、manifestコミッタが使用されたか、S3Aファイルシステムの場合、S3Aコミッタのいずれかが使用されました。すべて同じJSON形式を使用します。

設定オプション

以下は、コミッタの主な設定オプションです。

オプション 意味 デフォルト値
mapreduce.manifest.committer.delete.target.files ターゲットファイルを削除しますか? false
mapreduce.manifest.committer.io.threads 並列操作のスレッド数 64
mapreduce.manifest.committer.summary.report.directory レポートを保存するディレクトリ。 ""
mapreduce.manifest.committer.cleanup.parallel.delete 一時ディレクトリを並列で削除する true
mapreduce.fileoutputcommitter.cleanup.skipped _temporaryディレクトリのクリーンアップをスキップする false
mapreduce.fileoutputcommitter.cleanup-failures.ignored クリーンアップ中のエラーを無視する false
mapreduce.fileoutputcommitter.marksuccessfuljobs 処理が正常に完了したら、_SUCCESS マーカーファイルを作成します。(ジョブセットアップ時に既存のファイルがあれば削除します) true

その他にもいくつかあります。詳細は「詳細」セクション[#advanced]を参照してください。

ジョブのスケーリング mapreduce.manifest.committer.io.threads

このコミッターが従来のFileOutputCommitterよりも高速である主な理由は、ジョブコミット中に可能な限り多くのファイルI/Oを並列化しようとする点にあります。具体的には、

  • タスクマニフェストの読み込み
  • ディレクトリが作成されるファイルの削除
  • ディレクトリの作成
  • ファイルごとの名前変更
  • ジョブクリーンアップにおけるタスク試行ディレクトリの削除

これらの操作はすべて、mapreduce.manifest.committer.io.threads オプションでサイズを設定する同じスレッドプールで実行されます。

より大きな値を使用できます。

Hadoop XML 設定

<property>
  <name>mapreduce.manifest.committer.io.threads</name>
  <value>32</value>
</property>

spark-defaults.conf

spark.hadoop.mapreduce.manifest.committer.io.threads 32

MapReduce AMまたはSpark Driverに割り当てられたコア数よりも大きな値を使用しても、CPUが直接過負荷になることはありません。スレッドは通常、オブジェクトストア/ファイルシステムに対する(遅い)I/Oの完了を待機しているためです。

ジョブコミットにおけるマニフェストの読み込みはメモリを大量に消費する可能性があります。スレッド数が多いほど、同時に読み込まれるマニフェスト数が増えます。

注意点 * Sparkでは、複数のジョブが同じプロセスでコミットされる可能性があり、それぞれがジョブコミットまたはクリーンアップ中に独自のサーレッドプールを作成します。* ストアのI/Oリクエストが多すぎると、Azureのレート制限がトリガーされる可能性があります。レート制限オプションmapreduce.manifest.committer.io.rateを使用すると、これを回避できます。

mapreduce.manifest.committer.writer.queue.capacity

これは2番目のスケールオプションです。ターゲットファイルシステムから読み込まれたマニフェスト、ワーカースレッドプールから読み込まれたマニフェスト、および各マニフェストのエントリをローカルファイルシステムの中間ファイルに保存する単一のスレッドから名前変更するファイルのリストを格納するためのキューのサイズを制御します。

キューがいっぱいになると、すべてのマニフェスト読み込みスレッドがブロックされます。

<property>
  <name>mapreduce.manifest.committer.writer.queue.capacity</name>
  <value>32</value>
</property>

ローカルファイルシステムへの書き込みは、通常、クラウドストレージよりもはるかに高速であるため、このキューサイズはマニフェスト読み込みパフォーマンスの制限にはなりません。

ジョブコミット中のマニフェスト読み込み時に消費されるメモリの量を制限するのに役立ちます。読み込まれたマニフェストの最大数は

mapreduce.manifest.committer.writer.queue.capacity + mapreduce.manifest.committer.io.threads

オプション:ジョブコミットでのターゲットファイルの削除

従来のFileOutputCommitterは、ジョブのファイルを適切な場所に名前変更する前に、宛先パスにあるファイルを削除します。

マニフェストコミッターではオプションであり、オプションmapreduce.manifest.committer.delete.target.filesで設定され、デフォルト値はfalseです。

これによりパフォーマンスが向上し、ジョブによって作成されたすべてのファイルに一意のファイル名が付けられている場合に安全に使用できます。

Apache Sparkは、SPARK-8406 *出力ファイル名にUUIDを追加して、誤って上書きされるのを防ぐ*以降、ORCとParquetに対して一意のファイル名を作成します。

ターゲットファイルのチェック/削除を回避することで、コミットされるファイルごとに1回の削除呼び出しが節約されるため、ストレージI/Oを大幅に節約できます。

既存のテーブルに追加する場合、ORCおよびparquet以外の形式を使用する場合、一意の識別子が各ファイル名に追加されていることを確信していない限り、ターゲットファイルの削除を有効にします。

spark.hadoop.mapreduce.manifest.committer.delete.target.files true

注記1:コミッターは、ファイルを名前変更するディレクトリを作成した場合、削除操作をスキップします。これにより、少なくともデータを追加するジョブが新しいパーティションを作成して書き込んでいる場合、効率がわずかに向上します。

注記2:コミッターは、単一のジョブ内のタスクが一意のファイルを作成する必要があるという前提条件を依然として必要としています。これは、任意のジョブが正しいデータを作成するための基礎となります。

Spark動的パーティションの上書き

Sparkには「動的パーティション上書き」という機能があります。

これはSQLで開始できます。

INSERT OVERWRITE TABLE ...

または、モードがoverwriteで、パーティションが既存のテーブルのパーティションと一致するDataSet書き込みを介して開始できます。

sparkConf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
// followed by an overwrite of a Dataset into an existing partitioned table.
eventData2
  .write
  .mode("overwrite")
  .partitionBy("year", "month")
  .format("parquet")
  .save(existingDir)

この機能はSparkに実装されており、1. ジョブに新しいデータを一時ディレクトリに書き込むように指示します 1. ジョブコミットが完了した後、出力をスキャンして、データが書き込まれたリーフディレクトリ「パーティション」を特定します。 1. 宛先テーブルのそれらのディレクトリのコンテンツを削除します 1. 新しいファイルをパーティションに名前変更します。

これらはすべてSparkで行われ、中間出力ツリーのスキャン、パーティションの削除、新しいファイルの名前変更というタスクを引き継ぎます。

この機能により、ジョブは宛先テーブルの完全に外部にデータを書き込むこともできます。これは、1. 作業ディレクトリに新しいファイルを作成することによって行われ、1. Sparkがジョブコミット時にそれらを最終的な宛先に移動します。

マニフェストコミッターは、AzureおよびGoogle Cloud Storageでの動的パーティション上書きと互換性があり、拡張機能のコア要件を満たしています。1. getWorkPath()で返される作業ディレクトリは、最終出力と同じファイルシステム上にあります。2. rename()は、ジョブをコミットする際に安全かつ高速に使用できるO(1)操作です。

S3Aコミッターのどれもこれに対応していません。条件(1)はステージングコミッターでは満たされず、(2)はS3自体では満たされません。

動的パーティション上書きでマニフェストコミッターを使用するには、SparkバージョンにSPARK-40034 *動的パーティション上書きで動作するPathOutputCommitters*が含まれている必要があります。

多くのファイルの名前が変更される場合、操作の名前変更フェーズは遅くなります(これは順次実行されます)。並列名前変更はこれを高速化しますが、*マニフェストコミッターがリスクを最小限に抑え、リカバリをサポートするように設計されているabfs過負荷の問題をトリガーする可能性があります*。

コミット操作のSpark側は、一時出力ディレクトリをリスト/ツリーウォーキング(多少のオーバーヘッド)した後、従来のファイルシステムrename()呼び出しを使用して実行されるファイル昇格を行います。ここでは明示的なレート制限はありません。

これは何を意味しますか?

これは、_動的パーティショニングは、数千ものファイルが作成されるSQLクエリ/Spark DataSet操作に対してAzure Storageで使用しないでください_ことを意味します。これらがスロットリングスケール問題が発生する前にパフォーマンスの問題に悩まされるという事実は、警告とみなすべきです。

_SUCCESSファイルのジョブサマリー

元のHadoopコミッターは、無効にされていない限り、出力ディレクトリのルートに0バイトの_SUCCESSファイルを作成します。

このコミッターは、次の情報を含むJSONサマリーを書き込みます。* コミッターの名前。* 診断情報。* 作成されたファイルの一部(テスト用。完全なリストは除外されます。大きくなる可能性があるため)。* I/O統計。

クエリを実行した後、この_SUCCESSファイルの長さが0バイトの場合、_新しいコミッターは使用されていません_。

空でない場合は、検査できます。

ManifestPrinterツールを使用した_SUCCESSファイルの表示。

サマリーファイルはJSONであり、任意のテキストエディタで表示できます。

統計のより良い表示を含む、より簡潔なサマリーについては、ManifestPrinterツールを使用してください。

hadoop org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter <path>

これは、出力ディレクトリのベースに保存されたファイルと、レポートディレクトリに保存されたレポートの両方で機能します。

ジョブサマリーの収集 mapreduce.manifest.committer.summary.report.directory

オプションmapreduce.manifest.committer.summary.report.directoryにファイルシステムパスを設定することにより、コミッターはジョブの成功または失敗に関係なく、_SUCCESSサマリーファイルをレポートディレクトリに保存するように設定できます。

パスは、作業の宛先と同じストア/ファイルシステム上にある必要はありません。たとえば、ローカルファイルシステムを使用できます。

XML

<property>
  <name>mapreduce.manifest.committer.summary.report.directory</name>
  <value>file:///tmp/reports</value>
</property>

spark-defaults.conf

spark.hadoop.mapreduce.manifest.committer.summary.report.directory file:///tmp/reports

これにより、_SUCCESSマーカーの保存が有効になっているかどうかに関係なく、ジョブの統計を収集でき、クエリチェーンによるマーカーの上書きによって発生する問題を回避できます。

クリーンアップ

ジョブクリーンアップは、クラウドストレージで発生する可能性のあるいくつかの問題に対処するように設計されているため、複雑です。

  • ディレクトリの削除のパフォーマンスが遅い。
  • 非常に深く幅広いディレクトリツリーを削除するときのタイムアウト。
  • クリーンアップの問題がジョブの失敗にエスカレートすることへの一般的な耐性。
オプション 意味 デフォルト値
mapreduce.fileoutputcommitter.cleanup.skipped _temporaryディレクトリのクリーンアップをスキップする false
mapreduce.fileoutputcommitter.cleanup-failures.ignored クリーンアップ中のエラーを無視する false
mapreduce.manifest.committer.cleanup.parallel.delete タスク試行ディレクトリの並列削除 true

アルゴリズムは

if `mapreduce.fileoutputcommitter.cleanup.skipped`:
  return
if `mapreduce.manifest.committer.cleanup.parallel.delete`:
  attempt parallel delete of task directories; catch any exception
if not `mapreduce.fileoutputcommitter.cleanup.skipped`:
  delete(`_temporary`); catch any exception
if caught-exception and not `mapreduce.fileoutputcommitter.cleanup-failures.ignored`:
  throw caught-exception

少し複雑ですが、目標は高速/スケーラブルな削除を実行し、それが機能しなかった場合は意味のある例外をスローすることです。

ABFSとGCSを使用する場合は、通常、これらの設定はそのままにしておく必要があります。何らかの理由でクリーンアップ中にエラーが発生した場合は、失敗を無視するオプションを有効にすると、ジョブは引き続き完了します。クリーンアップを無効にすると、クリーンアップのオーバーヘッドも回避できますが、定期的にすべての_temporaryディレクトリをクリーンアップするワークフローまたは手動操作が必要です。

Azure ADLS Gen2ストレージでの作業

マニフェストコミッターに切り替えるには、abfs:// URLを持つ宛先のコミッターファクトリを、アプリケーションまたはクラスタ全体に対してマニフェストコミッターファクトリに切り替える必要があります。

<property>
  <name>mapreduce.outputcommitter.factory.scheme.abfs</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value>
</property>

これにより、コミッター内からADLS Gen2固有のパフォーマンスと整合性ロジックを使用できます。具体的には:* Etagヘッダーをリストで収集し、ジョブコミットフェーズで使用できます。* I/O名前変更操作はレート制限されています * スロットリングが名前変更の失敗をトリガーした場合、リカバリが試みられます。

警告 このコミッターは、古いAzureストレージサービス(WASBまたはADLS Gen 1)と互換性がありません。

Azureに最適化されたコアオプションセットは次のようになります。

<property>
  <name>mapreduce.outputcommitter.factory.scheme.abfs</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value>
</property>

<property>
  <name>spark.hadoop.fs.azure.io.rate.limit</name>
  <value>10000</value>
</property>

デバッグ/パフォーマンス分析のためのオプション設定

<property>
  <name>mapreduce.manifest.committer.summary.report.directory</name>
  <value>abfs:// Path within same store/separate store</value>
  <description>Optional: path to where job summaries are saved</description>
</property>

SparkのABFSオプションの完全なセット

spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory
spark.hadoop.fs.azure.io.rate.limit 10000
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

spark.hadoop.mapreduce.manifest.committer.summary.report.directory  (optional: URI of a directory for job summaries)

実験的:ABFS名前変更レート制限 fs.azure.io.rate.limit

ストアのスロットリングとバックオフ遅延、およびその他のスロットリング関連の失敗状態を回避するために、ジョブコミット中のファイルの名前変更は、「レートリミッター」を介してスロットリングされます。これは、ABFSファイルシステムクライアントの単一インスタンスが発行できる1秒あたりの名前変更操作の数を制限します。

オプション 意味
fs.azure.io.rate.limit I/O操作のレート制限(操作/秒)。

すべてのレート制限を削除するには、オプションを0に設定します。

デフォルト値は10000に設定されており、これはADLSストレージアカウントのデフォルトのI/O容量です。

<property>
  <name>fs.azure.io.rate.limit</name>
  <value>10000</value>
  <description>maximum number of renames attempted per second</description>
</property>

この容量はファイルシステムクライアントのレベルで設定されるため、単一のアプリケーション内のすべてのプロセス間、さらには同じストレージアカウントを共有する他のアプリケーション間で共有されません。

同じSparkドライバによってコミットされているすべてのジョブと共有されます。これらは、ファイルシステムコネクタを共有するためです。

レート制限が課せられる場合、統計store_io_rate_limitedは、ファイルをコミットするための許可を取得する時間を報告します。

サーバー側スロットリングが発生した場合は、次の兆候を確認できます。* ストレージサービスのログとそのスロットリングステータスコード(通常は503または500)。* ジョブ統計commit_file_rename_recovered。この統計は、ADLSスロットリングが名前変更の失敗として発生し、コミッターでリカバリされたことを示しています。

これらが表示される場合、または同時に実行されている他のアプリケーションがスロットリング/スロットリングによってトリガーされた問題が発生した場合は、fs.azure.io.rate.limitの値を小さくするか、Microsoftからより高いI/O容量を要求することを検討してください。

重要 Microsoftから追加の容量を取得し、それをジョブコミットの高速化に使用したい場合は、クラスタ全体で、または追加の優先順位を割り当てたいジョブに対して具体的にfs.azure.io.rate.limitの値を増やします。

これはまだ開発中であり、単一のファイルシステムインスタンスによって実行されるすべてのI/O操作をサポートするように拡張される可能性があります。

Google Cloud Storageでの作業

マニフェストコミッターは、Googleのgcs-connectorライブラリ(gsスキーマのHadoopファイルシステムクライアントを提供する)を介してGoogle Cloud Storageと互換性があり、テストされています。

Google Cloud Storageは、コミットプロトコルが安全に動作するために必要なセマンティクスを備えています。

このコミッターに切り替えるためのSparkの設定は次のとおりです。

spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

spark.hadoop.mapreduce.manifest.committer.summary.report.directory  (optional: URI of a directory for job summaries)

ストアのディレクトリの削除操作はO(files)であるため、mapreduce.manifest.committer.cleanup.parallel.deleteの値はデフォルトのtrueのままにする必要があります。

MapReduceの場合、core-site.xmlまたはmapred-site.xmlでバインディングを宣言します。

<property>
  <name>mapreduce.outputcommitter.factory.scheme.gcs</name>
  <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory</value>
</property>

HDFSでの作業

このコミッターはHDFSでも動作しますが、一部の操作(特にリストと名前変更)のパフォーマンスが低下し、従来のFileOutputCommitterが依存するにはセマンティクスが不十分であるオブジェクトストアを対象としています(特にGCS)。

HDFSで使用するには、hdfs:// URLのコミッターファクトリとしてManifestCommitterFactoryを設定します。

HDFSは高速なディレクトリの削除を行うため、クリーンアップ中にタスク試行ディレクトリの削除を並列化する必要はありません。そのため、mapreduce.manifest.committer.cleanup.parallel.deletefalseに設定します。

最終的なSparkのバインディングは次のようになります。

spark.hadoop.mapreduce.outputcommitter.factory.scheme.hdfs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory
spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete false
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

spark.hadoop.mapreduce.manifest.committer.summary.report.directory  (optional: URI of a directory for job summaries)

詳細トピック

詳細な設定オプション

本番環境ではなく、開発とテストを目的とした高度なオプションがいくつかあります。

オプション 意味 デフォルト値
mapreduce.manifest.committer.store.operations.classname Manifest Store Operationsのクラス名 ""
mapreduce.manifest.committer.validate.output 出力検証を実行しますか? false
mapreduce.manifest.committer.writer.queue.capacity 中間ファイル書き込みのキュー容量 32

出力の検証mapreduce.manifest.committer.validate.output

オプションmapreduce.manifest.committer.validate.outputは、名前が変更されたすべてのファイルをチェックして、期待される長さを持っていることを確認します。

これにより、ファイルごとにHEADリクエストのオーバーヘッドが追加されるため、テストのみに推奨されます。

実際のコンテンツの検証はありません。

ストレージ統合の制御mapreduce.manifest.committer.store.operations.classname

マニフェストコミッターは、ManifestStoreOperationsインターフェースの実装を通じてファイルシステムと対話します。ストア固有の機能のカスタム実装を提供できます。ABFSにはこれがあり、abfs固有のコミッターファクトリが使用されると、これは自動的に設定されます。

明示的に設定することもできます。

<property>
  <name>mapreduce.manifest.committer.store.operations.classname</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AbfsManifestStoreOperations</value>
</property>

デフォルトの実装も設定できます。

<property>
  <name>mapreduce.manifest.committer.store.operations.classname</name>
  <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem</value>
</property>

ストアがコミッターに追加の統合サポートを提供する場合を除き、これらの値を変更する必要はありません。これは、他のストアに新しい実装を作成する場合にのみ必要です。

同じディレクトリへの同時ジョブのサポート

同じディレクトリツリーを対象とする複数のジョブを実行できる場合があります。

これを実現するには、いくつかの条件を満たす必要があります。

  • Sparkを使用する場合、一意のジョブIDを設定する必要があります。これは、SparkディストリビューションにSPARK-33402SPARK-33230のパッチが含まれている必要があることを意味します。
  • mapreduce.fileoutputcommitter.cleanup.skippedtrueに設定して、_temporaryディレクトリのクリーンアップを無効にする必要があります。
  • すべてのジョブ/タスクは、一意のファイル名を持つファイルを作成する必要があります。
  • すべてのジョブは、同じディレクトリパーティション構造で出力を作成する必要があります。
  • ジョブ/クエリは、Spark Dynamic Partitioningの「INSERT OVERWRITE TABLE」を使用してはなりません。データが失われる可能性があります。これは、マニフェストコミッターだけでなく、すべてのコミッターに当てはまります。
  • 後で_temporaryディレクトリを削除することを忘れないでください!

これはテストされていません