org.apache.hadoop.fs.FSDataInputStream
FSDataInputStream extends DataInputStream
FSDataInputStream
のコアとなる動作は、java.io.DataInputStream
によって定義されており、システムに重要な前提条件を追加する拡張機能が含まれています。
Seekable.seek()
を使用してバイト配列内のオフセットにシークできます。これ以降の読み取りはこのオフセットから開始されます。ファイルは FileSystem.open(p)
を介して開かれ、成功した場合、
result = FSDataInputStream(0, FS.Files[p])
ストリームは次のようにモデル化できます
FSDIS = (pos, data[], isOpen)
アクセス関数を使用
pos(FSDIS) data(FSDIS) isOpen(FSDIS)
暗黙の不変条件: データストリームのサイズは、FileSystem.getFileStatus(Path p)
によって返されるファイルのサイズと等しくなります
forall p in dom(FS.Files[p]) : len(data(FSDIS)) == FS.getFileStatus(p).length
Closeable.close()
java.io.Closeable
のセマンティクスは、JRE 内のインターフェース定義で定義されています。
操作は冪等でなければなりません。次のシーケンスはエラーではありません
FSDIS.close(); FSDIS.close();
実装は、障害に対して堅牢である必要があります。内部ストリームが閉じている場合は、最初に null
であるかどうかを確認する必要があります。
実装は、この操作中に IOException
例外(またはその他の例外)を発生させてはいけません。クライアントアプリケーションは、これらを無視したり、予期せず失敗したりすることがよくあります。
FSDIS' = ((undefined), (undefined), False)
Seekable.getPos()
現在の位置を返します。ストリームが閉じている場合の結果は未定義です。
isOpen(FSDIS)
result = pos(FSDIS)
InputStream.read()
現在の位置のデータを返します。
read()
の完了にかかる時間に制限はありません。isOpen(FSDIS)
if ( pos < len(data) ): FSDIS' = (pos + 1, data, True) result = data[pos] else result = -1
InputStream.read(buffer[], offset, length)
オフセット offset
から始まる宛先バッファに length
バイトのデータを読み取ります。データのソースは、pos
に暗黙的に設定されているストリームの現在の位置です。
isOpen(FSDIS) buffer != null else raise NullPointerException length >= 0 offset < len(buffer) length <= len(buffer) - offset pos >= 0 else raise EOFException, IOException
前提条件の失敗時に発生する可能性のある例外は次のとおりです。
InvalidArgumentException ArrayIndexOutOfBoundsException RuntimeException
すべてのファイルシステムが isOpen
状態をチェックするわけではありません。
if length == 0 : result = 0 else if pos > len(data): result = -1 else let l = min(length, len(data)-length) : buffer' = buffer where forall i in [0..l-1]: buffer'[o+i] = data[pos+i] FSDIS' = (pos+l, data, true) result = l
java.io
APIでは、読み取るデータ量(つまり、length
)が指定されている場合、使用可能なデータ量がゼロを超えるまで、つまりデータがいくつかあるまで、呼び出しはブロックする必要があると規定されています。呼び出しは、バッファがいっぱいになったとき、またはストリームにデータがなくなるまでブロックする必要はありません。
つまり、l
が単に min(length, len(data)-length)
として定義されるのではなく、厳密には 1..min(length, len(data)-length)
の範囲の整数です。呼び出し元はバッファのできるだけ多くの部分が埋められることを期待するかもしれませんが、実装が常に小さい数、おそらく1バイトだけを返すことは仕様の範囲内です。
重要なのは、宛先バッファサイズが0でない限り、少なくとも1バイトが返されるまで呼び出しがブロックする必要があるということです。したがって、長さ0を超えるデータソースの場合、この read()
操作を繰り返し呼び出すと、最終的にすべてのデータが読み取られます。
Seekable.seek(s)
すべてのサブクラスがシーク操作を実装しているわけではありません
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
操作がサポートされている場合、ファイルは開いている必要があります
isOpen(FSDIS)
一部のファイルシステムは、このチェックを実行せず、閉じたストリームの読み取りを拒否する read()
コントラクトに依存しています(例:RawLocalFileSystem
)。
seek(0)
は常に成功する必要があります。シーク位置は正で、ストリームの長さよりも小さくなければならないためです。
s > 0 and ((s==0) or ((s < len(data)))) else raise [EOFException, IOException]
一部のファイルシステムは、この条件が満たされていない場合でも例外を発生させません。代わりに、読み取り時に len(data(FSDIS)) < pos(FSDIS)
である read()
操作で-1を返します。
シークに失敗した後、pos(FSDIS)
の値が変更される場合があります。例として、EOFを超えてシークすると、読み取り位置がファイルの最後に移動するだけでなく、EOFException
が発生する可能性があります。
FSDIS' = (s, data, True)
暗黙の不変条件があります。現在の位置へのシークは操作なしです
seek(getPos())
実装はこの操作を認識し、他のすべての前提条件チェックをバイパスして、入力ストリームを変更しないままにすることができます。
最新のオブジェクトストアコネクタはすべて、ある種の「レイジーシーク」を実装しています。seek()
呼び出しはストリームを更新しているように見え、getPos()
の値は更新されますが、データが実際に読み取られるまでファイルは開かれず/再び開かれません。レイジーシークの実装では、ファイルの既知の長さに対して新しいシーク位置を検証する必要があります。ただし、ファイルの状態(つまり、ファイルが存在するかどうか、現在の長さ)をこの時点で更新する必要はありません。ファイルが削除または切り捨てられたという事実は、その read()
呼び出しまで表面化しない場合があります。
Seekable.seekToNewSource(offset)
この操作は、ソースに対して、現在のソースとは異なるソースから data[]
を取得するように指示します。これは、ファイルシステムがファイルの複数のレプリカをサポートし、オフセット offset
にデータのレプリカが複数存在する場合にのみ関連します。
すべてのサブクラスがこの操作を実装しているわけではなく、例外を発生させるか、False
を返すサブクラスもあります。
supported(FSDIS, Seekable.seekToNewSource) else raise [UnsupportedOperationException, IOException]
例: CompressionInputStream
、HttpFSFileSystem
サポートされている場合、ファイルは開いている必要があります
isOpen(FSDIS)
この操作を実装しないサブクラスの大部分は、単に失敗します。
if not supported(FSDIS, Seekable.seekToNewSource(s)): result = False
例: RawLocalFileSystem
、HttpFSFileSystem
操作がサポートされており、データの新しい場所がある場合
FSDIS' = (pos, data', true) result = True
新しいデータは元のデータ(または以下の整合性セクションで説明されているように、更新されたバージョン)ですが、offset
にデータを含むブロックは異なるレプリカから取得されます。
他にコピーがない場合、FSDIS
は更新されません。レスポンスはこのことを示します
result = False
テストメソッド以外では、このメソッドは主に {{FSInputChecker}} クラスで使用されます。このクラスは、読み取り中のチェックサムエラーに対して、他の場所からデータを取得しようと試みることで対応できます。新しいソースが見つかった場合、ファイルのその部分を再読み込みして再チェックしようとします。
CanUnbuffer.unbuffer()
この操作は、ソースに対して、現在保持しているシステムリソース(バッファ、ソケット、ファイル記述子など)をすべて解放するように指示します。後続のIO操作では、これらのリソースを再取得する必要がある可能性があります。バッファの解放は、ストリームを開いたままにする必要があるが、近い将来にストリームからのIO操作が予想されない状況(ファイルハンドルキャッシュなど)で役立ちます。
すべてのサブクラスがこの操作を実装しているわけではありません。CanUnbuffer
を実装することに加えて、サブクラスは StreamCapabilities
インターフェースを実装する必要があり、StreamCapabilities.hasCapability(UNBUFFER)
は true を返す必要があります。サブクラスが CanUnbuffer
を実装しているが、StreamCapabilities
を介して機能を報告しない場合、unbuffer
の呼び出しは何もしません。サブクラスが UNBUFFER
を実装すると報告しているが、CanUnbuffer
インターフェースを実装していない場合、UnsupportedOperationException
がスローされます。
supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer)
このメソッドはスレッドセーフではありません。read
の実行中に unbuffer
が呼び出された場合、結果は未定義です。
unbuffer
は閉じられたファイルに対して呼び出すことができます。この場合、unbuffer
は何もしません。
この操作を実装しないサブクラスの大部分は、単に何もしません。
操作がサポートされている場合、unbuffer
はストリームに関連付けられたすべてのシステムリソースを解放します。 これらのリソースの正確なリストは、一般的には実装に依存しますが、一般的には、バッファ、ソケット、ファイル記述子などが含まれる場合があります。
PositionedReadable
PositionedReadable
操作は、「位置指定読み取り」(「pread」)を提供します。データストリーム内の特定の位置からバッファにデータを読み取る機能を提供します。位置指定読み取りは、Seekable.seek
を特定のオフセットで実行し、その後に InputStream.read(buffer[], offset, length)
を実行することと同じですが、seek
と read
の代わりに、メソッドの呼び出しが1回だけ行われ、2つの位置指定読み取りは、FSDataInputStream
ストリームの単一のインスタンス上で*オプションで*並行して実行できます。
インターフェースは、位置指定読み取りをスレッドセーフとして宣言します(実装の一部はこの保証に従っていません)。
ストリーム操作(Seekable.seek
、Seekable.getPos()
、InputStream.read()
など)と並行して実行される位置指定読み取りは、分離して実行する*必要*があります。相互干渉があってはなりません。
並行した位置指定読み取りとストリーム操作は、直列化可能である*必要*があります。スループットと「liveness」を向上させるために、並行して実行*する必要があります*が、一方が他方をブロックして直列に実行される場合があります。
pos1
で len1
のバッファ dest1
への並列位置指定読み取りと、pos2
で len2
のバッファ dest2
への別の並列位置指定読み取り、および pos3
へのシーク後に実行される並列ストリーム読み取りが与えられた場合、結果のバッファは、基になるストリームで読み取りが重複する場合でも、次のように埋められる*必要*があります
// Positioned read #1 read(pos1, dest1, ... len1) -> dest1[0..len1 - 1] = [data(FS, path, pos1), data(FS, path, pos1 + 1) ... data(FS, path, pos1 + len1 - 1] // Positioned read #2 read(pos2, dest2, ... len2) -> dest2[0..len2 - 1] = [data(FS, path, pos2), data(FS, path, pos2 + 1) ... data(FS, path, pos2 + len2 - 1] // Stream read seek(pos3); read(dest3, ... len3) -> dest3[0..len3 - 1] = [data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1]
実装はアトミックである必要はありません。操作の中間状態(getPos()
の値の変更)が表示される場合があります。
すべての FSDataInputStream
実装がこれらの操作をサポートしているわけではありません。Seekable.seek()
を実装していない実装は、PositionedReadable
インターフェースを実装していません。
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
これは明らかであると考えられます。ストリームが Seekable
でない場合、クライアントは場所にシークできません。また、Seekable.seek()
を使用する基底クラスの実装の副作用でもあります。
**暗黙の不変条件**: すべての PositionedReadable
操作について、pos
の値は操作の最後に変更されません
pos(FSDIS') == pos(FSDIS)
失敗した操作について、宛先 buffer
の内容は未定義です。実装では、障害を報告する前にバッファの一部またはすべてを上書きする場合があります。
int PositionedReadable.read(position, buffer, offset, length)
割り当てられたバッファスペースにできるだけ多くのデータを読み取ります。
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException] len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException] length >= 0 offset >= 0
読み取られるデータ量は、長さまたは指定された位置から利用可能なデータ量の少ない方です
let available = min(length, len(data)-position) buffer'[offset..(offset+available-1)] = data[position..position+available -1] result = available
length==0
の呼び出しは、暗黙的にデータを読み取りません。実装では、操作をショートカットしてIOを省略する場合があります。このような場合、ファイルの終わりにあるストリームのチェックは省略される場合があります。buffer
の最終状態は未定義です。void PositionedReadable.readFully(position, buffer, offset, length)
バッファに正確に length
バイトのデータを読み込み、利用可能なデータが足りない場合は失敗します。
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException] length >= 0 offset >= 0 len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException] (position + length) <= len(data) else raise [EOFException, IOException]
読み取り操作中にIO例外が発生した場合、buffer
の最終状態は未定義です。
リクエストを満たすのに十分なデータが入力ストリームにない場合、buffer
の最終状態は未定義です。
オフセット offset
からのバッファは、position
から始まるデータで埋められます
buffer'[offset..(offset+length-1)] = data[position..(position + length -1)]
PositionedReadable.readFully(position, buffer)
これの意味は、次のものとまったく同じです
readFully(position, buffer, 0, len(buffer))
つまり、バッファは位置 position
からの入力ソースの内容で完全に埋められます
default void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)
範囲のリストのデータを非同期に完全に読み取ります。デフォルトの実装では、範囲を反復処理し、`minSeekForVectorReads` と `maxReadSizeForVectorReads` の値に基づいて範囲を結合しようと試み、次に各マージされた範囲を同期的に読み取りますが、サブクラスは効率的な実装を実装することを意図しています。ダイレクトバイトバッファとヒープバイトバッファの両方での読み取りがサポートされています。また、クライアントは、バッファの割り当てに `WeakReferencedElasticByteBufferPool` を使用することをお勧めします。これにより、ダイレクトバッファも参照されなくなったときにガベージコレクションされます。
`readVectored()` の後に `getPos()` によって返される位置は未定義です。
`readVectored()` 操作の実行中にファイルが変更された場合、出力は未定義です。一部の範囲には古いデータが含まれている場合があり、一部には新しいデータが含まれている場合があり、一部には両方が含まれている場合がああります。
`readVectored()` 操作の実行中は、通常の読み取りAPI呼び出しがブロックされる場合があります。
注:HADOOP-18296で説明されているように、メモリ фрагментацияにつながる可能性があるため、ChecksumFileSystemからの読み取りにダイレクトバッファを使用しないでください。
要求された各範囲について
range.getOffset >= 0 else raise IllegalArgumentException range.getLength >= 0 else raise EOFException
要求された各範囲について
range.getData() returns CompletableFuture<ByteBuffer> which will have data from range.getOffset to range.getLength.
minSeekForVectorReads()
妥当な最小シーク。最初の範囲の終わりと次の範囲の始まりの差がこの値よりも大きい場合、2つの範囲はマージされません。
maxReadSizeForVectorReads()
範囲をマージした後に一度に読み取ることができる最大バイト数。読み取るデータの合計がこの値を超える場合、2つの範囲はマージされません。基本的にこれを0に設定すると、範囲のマージが無効になります。
FileSystem.open(p)
から提供されるデータストリームFSDISのすべてのリーダー(ローカルとリモート)は、開く時点で FS.Files[p]
のデータへのアクセスを受け取ることが期待されます。時刻 t0 に
FSDIS0 = FS'read(p) = (0, data0[])
時刻 t1 に
FS' = FS' where FS'.Files[p] = data1
時刻 t >= t1
から、FSDIS0
の値は未定義です。
変更されない場合があります
FSDIS0.data == data0 forall l in len(FSDIS0.data): FSDIS0.read() == data0[l]
新しいデータを取得する場合があります
FSDIS0.data == data1 forall l in len(FSDIS0.data): FSDIS0.read() == data1[l]
オフセットの読み取りがいずれかのデータセットからのデータを返すなど、不整合になる場合があります
forall l in len(FSDIS0.data): (FSDIS0.read(l) == data0[l]) or (FSDIS0.read(l) == data1[l]))
つまり、読み取られるすべての値は、元のファイルまたは更新されたファイルからのものです。
また、同じオフセットの繰り返し読み取りで不整合になる場合もあります。つまり、時刻 t2 > t1
に
r2 = FSDIS0.read(l)
時刻 t3 > t2
に
r3 = FSDIS0.read(l)
r3 != r2
となる場合があります。(つまり、データの一部がキャッシュまたは複製され、後続の読み取りで、ファイルの内容の異なるバージョンが返される場合があります)。
同様に、パス p
のデータが削除された場合、この変更は FSDIS0
で実行される読み取り操作中に表示される場合と表示されない場合があります。