クラス org.apache.hadoop.fs.FutureDataInputStreamBuilder

JavaのFuture参照をFSDataInputStreamとそのサブクラスに作成するためのビルダーパターンのインターフェースを提供します。既存のファイルを読み取りのために開く(非同期になる可能性のある)操作を開始するために使用されます。

履歴

Hadoop 3.3.0: API導入

HADOOP-15229 createFile()に合わせるために、FileSystemビルダーベースのopenFile() APIを追加する

  • opt(String key, long value)メソッドは使用できませんでした。
  • withFileStatus(status)呼び出しには、null以外の引数が必要でした。
  • オプションとファイルの状態を処理する唯一のファイルシステムはS3Aでした。
  • S3固有のオプションは、S3 selectとfs.s3a.experimental.input.fadviseのみでした。
  • ファイルの状態が渡され、filestatusのパスがopenFile(path)呼び出しのパスと一致しない場合、S3AファイルシステムはIllegalArgumentExceptionをスローしました。

これは基本的な実装です。このバージョンに対して確実にコンパイルされるコードを作成するには、opt(String, String)およびmust(String, String)メソッドを使用し、数値を明示的に文字列に変換します。

fs.open("s3a://bucket/file")
  .opt("fs.option.openfile.length", Long.toString(length))
  .build().get()

Hadoop 3.3.5: 標準化と拡張

HADOOP-16202 オブジェクトストアに対する読み取りパフォーマンスを向上させるためのopenFile()の強化

  • withFileStatus(null)を受け入れる(および無視する)必要がありました。
  • 指定されたFileStatusパスのファイル名部分のみが、openFile(path)で渡されたファイル名と一致する必要があります。
  • opt(String key, long value)オプションが追加されました。*これは、回帰を引き起こしたため、現在非推奨です*
  • 標準のfs.option.openfileオプションが定義されました。
  • openfileの長さオプションを使用するS3A FS。シーク開始/終了オプションはまだ使用されていません。
  • Azure ABFSコネクタは提供されたVersionedFileStatusを受け取り、オブジェクトのHEADプローブを省略します。

Hadoop 3.3.6: 演算子オーバーロードのバグに対処するためのAPI変更。

新しいoptLong()optDouble()mustLong()mustDouble()ビルダーメソッド。

  • HADOOP-18724 S3AFileSystemの場合、NumberFormatExceptionでファイルのオープンが失敗するを参照してください。これは、オーバーロードされたopt(long)によって何らかの原因で発生しました。
  • 解析できない数値は「設定されていない」ものとして処理され、代わりにデフォルト値が使用されることが仕様で宣言されました。

不変条件

FutureDataInputStreamBuilderインターフェースは、build()が呼び出されるまで、または非同期オープン操作自体の間、パラメータまたはFileSystemの状態を必要としません。

ファイルシステムの状態のいくつかの側面は、openFile()build().get()シーケンスの間に変化しない不変量であることがわかっている場合、初期のopenFile()呼び出しでチェックされる場合があります。たとえば、パスの検証など。

実装に依存しないパラメータ。

FutureDataInputStreamBuilder bufferSize(int bufSize)

使用するバッファのサイズを設定します。

FutureDataInputStreamBuilder withFileStatus(FileStatus status)

開かれているファイルを参照するFileStatusインスタンス。

実装で使用して、ファイルのチェックをショートカットし、特にオブジェクトストアへのリモート呼び出しを削減できます。

要件

  • status != null
  • status.getPath().getName() == 開かれているファイルの名前。

ストアがファイルを開く際にFileStatusを使用する場合、パス検証は行われる必要があり、それ以外の場合は行われる場合があります。検証は、build()操作まで延期する必要があります。

この操作は、ファイルシステムへのヒントと見なされるべきです。

ファイルシステムの実装が返されたFileStatusを拡張する場合、ファイルを開く際にこの情報を使用できます。

これは、バージョン/etag情報を返すストアに関連しています。これを使用して、開いたファイルがリストで返されたファイルと正確に一致することを保証できます。

提供されたstatusの最終的なstatus.getPath().getName()要素は、openFile(path)呼び出しに渡されたパスのname値と一致する必要があります。

