クラス org.apache.hadoop.fs.FSDataInputStream

クラス FSDataInputStream extends DataInputStream

FSDataInputStream のコアとなる動作は、java.io.DataInputStream によって定義されており、システムに重要な前提条件を追加する拡張機能が含まれています。

  1. ソースはローカルまたはリモートのファイルシステムです。
  2. 読み取られるストリームは、有限のバイト配列を参照します。
  3. データの長さは、読み取りプロセス中に変更されません。
  4. データの内容は、プロセス中に変更されません。
  5. ソースファイルは、読み取りプロセス中に存在し続けます。
  6. 呼び出し元は、Seekable.seek() を使用してバイト配列内のオフセットにシークできます。これ以降の読み取りはこのオフセットから開始されます。
  7. 前方シークと後方シークのコストは低いです。
  8. ストリーム実装がスレッドセーフである必要はありません。
  9. ただし、ストリームが PositionedReadable を実装する場合、「ポジショナルリード」はスレッドセーフである必要があります。

ファイルは FileSystem.open(p) を介して開かれ、成功した場合、

result = FSDataInputStream(0, FS.Files[p])

ストリームは次のようにモデル化できます

FSDIS = (pos, data[], isOpen)

アクセス関数を使用

pos(FSDIS)
data(FSDIS)
isOpen(FSDIS)

暗黙の不変条件: データストリームのサイズは、FileSystem.getFileStatus(Path p) によって返されるファイルのサイズと等しくなります

forall p in dom(FS.Files[p]) :
len(data(FSDIS)) == FS.getFileStatus(p).length

Closeable.close()

java.io.Closeable のセマンティクスは、JRE 内のインターフェース定義で定義されています。

操作は冪等でなければなりません。次のシーケンスはエラーではありません

FSDIS.close();
FSDIS.close();

実装に関する注意事項

  • 実装は、障害に対して堅牢である必要があります。内部ストリームが閉じている場合は、最初に null であるかどうかを確認する必要があります。

  • 実装は、この操作中に IOException 例外(またはその他の例外)を発生させてはいけません。クライアントアプリケーションは、これらを無視したり、予期せず失敗したりすることがよくあります。

事後条件

FSDIS' = ((undefined), (undefined), False)

Seekable.getPos()

現在の位置を返します。ストリームが閉じている場合の結果は未定義です。

前提条件

isOpen(FSDIS)

事後条件

result = pos(FSDIS)

InputStream.read()

現在の位置のデータを返します。

  1. ストリームが閉じている場合、実装は失敗する必要があります。
  2. read() の完了にかかる時間に制限はありません。

前提条件

isOpen(FSDIS)

事後条件

if ( pos < len(data) ):
   FSDIS' = (pos + 1, data, True)
   result = data[pos]
else
    result = -1

InputStream.read(buffer[], offset, length)

オフセット offset から始まる宛先バッファに length バイトのデータを読み取ります。データのソースは、pos に暗黙的に設定されているストリームの現在の位置です。

前提条件

isOpen(FSDIS)
buffer != null else raise NullPointerException
length >= 0
offset < len(buffer)
length <= len(buffer) - offset
pos >= 0 else raise EOFException, IOException

前提条件の失敗時に発生する可能性のある例外は次のとおりです。

InvalidArgumentException
ArrayIndexOutOfBoundsException
RuntimeException

すべてのファイルシステムが isOpen 状態をチェックするわけではありません。

事後条件

if length == 0 :
  result = 0

else if pos > len(data):
  result = -1

else
  let l = min(length, len(data)-length) :
    buffer' = buffer where forall i in [0..l-1]:
       buffer'[o+i] = data[pos+i]
    FSDIS' = (pos+l, data, true)
    result = l

java.io APIでは、読み取るデータ量(つまり、length)が指定されている場合、使用可能なデータ量がゼロを超えるまで、つまりデータがいくつかあるまで、呼び出しはブロックする必要があると規定されています。呼び出しは、バッファがいっぱいになったとき、またはストリームにデータがなくなるまでブロックする必要はありません。

つまり、l が単に min(length, len(data)-length) として定義されるのではなく、厳密には 1..min(length, len(data)-length) の範囲の整数です。呼び出し元はバッファのできるだけ多くの部分が埋められることを期待するかもしれませんが、実装が常に小さい数、おそらく1バイトだけを返すことは仕様の範囲内です。

