このドキュメントでは、Manifest Committerの使用方法について説明します。
Manifestコミッタは、実世界のクエリでABFSのパフォーマンスを提供し、GCSのパフォーマンスと正確性を提供するワーク用のコミッタです。HDFSを含む他のファイルシステムでも機能しますが、リスト操作が遅く高価なオブジェクトストレージ用に最適化された設計です。
コミッタのアーキテクチャと実装については、Manifest Committerアーキテクチャを参照してください。
プロトコルとその正確性については、Manifest Committerプロトコルを参照してください。
2022年3月に追加され、初期リリースでは不安定とみなされる可能性があります。
SparkからAzure ADLS Gen 2「abfs://」ストレージへのワークの唯一安全に使用できるコミッタは、「v1ファイルコミッタ」です。
これは、タスク試行が失敗した場合、その出力は最終出力に含まれないことが保証されているという意味で「正しい」ものです。「v2」コミットアルゴリズムは、その保証を満たすことができないため、デフォルトではなくなりました。
しかし、出力の深いディレクトリツリーが使用されるジョブでは特に遅いです。なぜ遅いのでしょうか?特に、FileOutputCommitter
にインストルメンテーションがないため、特定の原因を指摘するのは困難です。実行中のジョブのスタックトレースには一般的にrename()
が表示されますが、リスト操作も表示されます。
Google GCSでは、v1アルゴリズムとv2アルゴリズムのどちらも安全ではありません。Googleファイルシステムには、v1アルゴリズムに必要なアトミックディレクトリのリネームがないためです。
さらに、AzureとGCSストレージの両方で、多くの子孫を持つディレクトリを削除するときにスケール問題が発生する可能性があります。FileOutputCommitter
はジョブの後処理がdelete("_temporary", true)
への高速呼び出しであると想定しているため、タイムアウトが発生する可能性があります。
中間マニフェストコミッタは、ABFSの実世界のクエリでパフォーマンスを提供し、GCSのパフォーマンスと正確性を提供する新しいワーク用のコミッタです。
このコミッタは、S3Aコミッタ用に導入された拡張ポイントを使用します。ユーザーは、abfs://とgcs:// URLに対して新しいコミッタファクトリを宣言できます。適切に構成されたSparkデプロイメントは、新しいコミッタを検出します。
ジョブクリーンアップにおけるディレクトリのパフォーマンスの問題は、2つのオプションによって解決できます。1. コミッタは、_temporary
ディレクトリを削除する前に、タスク試行ディレクトリの削除を並列化します。1. クリーンアップを無効にできます。
このコミッタは、「リアル」なfile rename()操作を持つファイルシステムクライアントで使用できます。リストとファイルプローブが高価なリモートオブジェクトストレージ用に最適化されています。設計上、HDFSではそれほど大きな速度向上は期待できませんが、並列リネーム操作により、従来のv1アルゴリズムと比較してジョブが高速化されます。
詳細はManifest Committerアーキテクチャを参照してください。
S3Aコミッタをサポートするために導入されたフックは、すべてのファイルシステムスキーマが独自のコミッタを提供できるように設計されています。S3Aコミッタへの切り替えを参照してください。
abfsスキーマのファクトリはmapreduce.outputcommitter.factory.scheme.abfs
で定義され、gcs
についても同様です。
特にParquetバインディングについては、対応するSpark設定の変更が必要になります。mapred-default.xml
JARで定義されていない場合は、core-site.xml
で行うことができます。
<property> <name>mapreduce.outputcommitter.factory.scheme.abfs</name> <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value> </property> <property> <name>mapreduce.outputcommitter.factory.scheme.gs</name> <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory</value> </property>
Apache Sparkでは、コマンドラインオプション(「–conf」の後)またはspark-defaults.conf
ファイルを使用して設定を行うことができます。以下は、内部的にファクトリメカニズムを使用するParquetコミッタのサブクラスを含むParquetの設定例を示したspark-defaults.conf
の使用例です。
spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
committerinfo
コマンドを使用してコミッタバインドを調査する。Hadoopコミッタの設定は、cloudstoreの最近のビルドとそのcommitterinfo
コマンドで検証できます。このコマンドは、MRおよびSparkジョブで使用されるものと同じファクトリメカニズムを使用してそのパスに対するコミッタをインスタンス化し、そのtoString
値を出力します。
hadoop jar cloudstore-1.0.jar committerinfo abfs://testing@ukwest.dfs.core.windows.net/ 2021-09-16 19:42:59,731 [main] INFO commands.CommitterInfo (StoreDurationInfo.java:<init>(53)) - Starting: Create committer Committer factory for path abfs://testing@ukwest.dfs.core.windows.net/ is org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory@3315d2d7 (classname org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory) 2021-09-16 19:43:00,897 [main] INFO manifest.ManifestCommitter (ManifestCommitter.java:<init>(144)) - Created ManifestCommitter with JobID job__0000, Task Attempt attempt__0000_r_000000_1 and destination abfs://testing@ukwest.dfs.core.windows.net/ Created committer of class org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter: ManifestCommitter{ManifestCommitterConfig{destinationDir=abfs://testing@ukwest.dfs.core.windows.net/, role='task committer', taskAttemptDir=abfs://testing@ukwest.dfs.core.windows.net/_temporary/manifest_job__0000/0/_temporary/attempt__0000_r_000000_1, createJobMarker=true, jobUniqueId='job__0000', jobUniqueIdSource='JobID', jobAttemptNumber=0, jobAttemptId='job__0000_0', taskId='task__0000_r_000000', taskAttemptId='attempt__0000_r_000000_1'}, iostatistics=counters=(); gauges=(); minimums=(); maximums=(); means=(); }
新しいコミッタは、統計情報を含む操作のサマリーをJSON形式で_SUCCESS
ファイルに書き込みます。
このファイルが存在し、サイズがゼロバイトの場合、従来のFileOutputCommitter
が使用されました。
このファイルが存在し、サイズがゼロバイトより大きい場合、manifestコミッタが使用されたか、S3Aファイルシステムの場合、S3Aコミッタのいずれかが使用されました。すべて同じJSON形式を使用します。
以下は、コミッタの主な設定オプションです。
オプション | 意味 | デフォルト値 |
---|---|---|
mapreduce.manifest.committer.delete.target.files |
ターゲットファイルを削除しますか? | false |
mapreduce.manifest.committer.io.threads |
並列操作のスレッド数 | 64 |
mapreduce.manifest.committer.summary.report.directory |
レポートを保存するディレクトリ。 | "" |
mapreduce.manifest.committer.cleanup.parallel.delete |
一時ディレクトリを並列で削除する | true |
mapreduce.fileoutputcommitter.cleanup.skipped |
_temporary ディレクトリのクリーンアップをスキップする |
false |
mapreduce.fileoutputcommitter.cleanup-failures.ignored |
クリーンアップ中のエラーを無視する | false |
mapreduce.fileoutputcommitter.marksuccessfuljobs |
処理が正常に完了したら、_SUCCESS マーカーファイルを作成します。(ジョブセットアップ時に既存のファイルがあれば削除します) |
true |
その他にもいくつかあります。詳細は「詳細」セクション[#advanced]を参照してください。
mapreduce.manifest.committer.io.threads
このコミッターが従来のFileOutputCommitter
よりも高速である主な理由は、ジョブコミット中に可能な限り多くのファイルI/Oを並列化しようとする点にあります。具体的には、
これらの操作はすべて、mapreduce.manifest.committer.io.threads
オプションでサイズを設定する同じスレッドプールで実行されます。
より大きな値を使用できます。
Hadoop XML 設定
<property> <name>mapreduce.manifest.committer.io.threads</name> <value>32</value> </property>
spark-defaults.conf
内
spark.hadoop.mapreduce.manifest.committer.io.threads 32
MapReduce AMまたはSpark Driverに割り当てられたコア数よりも大きな値を使用しても、CPUが直接過負荷になることはありません。スレッドは通常、オブジェクトストア/ファイルシステムに対する(遅い)I/Oの完了を待機しているためです。
ジョブコミットにおけるマニフェストの読み込みはメモリを大量に消費する可能性があります。スレッド数が多いほど、同時に読み込まれるマニフェスト数が増えます。
注意点 * Sparkでは、複数のジョブが同じプロセスでコミットされる可能性があり、それぞれがジョブコミットまたはクリーンアップ中に独自のサーレッドプールを作成します。* ストアのI/Oリクエストが多すぎると、Azureのレート制限がトリガーされる可能性があります。レート制限オプションmapreduce.manifest.committer.io.rate
を使用すると、これを回避できます。
mapreduce.manifest.committer.writer.queue.capacity
これは2番目のスケールオプションです。ターゲットファイルシステムから読み込まれたマニフェスト、ワーカースレッドプールから読み込まれたマニフェスト、および各マニフェストのエントリをローカルファイルシステムの中間ファイルに保存する単一のスレッドから名前変更するファイルのリストを格納するためのキューのサイズを制御します。
キューがいっぱいになると、すべてのマニフェスト読み込みスレッドがブロックされます。
<property> <name>mapreduce.manifest.committer.writer.queue.capacity</name> <value>32</value> </property>
ローカルファイルシステムへの書き込みは、通常、クラウドストレージよりもはるかに高速であるため、このキューサイズはマニフェスト読み込みパフォーマンスの制限にはなりません。
ジョブコミット中のマニフェスト読み込み時に消費されるメモリの量を制限するのに役立ちます。読み込まれたマニフェストの最大数は
mapreduce.manifest.committer.writer.queue.capacity + mapreduce.manifest.committer.io.threads
従来のFileOutputCommitter
は、ジョブのファイルを適切な場所に名前変更する前に、宛先パスにあるファイルを削除します。
マニフェストコミッターではオプションであり、オプションmapreduce.manifest.committer.delete.target.files
で設定され、デフォルト値はfalse
です。
これによりパフォーマンスが向上し、ジョブによって作成されたすべてのファイルに一意のファイル名が付けられている場合に安全に使用できます。
Apache Sparkは、SPARK-8406 *出力ファイル名にUUIDを追加して、誤って上書きされるのを防ぐ*以降、ORCとParquetに対して一意のファイル名を作成します。
ターゲットファイルのチェック/削除を回避することで、コミットされるファイルごとに1回の削除呼び出しが節約されるため、ストレージI/Oを大幅に節約できます。
既存のテーブルに追加する場合、ORCおよびparquet以外の形式を使用する場合、一意の識別子が各ファイル名に追加されていることを確信していない限り、ターゲットファイルの削除を有効にします。
spark.hadoop.mapreduce.manifest.committer.delete.target.files true
注記1:コミッターは、ファイルを名前変更するディレクトリを作成した場合、削除操作をスキップします。これにより、少なくともデータを追加するジョブが新しいパーティションを作成して書き込んでいる場合、効率がわずかに向上します。
注記2:コミッターは、単一のジョブ内のタスクが一意のファイルを作成する必要があるという前提条件を依然として必要としています。これは、任意のジョブが正しいデータを作成するための基礎となります。
Sparkには「動的パーティション上書き」という機能があります。
これはSQLで開始できます。
INSERT OVERWRITE TABLE ...
または、モードがoverwrite
で、パーティションが既存のテーブルのパーティションと一致するDataSet書き込みを介して開始できます。
sparkConf.set("spark.sql.sources.partitionOverwriteMode", "dynamic") // followed by an overwrite of a Dataset into an existing partitioned table. eventData2 .write .mode("overwrite") .partitionBy("year", "month") .format("parquet") .save(existingDir)
この機能はSparkに実装されており、1. ジョブに新しいデータを一時ディレクトリに書き込むように指示します 1. ジョブコミットが完了した後、出力をスキャンして、データが書き込まれたリーフディレクトリ「パーティション」を特定します。 1. 宛先テーブルのそれらのディレクトリのコンテンツを削除します 1. 新しいファイルをパーティションに名前変更します。
これらはすべてSparkで行われ、中間出力ツリーのスキャン、パーティションの削除、新しいファイルの名前変更というタスクを引き継ぎます。
この機能により、ジョブは宛先テーブルの完全に外部にデータを書き込むこともできます。これは、1. 作業ディレクトリに新しいファイルを作成することによって行われ、1. Sparkがジョブコミット時にそれらを最終的な宛先に移動します。
マニフェストコミッターは、AzureおよびGoogle Cloud Storageでの動的パーティション上書きと互換性があり、拡張機能のコア要件を満たしています。1. getWorkPath()
で返される作業ディレクトリは、最終出力と同じファイルシステム上にあります。2. rename()
は、ジョブをコミットする際に安全かつ高速に使用できるO(1)
操作です。
S3Aコミッターのどれもこれに対応していません。条件(1)はステージングコミッターでは満たされず、(2)はS3自体では満たされません。
動的パーティション上書きでマニフェストコミッターを使用するには、SparkバージョンにSPARK-40034 *動的パーティション上書きで動作するPathOutputCommitters*が含まれている必要があります。
多くのファイルの名前が変更される場合、操作の名前変更フェーズは遅くなります(これは順次実行されます)。並列名前変更はこれを高速化しますが、*マニフェストコミッターがリスクを最小限に抑え、リカバリをサポートするように設計されているabfs過負荷の問題をトリガーする可能性があります*。
コミット操作のSpark側は、一時出力ディレクトリをリスト/ツリーウォーキング(多少のオーバーヘッド)した後、従来のファイルシステムrename()
呼び出しを使用して実行されるファイル昇格を行います。ここでは明示的なレート制限はありません。
これは何を意味しますか?
これは、_動的パーティショニングは、数千ものファイルが作成されるSQLクエリ/Spark DataSet操作に対してAzure Storageで使用しないでください_ことを意味します。これらがスロットリングスケール問題が発生する前にパフォーマンスの問題に悩まされるという事実は、警告とみなすべきです。
_SUCCESS
ファイルのジョブサマリー元のHadoopコミッターは、無効にされていない限り、出力ディレクトリのルートに0バイトの_SUCCESS
ファイルを作成します。
このコミッターは、次の情報を含むJSONサマリーを書き込みます。* コミッターの名前。* 診断情報。* 作成されたファイルの一部(テスト用。完全なリストは除外されます。大きくなる可能性があるため)。* I/O統計。
クエリを実行した後、この_SUCCESS
ファイルの長さが0バイトの場合、_新しいコミッターは使用されていません_。
空でない場合は、検査できます。
ManifestPrinter
ツールを使用した_SUCCESS
ファイルの表示。サマリーファイルはJSONであり、任意のテキストエディタで表示できます。
統計のより良い表示を含む、より簡潔なサマリーについては、ManifestPrinter
ツールを使用してください。
hadoop org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter <path>
これは、出力ディレクトリのベースに保存されたファイルと、レポートディレクトリに保存されたレポートの両方で機能します。
mapreduce.manifest.committer.summary.report.directory
オプションmapreduce.manifest.committer.summary.report.directory
にファイルシステムパスを設定することにより、コミッターはジョブの成功または失敗に関係なく、_SUCCESS
サマリーファイルをレポートディレクトリに保存するように設定できます。
パスは、作業の宛先と同じストア/ファイルシステム上にある必要はありません。たとえば、ローカルファイルシステムを使用できます。
XML
<property> <name>mapreduce.manifest.committer.summary.report.directory</name> <value>file:///tmp/reports</value> </property>
spark-defaults.conf
spark.hadoop.mapreduce.manifest.committer.summary.report.directory file:///tmp/reports
これにより、_SUCCESS
マーカーの保存が有効になっているかどうかに関係なく、ジョブの統計を収集でき、クエリチェーンによるマーカーの上書きによって発生する問題を回避できます。
ジョブクリーンアップは、クラウドストレージで発生する可能性のあるいくつかの問題に対処するように設計されているため、複雑です。
オプション | 意味 | デフォルト値 |
---|---|---|
mapreduce.fileoutputcommitter.cleanup.skipped |
_temporary ディレクトリのクリーンアップをスキップする |
false |
mapreduce.fileoutputcommitter.cleanup-failures.ignored |
クリーンアップ中のエラーを無視する | false |
mapreduce.manifest.committer.cleanup.parallel.delete |
タスク試行ディレクトリの並列削除 | true |
アルゴリズムは
if `mapreduce.fileoutputcommitter.cleanup.skipped`: return if `mapreduce.manifest.committer.cleanup.parallel.delete`: attempt parallel delete of task directories; catch any exception if not `mapreduce.fileoutputcommitter.cleanup.skipped`: delete(`_temporary`); catch any exception if caught-exception and not `mapreduce.fileoutputcommitter.cleanup-failures.ignored`: throw caught-exception
少し複雑ですが、目標は高速/スケーラブルな削除を実行し、それが機能しなかった場合は意味のある例外をスローすることです。
ABFSとGCSを使用する場合は、通常、これらの設定はそのままにしておく必要があります。何らかの理由でクリーンアップ中にエラーが発生した場合は、失敗を無視するオプションを有効にすると、ジョブは引き続き完了します。クリーンアップを無効にすると、クリーンアップのオーバーヘッドも回避できますが、定期的にすべての_temporary
ディレクトリをクリーンアップするワークフローまたは手動操作が必要です。
マニフェストコミッターに切り替えるには、abfs://
URLを持つ宛先のコミッターファクトリを、アプリケーションまたはクラスタ全体に対してマニフェストコミッターファクトリに切り替える必要があります。
<property> <name>mapreduce.outputcommitter.factory.scheme.abfs</name> <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value> </property>
これにより、コミッター内からADLS Gen2固有のパフォーマンスと整合性ロジックを使用できます。具体的には:* Etagヘッダーをリストで収集し、ジョブコミットフェーズで使用できます。* I/O名前変更操作はレート制限されています * スロットリングが名前変更の失敗をトリガーした場合、リカバリが試みられます。
警告 このコミッターは、古いAzureストレージサービス(WASBまたはADLS Gen 1)と互換性がありません。
Azureに最適化されたコアオプションセットは次のようになります。
<property> <name>mapreduce.outputcommitter.factory.scheme.abfs</name> <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value> </property> <property> <name>spark.hadoop.fs.azure.io.rate.limit</name> <value>10000</value> </property>
デバッグ/パフォーマンス分析のためのオプション設定
<property> <name>mapreduce.manifest.committer.summary.report.directory</name> <value>abfs:// Path within same store/separate store</value> <description>Optional: path to where job summaries are saved</description> </property>
spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory spark.hadoop.fs.azure.io.rate.limit 10000 spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol spark.hadoop.mapreduce.manifest.committer.summary.report.directory (optional: URI of a directory for job summaries)
fs.azure.io.rate.limit
ストアのスロットリングとバックオフ遅延、およびその他のスロットリング関連の失敗状態を回避するために、ジョブコミット中のファイルの名前変更は、「レートリミッター」を介してスロットリングされます。これは、ABFSファイルシステムクライアントの単一インスタンスが発行できる1秒あたりの名前変更操作の数を制限します。
オプション | 意味 |
---|---|
fs.azure.io.rate.limit |
I/O操作のレート制限(操作/秒)。 |
すべてのレート制限を削除するには、オプションを0
に設定します。
デフォルト値は10000に設定されており、これはADLSストレージアカウントのデフォルトのI/O容量です。
<property> <name>fs.azure.io.rate.limit</name> <value>10000</value> <description>maximum number of renames attempted per second</description> </property>
この容量はファイルシステムクライアントのレベルで設定されるため、単一のアプリケーション内のすべてのプロセス間、さらには同じストレージアカウントを共有する他のアプリケーション間で共有されません。
同じSparkドライバによってコミットされているすべてのジョブと共有されます。これらは、ファイルシステムコネクタを共有するためです。
レート制限が課せられる場合、統計store_io_rate_limited
は、ファイルをコミットするための許可を取得する時間を報告します。
サーバー側スロットリングが発生した場合は、次の兆候を確認できます。* ストレージサービスのログとそのスロットリングステータスコード(通常は503または500)。* ジョブ統計commit_file_rename_recovered
。この統計は、ADLSスロットリングが名前変更の失敗として発生し、コミッターでリカバリされたことを示しています。
これらが表示される場合、または同時に実行されている他のアプリケーションがスロットリング/スロットリングによってトリガーされた問題が発生した場合は、fs.azure.io.rate.limit
の値を小さくするか、Microsoftからより高いI/O容量を要求することを検討してください。
重要 Microsoftから追加の容量を取得し、それをジョブコミットの高速化に使用したい場合は、クラスタ全体で、または追加の優先順位を割り当てたいジョブに対して具体的にfs.azure.io.rate.limit
の値を増やします。
これはまだ開発中であり、単一のファイルシステムインスタンスによって実行されるすべてのI/O操作をサポートするように拡張される可能性があります。
マニフェストコミッターは、Googleのgcs-connectorライブラリ(gs
スキーマのHadoopファイルシステムクライアントを提供する)を介してGoogle Cloud Storageと互換性があり、テストされています。
Google Cloud Storageは、コミットプロトコルが安全に動作するために必要なセマンティクスを備えています。
このコミッターに切り替えるためのSparkの設定は次のとおりです。
spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol spark.hadoop.mapreduce.manifest.committer.summary.report.directory (optional: URI of a directory for job summaries)
ストアのディレクトリの削除操作はO(files)
であるため、mapreduce.manifest.committer.cleanup.parallel.delete
の値はデフォルトのtrue
のままにする必要があります。
MapReduceの場合、core-site.xml
またはmapred-site.xml
でバインディングを宣言します。
<property> <name>mapreduce.outputcommitter.factory.scheme.gcs</name> <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory</value> </property>
このコミッターはHDFSでも動作しますが、一部の操作(特にリストと名前変更)のパフォーマンスが低下し、従来のFileOutputCommitter
が依存するにはセマンティクスが不十分であるオブジェクトストアを対象としています(特にGCS)。
HDFSで使用するには、hdfs://
URLのコミッターファクトリとしてManifestCommitterFactory
を設定します。
HDFSは高速なディレクトリの削除を行うため、クリーンアップ中にタスク試行ディレクトリの削除を並列化する必要はありません。そのため、mapreduce.manifest.committer.cleanup.parallel.delete
をfalse
に設定します。
最終的なSparkのバインディングは次のようになります。
spark.hadoop.mapreduce.outputcommitter.factory.scheme.hdfs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete false spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol spark.hadoop.mapreduce.manifest.committer.summary.report.directory (optional: URI of a directory for job summaries)
本番環境ではなく、開発とテストを目的とした高度なオプションがいくつかあります。
オプション | 意味 | デフォルト値 |
---|---|---|
mapreduce.manifest.committer.store.operations.classname |
Manifest Store Operationsのクラス名 | "" |
mapreduce.manifest.committer.validate.output |
出力検証を実行しますか? | false |
mapreduce.manifest.committer.writer.queue.capacity |
中間ファイル書き込みのキュー容量 | 32 |
mapreduce.manifest.committer.validate.output
オプションmapreduce.manifest.committer.validate.output
は、名前が変更されたすべてのファイルをチェックして、期待される長さを持っていることを確認します。
これにより、ファイルごとにHEAD
リクエストのオーバーヘッドが追加されるため、テストのみに推奨されます。
実際のコンテンツの検証はありません。
mapreduce.manifest.committer.store.operations.classname
マニフェストコミッターは、ManifestStoreOperations
インターフェースの実装を通じてファイルシステムと対話します。ストア固有の機能のカスタム実装を提供できます。ABFSにはこれがあり、abfs固有のコミッターファクトリが使用されると、これは自動的に設定されます。
明示的に設定することもできます。
<property> <name>mapreduce.manifest.committer.store.operations.classname</name> <value>org.apache.hadoop.fs.azurebfs.commit.AbfsManifestStoreOperations</value> </property>
デフォルトの実装も設定できます。
<property> <name>mapreduce.manifest.committer.store.operations.classname</name> <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem</value> </property>
ストアがコミッターに追加の統合サポートを提供する場合を除き、これらの値を変更する必要はありません。これは、他のストアに新しい実装を作成する場合にのみ必要です。
同じディレクトリツリーを対象とする複数のジョブを実行できる場合があります。
これを実現するには、いくつかの条件を満たす必要があります。
mapreduce.fileoutputcommitter.cleanup.skipped
をtrue
に設定して、_temporary
ディレクトリのクリーンアップを無効にする必要があります。_temporary
ディレクトリを削除することを忘れないでください!これはテストされていません