ファイルシステムは、パスの残りの部分を検証してはなりません。これは、スキーマとパスが異なるviewfsやその他のマウントポイントラッパーファイルシステムをサポートするために必要です。これらは多くの場合、独自のFileStatus結果を作成します。

前提条件

status == null or status.getPath().getName() == path.getName()

ファイルシステムは、statusのクラスが、ファイルステータス/リスト操作で実装が返す特定のサブクラスと一致する必要はありません。これは、ラッパーファイルシステムとステータスのシリアル化/デシリアル化をサポートするためです。

オプションまたは必須パラメータの設定

FutureDataInputStreamBuilder opt(String key, String value)
FutureDataInputStreamBuilder opt(String key, int value)
FutureDataInputStreamBuilder opt(String key, boolean value)
FutureDataInputStreamBuilder optLong(String key, long value)
FutureDataInputStreamBuilder optDouble(String key, double value)
FutureDataInputStreamBuilder must(String key, String value)
FutureDataInputStreamBuilder must(String key, int value)
FutureDataInputStreamBuilder must(String key, boolean value)
FutureDataInputStreamBuilder mustLong(String key, long value)
FutureDataInputStreamBuilder mustDouble(String key, double value)

ビルダーにオプションまたは必須パラメータを設定します。opt()またはmust()を使用すると、クライアントはFileSystemの具体的な型を検査せずに、FS固有のパラメータを指定できます。

out = fs.openFile(path)
    .must("fs.option.openfile.read.policy", "random")
    .optLong("fs.http.connection.timeout", 30_000L)
    .withFileStatus(statusFromListing)
    .build()
    .get();

ここで、randomの読み取りポリシーが指定されており、ファイルシステムの実装がオプションを理解する必要があることを要求しています。http固有のオプションが提供されており、任意のストアで解釈できます。ファイルを開くファイルシステムがオプションを認識しない場合、安全に無視できます。

optmustを使用するタイミング

optmustの違いは、ファイルシステムが認識しないオプションに対して、ファイルをオープンする際にどのように反応するかです。

def must(name, value):
  if not name in known_keys:
    raise IllegalArgumentException
  if not name in supported_keys:
    raise UnsupportedException


def opt(name, value):
  if not name in known_keys:
     # ignore option

既知のキーについて、(キー、値)ペアがどのように宣言されたかに関係なく、value引数の検証はMUST同一でなければなりません。

  1. ファイルシステム固有のオプションの場合、エントリの検証方法は実装の選択に委ねられます。
  2. 標準オプションの場合、有効なvalueの仕様はこのファイルシステム仕様で定義されており、契約テストによって検証されます。

実装に関する注記

サポートされているオプションの確認は、build()操作で行う必要があります。

  1. must(key, value)で宣言された必須パラメータが認識されない場合、IllegalArgumentExceptionをMUSTスローする必要があります。

  2. must(key, value)で宣言された必須パラメータが、特定のFileSystem/FileContextインスタンスで認識されているがサポートされていない機能に依存している場合、UnsupportedExceptionをMUSTスローする必要があります。

数値の解析では、文字列のトリミングを行うSHOULDし、値が数値として解析できない場合は、指定されたデフォルト値にダウングレードする必要があります。これは、HADOOP-18724 Open file fails with NumberFormatException for S3AFileSystemに対応するためです。これは、長い値が渡されたときに、オーバーロードされたopt()ビルダーパラメータがopt(String, long)ではなくopt(String, double)にバインドされたことが原因でした。

ビルダーメソッド(例: bufferSize())とopt()/must()によって設定されたパラメータ間の競合の解決方法は次のとおりです。

最後に指定されたオプションが、値とそのオプション/必須状態を定義します。

withFileStatus()で渡されたFileStatusオプションが使用される場合、実装はFileStatusのすべてのサブクラス(LocatedFileStatusを含む)を、実装によって実装されたFS固有のサブクラス(例: S3AFileStatus)だけでなく、MUST受け入れる必要があります。実装固有のサブクラスではないものは、MAY単に無視しても構いません。