重要なのは、宛先バッファサイズが0でない限り、少なくとも1バイトが返されるまで呼び出しがブロックする必要があるということです。したがって、長さ0を超えるデータソースの場合、この read() 操作を繰り返し呼び出すと、最終的にすべてのデータが読み取られます。

Seekable.seek(s)

前提条件

すべてのサブクラスがシーク操作を実装しているわけではありません

supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]

操作がサポートされている場合、ファイルは開いている必要があります

isOpen(FSDIS)

一部のファイルシステムは、このチェックを実行せず、閉じたストリームの読み取りを拒否する read() コントラクトに依存しています(例:RawLocalFileSystem)。

seek(0) は常に成功する必要があります。シーク位置は正で、ストリームの長さよりも小さくなければならないためです。

s > 0 and ((s==0) or ((s < len(data)))) else raise [EOFException, IOException]

一部のファイルシステムは、この条件が満たされていない場合でも例外を発生させません。代わりに、読み取り時に len(data(FSDIS)) < pos(FSDIS) である read() 操作で-1を返します。

シークに失敗した後、pos(FSDIS) の値が変更される場合があります。例として、EOFを超えてシークすると、読み取り位置がファイルの最後に移動するだけでなく、EOFException が発生する可能性があります。

事後条件

FSDIS' = (s, data, True)

暗黙の不変条件があります。現在の位置へのシークは操作なしです

seek(getPos())

実装はこの操作を認識し、他のすべての前提条件チェックをバイパスして、入力ストリームを変更しないままにすることができます。

最新のオブジェクトストアコネクタはすべて、ある種の「レイジーシーク」を実装しています。seek() 呼び出しはストリームを更新しているように見え、getPos() の値は更新されますが、データが実際に読み取られるまでファイルは開かれず/再び開かれません。レイジーシークの実装では、ファイルの既知の長さに対して新しいシーク位置を検証する必要があります。ただし、ファイルの状態(つまり、ファイルが存在するかどうか、現在の長さ)をこの時点で更新する必要はありません。ファイルが削除または切り捨てられたという事実は、その read() 呼び出しまで表面化しない場合があります。

Seekable.seekToNewSource(offset)

この操作は、ソースに対して、現在のソースとは異なるソースから data[] を取得するように指示します。これは、ファイルシステムがファイルの複数のレプリカをサポートし、オフセット offset にデータのレプリカが複数存在する場合にのみ関連します。

前提条件

すべてのサブクラスがこの操作を実装しているわけではなく、例外を発生させるか、False を返すサブクラスもあります。

supported(FSDIS, Seekable.seekToNewSource) else raise [UnsupportedOperationException, IOException]

例: CompressionInputStreamHttpFSFileSystem

サポートされている場合、ファイルは開いている必要があります

isOpen(FSDIS)

事後条件

この操作を実装しないサブクラスの大部分は、単に失敗します。

if not supported(FSDIS, Seekable.seekToNewSource(s)):
    result = False

例: RawLocalFileSystemHttpFSFileSystem

操作がサポートされており、データの新しい場所がある場合

    FSDIS' = (pos, data', true)
    result = True

新しいデータは元のデータ(または以下の整合性セクションで説明されているように、更新されたバージョン)ですが、offset にデータを含むブロックは異なるレプリカから取得されます。

他にコピーがない場合、FSDIS は更新されません。レスポンスはこのことを示します

    result = False

テストメソッド以外では、このメソッドは主に {{FSInputChecker}} クラスで使用されます。このクラスは、読み取り中のチェックサムエラーに対して、他の場所からデータを取得しようと試みることで対応できます。新しいソースが見つかった場合、ファイルのその部分を再読み込みして再チェックしようとします。

CanUnbuffer.unbuffer()

この操作は、ソースに対して、現在保持しているシステムリソース(バッファ、ソケット、ファイル記述子など)をすべて解放するように指示します。後続のIO操作では、これらのリソースを再取得する必要がある可能性があります。バッファの解放は、ストリームを開いたままにする必要があるが、近い将来にストリームからのIO操作が予想されない状況(ファイルハンドルキャッシュなど)で役立ちます。

前提条件

すべてのサブクラスがこの操作を実装しているわけではありません。CanUnbuffer を実装することに加えて、サブクラスは StreamCapabilities インターフェースを実装する必要があり、StreamCapabilities.hasCapability(UNBUFFER) は true を返す必要があります。サブクラスが CanUnbuffer を実装しているが、StreamCapabilities を介して機能を報告しない場合、unbuffer の呼び出しは何もしません。サブクラスが UNBUFFER を実装すると報告しているが、CanUnbuffer インターフェースを実装していない場合、UnsupportedOperationException がスローされます。

supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer)

このメソッドはスレッドセーフではありません。read の実行中に unbuffer が呼び出された場合、結果は未定義です。

unbuffer は閉じられたファイルに対して呼び出すことができます。この場合、unbuffer は何もしません。

事後条件

この操作を実装しないサブクラスの大部分は、単に何もしません。

操作がサポートされている場合、unbuffer はストリームに関連付けられたすべてのシステムリソースを解放します。 これらのリソースの正確なリストは、一般的には実装に依存しますが、一般的には、バッファ、ソケット、ファイル記述子などが含まれる場合があります。

インターフェース PositionedReadable

PositionedReadable 操作は、「位置指定読み取り」(「pread」)を提供します。データストリーム内の特定の位置からバッファにデータを読み取る機能を提供します。位置指定読み取りは、Seekable.seek を特定のオフセットで実行し、その後に InputStream.read(buffer[], offset, length) を実行することと同じですが、seekread の代わりに、メソッドの呼び出しが1回だけ行われ、2つの位置指定読み取りは、FSDataInputStream ストリームの単一のインスタンス上で*オプションで*並行して実行できます。

インターフェースは、位置指定読み取りをスレッドセーフとして宣言します(実装の一部はこの保証に従っていません)。

ストリーム操作(Seekable.seekSeekable.getPos()InputStream.read() など)と並行して実行される位置指定読み取りは、分離して実行する*必要*があります。相互干渉があってはなりません。

並行した位置指定読み取りとストリーム操作は、直列化可能である*必要*があります。スループットと「liveness」を向上させるために、並行して実行*する必要があります*が、一方が他方をブロックして直列に実行される場合があります。

pos1len1 のバッファ dest1 への並列位置指定読み取りと、pos2len2 のバッファ dest2 への別の並列位置指定読み取り、および pos3 へのシーク後に実行される並列ストリーム読み取りが与えられた場合、結果のバッファは、基になるストリームで読み取りが重複する場合でも、次のように埋められる*必要*があります