これは、機能の安全な使用を確保するために重要です。ディレクトリのリスト/ステータスのシリアル化/デシリアル化の結果、withFileStatus()引数が、ファイルシステムインスタンス自身のgetFileStatus()listFiles()listLocatedStatus()などの呼び出しによって返されるカスタムサブクラスにならない可能性があります。

このような状況では、実装はMUST

  1. status.getPath().getName()が現在のpath.getName()値と一致することを検証する必要があります。パスに残りの部分はMUST検証してはなりません。
  2. 必要に応じてステータスのフィールドを使用します - 例えば、ファイルの長さ。

ステータスの値が使用されない場合でも、引数の存在は、呼び出し元がファイルが存在し、指定されたサイズであると確信していると宣言していると解釈できます。

ビルダーインターフェース

CompletableFuture<FSDataInputStream> build()

正常に完了した場合、ファイルシステムからデータを読み取ることができる入力ストリームを返すCompletableFuture<FSDataInputStream>を返します。

build()操作は、ファイルの存在、種類などを検証し、ディレクトリまたは存在しないファイルからの読み取りを試行するのを拒否するMAYがあります。あるいは、* ファイルの存在/状態チェックは、返されたCompletableFuture<>内で非同期的に実行されるMAYがあります。* ファイルの存在/状態チェックは、read()PositionedReadなどの読み取り操作で最初のバイトが読み取られるまで延期されるMAYがあります。

つまり、事前条件exists(FS, path)isFile(FS, path)は、返されたfutureに対して呼び出されたget()の後、ストリームの読み取りが試みられた後にのみ満たされていることが保証されます。

したがって、ファイルが存在しない場合、またはファイルではなくディレクトリの場合でも、次の呼び出しはMUST成功し、評価されるCompletableFutureを返す必要があります。

Path p = new Path("file://tmp/file-which-does-not-exist");

CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
      .openFile(p)
      .build();

ファイルへのアクセス/読み取りができない場合、futureのget()呼び出し、または遅延バインディング操作の場合、データを読み取る操作が呼び出されたときに、IOExceptionまたはそのサブクラスをMUST発生させる必要があります。

したがって、前の例で返されたfutureに対して呼び出された場合、次のシーケンスはSHALL失敗します。

  future.get().read();

アクセス許可チェックには、同じ可視性の要件があります。許可の失敗は、get()呼び出しまで遅延されるMUSTであり、後続の操作まで遅延されるMAYがあります。

注: seek()など、入力ストリームの一部の操作では、まったくIOを試行しない場合があります。そのような操作は、存在しない/読み取り不可能なファイルと対話する際に、MAY例外を発生させません。

Hadoop 3.3.3以降の標準的なopenFile()オプション

これらは、FileSystemFileContextの実装がMUST認識し、必要に応じて入力ストリームの動作を変更することでMAYサポートするオプションです。

Hadoop 3.3.0ではopenFile()APIが追加されました。これらの標準オプションは、後のリリースで定義されました。したがって、「よく知られている」ものの、アプリケーションがオプションを知っているHadoopのリリースに対してのみ実行されることが確信できない限り、アプリケーションはmust()ではなくopt()呼び出しを使用してオプションを設定するSHOULD必要があります。

openFile()ビルダーAPIを使用してファイルをオープンする場合、呼び出し元は.opt(key, value).must(key, value)の両方の呼び出しを使用して、標準オプションとファイルシステム固有のオプションを設定するMAYがあります。

opt()パラメータとして設定された場合、サポートされていない「標準」オプションはMUST無視され、認識されない標準オプションもMUST無視されます。

must()パラメータとして設定された場合、サポートされていない「標準」オプションはMUST無視されます。認識されない標準オプションはMUST拒否されます。

標準のopenFile()オプションはorg.apache.hadoop.fs.OpenFileOptionsで定義されています。それらはすべてfs.option.openfile.で始まるSHALL必要があります。

すべてのFileSystem/FileContextインスタンスは、must()宣言が失敗しない程度にこれらのオプションをサポートするSHALL必要がありますが、実装は値を解釈する程度にそれらをサポートするMAYがあります。これは、ストアが実際に読み取りポリシーまたはファイル長の値を読み取って、ファイルをオープンするときにそれらを使用することが必須ではないことを意味します。

特に指定がない限り、それらはヒントとしてSHOULD見なされるべきです。

注: 設定されているがサポートされていない場合にエラーになるような標準オプションが追加された場合、実装はそれを拒否するSHALL必要があります。たとえば、S3Aファイルシステムクライアントは、SQLコマンドをプッシュダウンする機能をサポートしています。そのようなものが標準化された場合、opt()またはmust()引数のいずれかでのオプションの使用は、機能をサポートしていないファイルシステムに対してMUST拒否されます。

オプション: fs.option.openfile.buffer.size

バイト単位の読み取りバッファサイズ。

これは、オプションio.file.buffer.sizeで設定された構成のデフォルト値をオーバーライドします。

FileSystem.open(path, buffersize)を介してストリーム固有のバッファサイズを設定できるすべてのファイルシステムクライアントによってサポートされています。

オプション: fs.option.openfile.read.policy

入力ストリームの読み取りポリシーを宣言します。これは、入力ストリームの予想される読み取りパターンについてのヒントです。これにより、先読み、バッファリング、その他の最適化が制御されるMAYがあります。

シーケンシャル読み取りは、データをプリフェッチしたり、より大きなブロックでデータを読み取ったりすることで最適化できます。一部のアプリケーション(例: distCp)は、列指向データ上でもシーケンシャルIOを実行します。

対照的に、ランダムIOは、seek()/read()のシーケンスを使用するか、PositionedReadableまたはByteBufferPositionedReadable APIを使用して、ファイルの異なる部分のデータを読み取ります。

プリフェッチがほとんど/全く行われない場合、ランダムIOのパフォーマンスは最適になる可能性があり、その他の最適化も可能です。

Apache ORCやApache Parquetなどの列指向形式に対するクエリは、このようなランダムIOを実行します。他のデータ形式は、シーケンシャルまたはファイル全体のポリシーで読み取ると最適な場合があります。

重要なのは、シーケンシャル読み取りの読み取りを最適化すると、ランダムパフォーマンスが低下する可能性があることです。そしてその逆も同様です。

  1. シークポリシーはヒントです。must()オプションとして宣言された場合でも、ファイルシステムはそれを無視するMAYがあります。
  2. ポリシーの解釈/実装はファイルシステム固有の動作であり、Hadoopのリリースや特定のストレージサブシステムによって変更される可能性があります。
  3. ポリシーが認識されない場合、ファイルシステムクライアントはMUSTそれを無視します。
ポリシー 意味
adaptive ストアによって実装された適応型ポリシー。
default このストアのデフォルトポリシー。一般的には「adaptive」。
random ランダムアクセス用に最適化。
sequential シーケンシャルアクセス用に最適化。
vector ベクトル化IO APIを使用することを意図しています。
whole-file ファイル全体が読み取られます。

入力ソースに間違った読み取りポリシーを選択すると、非効率になる可能性があります。

読み取りポリシーのリストを指定するMAYがあります。ファイルシステムによって認識/サポートされる最初のポリシーが使用されます。これにより、カスタムポリシー(例: HBase HFileに最適化されたhbase-hfileポリシー)をサポートできます。

S3AとABFSの入力ストリームは両方ともIOStatisticsSource APIを実装しており、IOパフォーマンスについて問い合わせることができます。

ヒント: DEBUGで入力ストリームのtoString()値をログに記録します。S3AとABFSの入力ストリームは読み取り統計をログに記録するため、読み取りが効率的に実行されているかどうかについての洞察を得ることができます。

さらに読む

読み取りポリシーadaptive

アプリケーションの読み取りパターンにシークポリシーを適応させるように試みます。

S3Aクライアントのnormalポリシーとwasb:クライアントによってサポートされる唯一のポリシーは両方とも適応型です。それらはシーケンシャルIOを想定していますが、後方シーク/位置指定読み取り呼び出しが行われると、ストリームはランダムIOに切り替わります。

他のファイルシステムの実装では、同様の戦略を採用したり、前方シークを検出するアルゴリズムを拡張したり、より効率的とみなされる場合はランダムIOからシーケンシャルIOに切り替えたりすることができます。