// Positioned read #1
read(pos1, dest1, ... len1) -> dest1[0..len1 - 1] =
  [data(FS, path, pos1), data(FS, path, pos1 + 1) ... data(FS, path, pos1 + len1 - 1]

// Positioned read #2
read(pos2, dest2, ... len2) -> dest2[0..len2 - 1] =
  [data(FS, path, pos2), data(FS, path, pos2 + 1) ... data(FS, path, pos2 + len2 - 1]

// Stream read
seek(pos3);
read(dest3, ... len3) -> dest3[0..len3 - 1] =
  [data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1]

実装はアトミックである必要はありません。操作の中間状態(getPos() の値の変更)が表示される場合があります。

実装の前提条件

すべての FSDataInputStream 実装がこれらの操作をサポートしているわけではありません。Seekable.seek() を実装していない実装は、PositionedReadable インターフェースを実装していません。

supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]

これは明らかであると考えられます。ストリームが Seekable でない場合、クライアントは場所にシークできません。また、Seekable.seek() を使用する基底クラスの実装の副作用でもあります。

**暗黙の不変条件**: すべての PositionedReadable 操作について、pos の値は操作の最後に変更されません

pos(FSDIS') == pos(FSDIS)

失敗状態

失敗した操作について、宛先 buffer の内容は未定義です。実装では、障害を報告する前にバッファの一部またはすべてを上書きする場合があります。

int PositionedReadable.read(position, buffer, offset, length)

割り当てられたバッファスペースにできるだけ多くのデータを読み取ります。

前提条件

position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
length >= 0
offset >= 0

事後条件

読み取られるデータ量は、長さまたは指定された位置から利用可能なデータ量の少ない方です

let available = min(length, len(data)-position)
buffer'[offset..(offset+available-1)] = data[position..position+available -1]
result = available
  1. 戻り値 -1 は、ストリームに利用可能なデータがなくなったことを意味します。
  2. length==0 の呼び出しは、暗黙的にデータを読み取りません。実装では、操作をショートカットしてIOを省略する場合があります。このような場合、ファイルの終わりにあるストリームのチェックは省略される場合があります。
  3. 読み取り操作中にIO例外が発生した場合、buffer の最終状態は未定義です。

void PositionedReadable.readFully(position, buffer, offset, length)

バッファに正確に length バイトのデータを読み込み、利用可能なデータが足りない場合は失敗します。

前提条件

position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
length >= 0
offset >= 0
len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
(position + length) <= len(data) else raise [EOFException, IOException]

読み取り操作中にIO例外が発生した場合、buffer の最終状態は未定義です。

リクエストを満たすのに十分なデータが入力ストリームにない場合、buffer の最終状態は未定義です。

事後条件

オフセット offset からのバッファは、position から始まるデータで埋められます

buffer'[offset..(offset+length-1)] = data[position..(position + length -1)]

PositionedReadable.readFully(position, buffer)

これの意味は、次のものとまったく同じです

readFully(position, buffer, 0, len(buffer))

つまり、バッファは位置 position からの入力ソースの内容で完全に埋められます

default void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)

範囲のリストのデータを非同期に完全に読み取ります。デフォルトの実装では、範囲を反復処理し、`minSeekForVectorReads` と `maxReadSizeForVectorReads` の値に基づいて範囲を結合しようと試み、次に各マージされた範囲を同期的に読み取りますが、サブクラスは効率的な実装を実装することを意図しています。ダイレクトバイトバッファとヒープバイトバッファの両方での読み取りがサポートされています。また、クライアントは、バッファの割り当てに `WeakReferencedElasticByteBufferPool` を使用することをお勧めします。これにより、ダイレクトバッファも参照されなくなったときにガベージコレクションされます。

`readVectored()` の後に `getPos()` によって返される位置は未定義です。

`readVectored()` 操作の実行中にファイルが変更された場合、出力は未定義です。一部の範囲には古いデータが含まれている場合があり、一部には新しいデータが含まれている場合があり、一部には両方が含まれている場合がああります。

`readVectored()` 操作の実行中は、通常の読み取りAPI呼び出しがブロックされる場合があります。

注:HADOOP-18296で説明されているように、メモリ фрагментацияにつながる可能性があるため、ChecksumFileSystemからの読み取りにダイレクトバッファを使用しないでください。

前提条件

要求された各範囲について

range.getOffset >= 0 else raise IllegalArgumentException
range.getLength >= 0 else raise EOFException

事後条件

要求された各範囲について

range.getData() returns CompletableFuture<ByteBuffer> which will have data
from range.getOffset to range.getLength.

minSeekForVectorReads()

妥当な最小シーク。最初の範囲の終わりと次の範囲の始まりの差がこの値よりも大きい場合、2つの範囲はマージされません。

maxReadSizeForVectorReads()

範囲をマージした後に一度に読み取ることができる最大バイト数。読み取るデータの合計がこの値を超える場合、2つの範囲はマージされません。基本的にこれを0に設定すると、範囲のマージが無効になります。

整合性

  • FileSystem.open(p) から提供されるデータストリームFSDISのすべてのリーダー(ローカルとリモート)は、開く時点で FS.Files[p] のデータへのアクセスを受け取ることが期待されます。
  • 読み取りプロセス中に基になるデータが変更された場合、これらの変更は表示される場合と表示されない場合があります。
  • 表示される変更は、部分的に表示される場合があります。

時刻 t0 に

FSDIS0 = FS'read(p) = (0, data0[])

時刻 t1 に

FS' = FS' where FS'.Files[p] = data1

時刻 t >= t1 から、FSDIS0 の値は未定義です。

変更されない場合があります

FSDIS0.data == data0

forall l in len(FSDIS0.data):
  FSDIS0.read() == data0[l]

新しいデータを取得する場合があります

FSDIS0.data == data1

forall l in len(FSDIS0.data):
  FSDIS0.read() == data1[l]

オフセットの読み取りがいずれかのデータセットからのデータを返すなど、不整合になる場合があります

forall l in len(FSDIS0.data):
  (FSDIS0.read(l) == data0[l]) or (FSDIS0.read(l) == data1[l]))

つまり、読み取られるすべての値は、元のファイルまたは更新されたファイルからのものです。

また、同じオフセットの繰り返し読み取りで不整合になる場合もあります。つまり、時刻 t2 > t1

r2 = FSDIS0.read(l)

時刻 t3 > t2

r3 = FSDIS0.read(l)

r3 != r2 となる場合があります。(つまり、データの一部がキャッシュまたは複製され、後続の読み取りで、ファイルの内容の異なるバージョンが返される場合があります)。

同様に、パス p のデータが削除された場合、この変更は FSDIS0 で実行される読み取り操作中に表示される場合と表示されない場合があります。