適応型読み取りポリシーは、open()APIでシークポリシーを宣言する機能がないことを意味するため、構成可能であれば、クラスタ/アプリケーションの構成で宣言する必要があります。ただし、シーケンシャルシークポリシーからランダムシークポリシーへの切り替えはコストがかかる可能性があります。

アプリケーションが読み取り計画を知っている場合、fs.option.openfile.read.policyオプションを明示的に設定するときは、最も適切なポリシーを宣言するSHOULD必要があります。

読み取りポリシー ``

ファイルシステムインスタンスのデフォルトポリシー。実装/インストール固有。

読み取りポリシーsequential

最初のバイトからファイルの最後まで/ストリームが閉じられるまで、シーケンシャル読み取りを期待します。

読み取りポリシーrandom

seek()/read()シーケンス、またはPositionedReadableまたはByteBufferPositionedReadable APIの使用を期待します。

読み取りポリシーvector

これは、呼び出し元がHADOOP-11867 Add a high-performance vectored read APIのベクトル化読み取りAPIを使用することを意図していることを宣言します。

これはヒントです。APIを使用する場合の必須事項ではありません。これは、そのような機能が実装されている場合、ストリームを最適なベクトル化IOパフォーマンスに構成する必要があることを実装に通知します。

排他的ではありません。同じストリームを従来のInputStreamPositionedRead API呼び出しにも引き続き使用できます。実装は、これらの操作でrandom読み取りポリシーを使用するSHOULD必要があります。

読み取りポリシーwhole-file

これは、ファイルを最初から最後まで読み込むことを宣言します。ファイルシステムクライアントは、パフォーマンスを最大化するあらゆる戦略を自由に有効化できます。特に、範囲の広い読み取り/GETは、ソケット/TLSのセットアップコストを削減し、TCPフロー制御が最適なダウンロードレートを決定するのに十分な長さの接続を提供することにより、高い帯域幅を実現できます。

戦略には以下が含まれます。

  • openFile()操作でファイル全体のHTTP GETを開始します。
  • データを大きなブロックで、できれば並列読み取り操作でプリフェッチします。

開かれたストリームからファイル全体を読み込むことを知っているアプリケーションは、この読み取りポリシーを宣言する必要があります。

オプション: fs.option.openfile.length

ファイルの長さを宣言します。

これは、クライアントがファイルをオープンする際に、リモートストアにファイルのサイズ/存在を問い合わせる手順をスキップするために使用できます。これは、withFileStatus()オプションによるファイルステータスの宣言と同様です。

ファイルシステムコネクタでサポートされている場合、このオプションはファイルの最小長を宣言するものとして解釈する必要があります。

  1. 値が負の場合、オプションは設定されていないものとみなされます。
  2. ファイルの実際の長さがこの値よりも大きい場合でも、エラーとはなりません。
  3. read()seek()、および位置指定された読み取り呼び出しでは、この長さ以上でファイルの実際の長さ以下の位置を使用できます。実装では、このような場合にEOFExceptionsを発生させることも、データ返すこともできます。

ファイルシステム実装でこのオプションが使用されている場合

実装者向けノート

  • fs.option.openfile.length < 0 の値は無視する必要があります。
  • fs.opt.openfile.lengthに値と共にファイルステータスが提供されている場合、ファイルステータス値が優先されます。

オプション: fs.option.openfile.split.start および fs.option.openfile.split.end

ファイルが部分的に処理するために分割されている場合、分割の開始と終了を宣言します。

  1. 値が負の場合、オプションは設定されていないものとみなされます。
  2. ファイルシステムは、ファイルの長さがfs.option.openfile.split.endの値以上であると仮定できます。
  3. また、クライアントアプリケーションがfs.option.openfile.split.endに設定された値を超えて読み取る場合、例外を発生させる場合があります。
  4. このオプションのペアを使用して、読み取り計画を最適化できます。たとえば、GETリクエストのコンテンツ範囲を設定したり、分割終了をファイルの保証された最小長の暗黙的な宣言として使用したりできます。
  5. 両方のオプションが設定されていて、分割開始が分割終了よりも大きいと宣言されている場合、操作を拒否するのではなく、分割開始を0にリセットする必要があります。

分割終了値は、入力ストリームの終了に関するヒントを提供できます。分割開始を使用して、ファイルシステムクライアントの初期読み取りオフセットを最適化できます。

実装者向けノート: アプリケーションは、分割の終了前に始まるレコード/行の最後まで読み取る必要がある場合、分割の終了を超えて読み取ります。

したがって、ファイルが実際にその値よりも長い場合、クライアントはfs.option.openfile.split.endに設定された長さよりも先にseek()/read()を行うことができます。

S3A固有のオプション

S3Aコネクタは、リードアヘッドとシークポリシーのカスタムオプションをサポートしています。

名前 意味
fs.s3a.readahead.range long バイト単位のリードアヘッド範囲
fs.s3a.experimental.input.fadvise String シークポリシー。fs.option.openfile.read.policyによって置き換えられました。
fs.s3a.input.async.drain.threshold long ストリームの非同期ドレインに切り替えるための閾値(3.3.5以降)

オプションセットにfs.s3a.select.sqlステートメントにSQLステートメントが含まれている場合、ファイルはS3 Selectクエリとして開かれます。詳細については、S3Aドキュメントを参照してください。

ABFS固有のオプション

ABFSコネクタは、カスタム入力ストリームオプションをサポートしています。

名前 意味
fs.azure.buffered.pread.disable boolean 位置指定された読み取り操作でのキャッシングを無効にします。

PositionedReadable APIを介して読み取られたデータのキャッシングを無効にします。

詳細については、ABFSドキュメントを参照してください。

ファイルを開く際のシークポリシーと分割制限の宣言

これは、(null許容)ファイルステータスと分割開始/終了を使用する概念実証org.apache.parquet.hadoop.util.HadoopInputFileリーダーの例です。

FileStatus値は常に渡されますが、nullの場合、分割終了を使用してファイルの長さを宣言します。

protected SeekableInputStream newStream(Path path, FileStatus stat,
     long splitStart, long splitEnd)
     throws IOException {

   FutureDataInputStreamBuilder builder = fs.openFile(path)
   .opt("fs.option.openfile.read.policy", "vector, random")
   .withFileStatus(stat);

   builder.optLong("fs.option.openfile.split.start", splitStart);
   builder.optLong("fs.option.openfile.split.end", splitEnd);
   CompletableFuture<FSDataInputStream> streamF = builder.build();
   return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
}

その結果、ファイル一覧によって直接駆動される場合でも、(path, splitStart, splitEnd)のクエリプランからファイルを開く場合でも、ファイルの長さを調べるためにリモートストアをプローブする必要はありません。リモートオブジェクトストアを使用する場合、このようなプローブが非同期で行われたとしても、数十から数百ミリ秒の時間を節約できます。

ファイルの長さと分割終了の両方が設定されている場合、ファイルの長さは「より」権威があるとみなされます。つまり、ファイルの長さを実際に定義する必要があります。分割終了が設定されている場合、呼び出し元はそれを超えて読み取ることができません。

CompressedSplitLineReaderは、圧縮されたレコードを処理中に途中である場合、分割の終了を超えて読み取ることができます。つまり、不完全なレコードの読み取りは、ファイルの長さが分割長よりも大きく、部分的に読み取られたレコード全体を読み取る必要があることを意味すると仮定します。他のリーダーも同様に動作する場合があります。

したがって

  1. FileStatusまたはfs.option.openfile.lengthで提供されるファイルの長さは、ファイルの長さの厳格な上限を設定する必要があります。
  2. fs.option.openfile.split.endに設定された分割終了は、ファイルの厳格な終了ではなく、ヒントとして見なす必要があります。

標準オプションと非標準オプションの両方を使用してファイルを開く

標準オプションと非標準オプションは、同じopenFile()操作で組み合わせることができます。

Future<FSDataInputStream> f = openFile(path)
  .must("fs.option.openfile.read.policy", "random, adaptive")
  .opt("fs.s3a.readahead.range", 1024 * 1024)
  .build();

FSDataInputStream is = f.get();

must()で設定されたオプションは、すべてのファイルシステムで理解されるか、少なくとも認識されて無視される必要があります。この例では、S3A固有のオプションは、他のすべてのファイルシステムクライアントによって無視される場合があります。

古いリリースでファイルを開く

すべてのHadoopリリースがfs.option.openfile.read.policyオプションを認識するわけではありません。

opt()ビルダー引数を使用して追加された場合、アプリケーションコードでオプションを安全に使用できます。これは、不明なオプションキーとして扱われ、破棄されるためです。

Future<FSDataInputStream> f = openFile(path)
  .opt("fs.option.openfile.read.policy", "vector, random, adaptive")
  .build();

FSDataInputStream is = f.get();

注記1: オプション名がorg.apache.hadoop.fs.Options.OpenFileOptionsの定数への参照によって設定されている場合、プログラムは、特定のオプションのないバージョンのHadoopに対してリンクされません。したがって、古いリリースに対する堅牢なリンクのために、値のコピーを使用してください。

注記2: オプションの検証はファイルシステムコネクタで行われるため、複数のHadoopバージョンで動作するように設計されたサードパーティコネクタは、オプションをサポートしない場合があります。

MapReduceへのオプションの渡す

Hadoop MapReduceは、接頭辞mapreduce.job.input.file.option.mapreduce.job.input.file.must.が付いたMRジョブオプションを自動的に読み取り、mapreduce固有の接頭辞を削除した後、それぞれ.opt()must()として適用します。

これにより、MRジョブへのオプションの渡しが簡単になります。たとえば、ジョブがランダムIOを使用してデータを処理することを宣言するには

JobConf jobConf = (JobConf) job.getConfiguration()
jobConf.set(
    "mapreduce.job.input.file.option.fs.option.openfile.read.policy",
    "random");

MapReduce入力形式によるオプションの伝播

レコードリーダーが、開いているファイルにオプションを渡す例です。

  public void initialize(InputSplit genericSplit,
                     TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit)genericSplit;
    Configuration job = context.getConfiguration();
    start = split.getStart();
    end = start + split.getLength();
    Path file = split.getPath();

    // open the file and seek to the start of the split
    FutureDataInputStreamBuilder builder =
      file.getFileSystem(job).openFile(file);
    // the start and end of the split may be used to build
    // an input strategy.
    builder.optLong("fs.option.openfile.split.start", start);
    builder.optLong("fs.option.openfile.split.end", end);
    FutureIO.propagateOptions(builder, job,
        "mapreduce.job.input.file.option",
        "mapreduce.job.input.file.must");

    fileIn = FutureIO.awaitFuture(builder.build());
    fileIn.seek(start)
    /* Rest of the operation on the opened stream */
  }

FileContext.openFile

org.apache.hadoop.fs.AvroFSInputから; ファイルはシーケンシャル入力で開かれます。ファイルの長さは既にプローブされているため、長さは渡されます。

  public AvroFSInput(FileContext fc, Path p) throws IOException {
    FileStatus status = fc.getFileStatus(p);
    this.len = status.getLen();
    this.stream = awaitFuture(fc.openFile(p)
        .opt("fs.option.openfile.read.policy",
            "sequential")
        .optLong("fs.option.openfile.length",
            Long.toString(status.getLen()))
        .build());
    fc.open(p);
  }

この例では、長さはLong.toString()を介してlongとしてではなく文字列として渡されます。これは、入力形式が、opt(String, long)およびmust(String, long)ビルダーパラメーターを持たないバージョンの$Hadoopに対してリンクするようにするためです。同様に、値はオプションとして渡されるため、認識されない場合でもアプリケーションは成功します。

例:ファイル全体の読み取り

これはorg.apache.hadoop.util.JsonSerializationからです。

そのload(FileSystem, Path, FileStatus)メソッドは、*ファイル全体を最初から最後まで読み込むことを宣言します。*ファイルステータスを渡します。

public T load(FileSystem fs,
        Path path,
        status)
        throws IOException {

 try (FSDataInputStream dataInputStream =
          awaitFuture(fs.openFile(path)
              .opt("fs.option.openfile.read.policy", "whole-file")
              .withFileStatus(status)
              .build())) {
   return fromJsonStream(dataInputStream);
 } catch (JsonProcessingException e) {
   throw new PathIOException(path.toString(),
       "Failed to read JSON file " + e, e);
 }
}