出力: OutputStreamSyncable、および StreamCapabilities

はじめに

このドキュメントでは、Hadoopファイルシステム仕様のコンテキスト内での出力ストリームについて説明します。

Hadoopファイルシステムのモデルで定義されたファイルシステムモデルと、表記法で定義された表記法を使用します。

対象読者は次のとおりです。1. APIのユーザー。java.io.OutputStreamは標準インターフェースですが、このドキュメントではHDFSなどでどのように実装されているかを明確にします。Hadoop固有のインターフェースであるSyncableStreamCapabilitiesは新しいものです。Syncableは、OutputStreamの保証を超える耐久性と可視性の保証を提供する点で注目に値します。 1. ファイルシステムとクライアントの実装者。

ファイルシステムへのデータの書き込み方法

HadoopファイルシステムAPIを通じてファイルにデータを書き込むためのコアメカニズムは、FileSystem.create()FileSystem.append()、またはFSDataOutputStreamBuilder.build()の呼び出しを通じて取得されるOutputStreamサブクラスを使用することです。

これらはすべてFSDataOutputStreamのインスタンスを返し、さまざまなwrite()メソッドを通じてデータを書き込むことができます。ストリームのclose()メソッドが呼び出された後、ストリームに書き込まれたすべてのデータは、ファイルシステムに永続化され、FileSystem.open()を介してそのパスからデータを読み取ろうとする他のすべてのクライアントに可視化される必要があります。

データの書き込み操作に加えて、HadoopのOutputStream実装は、バッファリングされたデータをファイルシステムにフラッシュして、データが確実に永続化され、他の呼び出し元に可視化されるようにするためのメソッドを提供します。これは、Syncableインターフェースを介して行われます。もともと、このインターフェースの存在は、ストリームがそのメソッドをサポートしているという保証として解釈できることが意図されていました。ただし、インターフェースの静的な性質が、ストア/パスに基づいて同期能力セマンティクスが異なる可能性のあるファイルシステムと互換性がないため、これを保証することは不可能であることが証明されています。例として、HDFS内のイレイジャーコーディングされたファイルは、Syncableである出力ストリームのサブクラスとして実装されていても、同期操作をサポートしていません。

新しいインターフェース: StreamCapabilities。これにより、呼び出し元は、ストリームのチェーンを介してでも、ストリームの正確な機能をプローブできます。

出力ストリームモデル

この仕様では、出力ストリームはクライアントに保存されたバイトのリストと見なすことができます。hsync()hflush()は、ファイルの他の読み取り元に表示されるように、データを伝播するアクションであり、/または永続化するアクションです。

buffer: List[byte]

フラグ、openは、ストリームが開いているかどうかを追跡します。ストリームが閉じられた後、それ以上データを書き込むことはできません

open: bool
buffer: List[byte]

ストリームの宛先パスであるpathは、トリプルpath、open、bufferを形成するために追跡できます

Stream = (path: Path, open: Boolean, buffer: byte[])

フラッシュされたデータの可視性

ファイルシステムにデータをフラッシュするSyncable操作の(直後)、ストリームの宛先パスにあるデータは、bufferのデータと一致する必要があります。つまり、次の条件を満たす必要があります

FS'.Files(path) == buffer

パスのデータを読み取るクライアントは、新しいデータを表示する必要があります。Syncable操作は、データの可視性ではなく、耐久性の保証が異なります。

Filesystem.create()後のストリームとファイルシステムの状態

ファイルシステムFS内のFileSystem.create(path)またはFileSystem.createFile(path).build()によって返される出力ストリームは、データのない空の配列を含むトリプルとしてモデル化できます

Stream' = (path, true, [])

ファイルシステムFS'には、パスに0バイトのファイルが含まれている必要があります

FS' = FS where data(FS', path) == []

したがって、Stream'.bufferの初期状態は、ファイルシステムのデータと暗黙的に一致します。

オブジェクトストア: 下の「オブジェクトストア」セクションの注意事項を参照してください。

Filesystem.append()後のストリームとファイルシステムの状態

ファイルシステムFS内のFileSystem.append(path, buffersize, progress)の呼び出しから返される出力ストリームは、そのbufferが元のファイルのbufferに初期化されるストリームとしてモデル化できます

Stream' = (path, true, data(FS, path))

データの永続化

ストリームが、サポートされているフラッシュ操作、close()操作、またはストリームが選択するその他のタイミングでデータをストアに書き戻す場合、ファイルの内容は現在のバッファーで置き換えられます

Stream' = (path, true, buffer)
FS' = FS where data(FS', path) == buffer

close()の呼び出し後、ストリームはclose()以外のすべての操作で閉じられます。これらの操作は、IOExceptionまたはRuntimeExceptionで失敗する可能性があります。

Stream' = (path, false, [])

close()操作は、最初の呼び出しで行われたデータ書き込みの唯一の試行でべき等である必要があります。

  1. close()が成功した場合、後続の呼び出しはno-opです。
  2. close()が失敗した場合も、後続の呼び出しはno-opです。それらは以前の例外を再スローする可能性がありますが、書き込みを再試行してはなりません。

クラスFSDataOutputStream

public class FSDataOutputStream
  extends DataOutputStream
  implements Syncable, CanSetDropBehind, StreamCapabilities {
 // ...
}

FileSystem.create()FileSystem.append()、およびFSDataOutputStreamBuilder.build()の呼び出しは、java.io.OutputStreamのサブクラスであるクラスFSDataOutputStreamのインスタンスを返します。

基本クラスは、SyncableCanSetDropBehind、およびStreamCapabilitiesを実装する可能性のあるOutputStreamインスタンスをラップします。

このドキュメントでは、このような実装の要件について説明します。

HDFSのFileSystem実装であるDistributedFileSystemは、HdfsDataOutputStreamのインスタンスを返します。この実装には、少なくとも2つの動作があり、基本のJava実装では明示的に宣言されていません

  1. 書き込みは同期されます。複数のスレッドが同じ出力ストリームに書き込むことができます。これは、HBaseが依存する使用パターンです。

  2. ファイルが閉じている場合、OutputStream.flush()はno-opです。Apache Druidは過去にこれを呼び出しました HADOOP-14346

HDFS実装はファイルシステムAPIの事実上の仕様と見なされているため、write()がスレッドセーフであるという事実は重要です。

互換性を維持するために、他のFSクライアントはスレッドセーフである必要があるだけでなく、暗号化やイレイジャーコーディングなどの新しいHDFS機能も、コアHDFS出力ストリームと一貫した動作を実装する必要があります。

言い換えると

出力ストリームがjava.io.OutputStreamのコアセマンティクスを実装するだけでは十分ではありません。特にHBaseが正しく機能するためには、HdfsDataOutputStreamの追加のセマンティクスを実装する必要があります。

同時write()呼び出しは、Java仕様の最も重要な強化です。

クラスjava.io.OutputStream

Java の OutputStream は、アプリケーションがバイト列を宛先に書き込むことを可能にします。Hadoop ファイルシステムでは、その宛先はファイルシステム内のパスの下のデータです。

public abstract class OutputStream implements Closeable, Flushable {
  public abstract void write(int b) throws IOException;
  public void write(byte b[]) throws IOException;
  public void write(byte b[], int off, int len) throws IOException;
  public void flush() throws IOException;
  public void close() throws IOException;
}

write(Stream, data)

ストリームに1バイトのデータを書き込みます。

前提条件

Stream.open else raise ClosedChannelException, PathIOException, IOException

HDFS 出力ストリームで、クローズされたファイルに書き込もうとすると、例外 java.nio.channels.ClosedChannelExceptionn が発生します。この例外には宛先パスが含まれていません。また、Exception.getMessage()null です。したがって、スタックトレースにおける価値は限定的です。実装者は、PathIOException など、より詳細な例外を発生させることをお勧めします。

事後条件

バッファには、data 引数の下位 8 ビットが付加されます。

Stream'.buffer = Stream.buffer + [data & 0xff]

キャッシュされたデータのサイズには、明示的な制限、または宛先ファイルシステムの利用可能な容量に基づく暗黙的な制限がある場合があります。制限に達した場合、write()IOException で失敗する必要があります (SHOULD)。

write(Stream, byte[] data, int offset, int len)

前提条件

前提条件はすべて OutputStream.write() で定義されています。

Stream.open else raise ClosedChannelException, PathIOException, IOException
data != null else raise NullPointerException
offset >= 0 else raise IndexOutOfBoundsException
len >= 0 else raise IndexOutOfBoundsException
offset < data.length else raise IndexOutOfBoundsException
offset + len < data.length else raise IndexOutOfBoundsException

操作が完了した後、バッファは再利用される可能性があります。write() 操作の進行中にバッファを更新した場合の結果は未定義です。

事後条件

Stream'.buffer = Stream.buffer + data[offset...(offset + len)]

write(byte[] data)

これは、次のものと同等に定義されます。

write(data, 0, data.length)

flush()

データがフラッシュされるように要求します。ObjectStream.flush() の仕様では、これは「意図した宛先」にデータを書き込む必要がある (SHOULD) と宣言されています。

耐久性については、いかなる保証も明示的に排除します。

そのため、このドキュメントでは動作に関する規範的な仕様は提供しません。

前提条件

なし。

事後条件

なし。

実装がストリームフラッシュ操作を実装する場合、データは他のユーザーに表示されるようにファイルシステムに保存される可能性があります。

FS' = FS where data(FS', path) == buffer

ストリームが閉じられると、flush() は、まだそうでない場合は、何もしない操作にダウングレードされる必要があります (SHOULD)。これは、まさにこの方法で呼び出すことができるアプリケーションおよびライブラリで動作するためです。

課題: flush()hflush() に転送する必要がありますか?

いいえ。または、少なくともオプションにします。

flush() はコストが低く、出力のすべての行を書き込んだ後、小さな 4KB ブロックなどを書き込んだ後に呼び出す必要があると仮定しているアプリケーションコードがたくさんあります。

これを分散ファイルシステム全体へのフルフラッシュに転送したり、さらに悪いことに、遠隔のオブジェクトストアに転送したりするのは非常に非効率的です。flush()hflush() に変換するファイルシステムクライアントは、最終的にその機能をロールバックする必要があります: HADOOP-16548.

close()

close() 操作は、すべてのデータをファイルシステムに保存し、データの書き込みに使用されるリソースを解放します。

close() 呼び出しは、書き込みが完了するまで(Syncable.hflush() の場合と同様に)、場合によっては永続ストレージに書き込まれるまでブロックすることが予想されます。

close() が完了した後、ファイル内のデータは、最後に書き込まれたデータと一致し、一貫性がある必要があります。ファイルのメタデータは、データおよび書き込み履歴自体(つまり、更新された変更時間フィールド)と一貫性がある必要があります。

close() が呼び出された後、ストリームに対する後続のすべての write() 呼び出しは、IOException で失敗する必要があります。

すべてのロック/リース保持メカニズムは、そのロック/リースを解放する必要があります。

Stream'.open = false
FS' = FS where data(FS', path) == buffer

close() 呼び出しは、その操作中に失敗する可能性があります。

  1. API の呼び出し元は、close() の一部の呼び出しが失敗することを予期する必要があり、適切にコーディングする必要があります (SHOULD)。例外をキャッチして無視することは一般的ですが、常に理想的な解決策であるとは限りません。
  2. 失敗した後でも、close() はストリームをクローズ状態にする必要があります。後続の close() の呼び出しは無視され、他のメソッドの呼び出しは拒否されます。つまり、呼び出し元は、成功するまで close() を繰り返し呼び出すことを期待できません。
  3. close() 操作の期間は未定義です。永続性保証を満たすためにリモートシステムからの確認応答に依存する操作は、暗黙的にこれらの確認応答を待機する必要があります。一部のオブジェクトストア出力ストリームは、close() 操作ですべてのデータファイルをアップロードします。これには長い時間がかかる可能性があります。多くのユーザーアプリケーションが close() は高速であり、失敗しないと想定しているという事実は、この動作が危険であることを意味します。

呼び出し元による安全な使用のための推奨事項

  • 例外がキャッチおよびログに記録されるか、例外がさらに上にスローされるように、例外が発生することを計画してください。例外をキャッチして黙って無視すると、深刻な問題が隠される可能性があります。
  • ハートビート操作は別のスレッドで実行される必要があります (SHOULD)。これにより、close() の長い遅延がスレッドを非常に長くブロックし、ハートビートがタイムアウトしないようにします。

実装者

  • close の複雑さの例については、HADOOP-16785 を参照してください。
  • close 操作の前にブロックを段階的に書き込むと、クライアントの期待により適合する動作になります。書き込みの失敗を早期に表面化させ、close を実際のアップロードよりもハウスキーピングに近づけることができます。
  • ブロックアップロードが別のスレッドで実行される場合、出力ストリーム close() 呼び出しは、すべての非同期アップロードが完了するまでブロックする必要があります。発生したエラーはすべて報告する必要があります。複数のエラーが発生した場合、ストリームは伝播するエラーを選択できます。重要なことは、close() がエラーなしで戻ったとき、アプリケーションはデータが正常に書き込まれたことを期待することです。

HDFS と OutputStream.close()

dfs.datanode.synconclose が true に設定されていない限り、HDFS は OutputStream.close() で書き込まれたファイルの出力をディスクにすぐに sync() しません。これにより、一部のアプリケーションで問題が発生しています

ファイルが永続化されたという保証が絶対に必要なアプリケーションは、ファイルを閉じるSyncable.hsync() を呼び出す必要があります。

org.apache.hadoop.fs.Syncable

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Syncable {


  /** Flush out the data in client's user buffer. After the return of
   * this call, new readers will see the data.
   * @throws IOException if any error occurs
   */
  void hflush() throws IOException;

  /** Similar to posix fsync, flush out the data in client's user buffer
   * all the way to the disk device (but the disk may have it in its cache).
   * @throws IOException if error occurs
   */
  void hsync() throws IOException;
}

Syncable インターフェースの目的は、データの可視性と耐久性の両方のためにファイルシステムにデータが書き込まれるという保証を提供することです。

SYNC-1: Syncable を実装し、呼び出し時に UnsupportedOperationException を発生させない OutputStream は、その保証を満たすことができることを明示的に宣言しています。

SYNC-2: ストリームが、インターフェースが実装されていると宣言しているが、耐久性を提供しない場合、インターフェースのメソッドは UnsupportedOperationException を発生させる必要があります。

Syncable インターフェースは、OutputStream のサブクラス以外のクラス (org.apache.hadoop.io.SequenceFile.Writer など) によって実装されています。

SYNC-3 クラスが Syncable を実装しているという事実は、extends OutputStream が保持されることを保証するものではありません。

つまり、任意のクラス C について: (C instanceof Syncable)(C instanceof OutputStream) を意味しません。

この仕様は、Syncable を実装する OutputStream サブクラスの必要な動作のみを対象としています。

SYNC-4: FileSystem.create(Path) の戻り値は FSDataOutputStream のインスタンスです。

SYNC-5: FSDataOutputStream は Syncable を実装します

SYNC-5 と SYNC-1 は、FileSystem.create(Path) で作成できるすべての出力ストリームが Syncable のセマンティクスをサポートする必要があることを意味します。これは明らかに真実ではありません。FSDataOutputStream は、ラップされたストリームが Syncable でない場合、単に flush() にダウングレードします。したがって、宣言 SYNC-1 と SYNC-2 は保持されません。Syncable を信頼することはできません。

言い換えれば、呼び出し元は、Syncable のセマンティクスがサポートされている証拠としてインターフェースの存在に依存してはなりません。代わりに、利用可能な場合は StreamCapabilities インターフェースを使用して動的にプローブする必要があります。

Syncable.hflush()

クライアントのユーザーバッファ内のデータをフラッシュします。この呼び出しの戻り後、新しいリーダーはデータを確認できます。hflush() 操作には、データの耐久性に関する保証は含まれていません。可視性のみです。

したがって、実装では、書き込まれたデータをメモリにキャッシュする場合があります。すべてのユーザーに表示されますが、まだ永続化されていません。

前提条件

hasCapability(Stream, "hflush")
Stream.open else raise IOException

事後条件

FS' = FS where data(path) == cache

呼び出しが戻った後、データは、FileSystem.open(path) および FileSystem.openFile(path).build() の新しいすべての呼び出し元に表示される必要があります。

(FS, path) の呼び出しによって作成された既存の DataInputStream を持つクライアントが、更新されたデータを表示するという要件または保証はありません。また、現在または後続の読み取りで表示されないという保証もありません。

実装上の注意: 正しい hsync() 実装は hflush() 呼び出しのすべてのセマンティクスも提供する必要があるため、hflush() の実装は hsync() を呼び出すだけでよい場合があります。

public void hflush() throws IOException {
  hsync();
}

hflush() パフォーマンス

hflush() 呼び出しは、ストアがデータを受信し、他のユーザーに表示されるようになったことを確認するまでブロックする必要があります。これには、クライアントからの未処理データのアップロード時間と、ファイルシステム自体がそれを処理する時間が含まれるため、時間がかかる可能性があります。

多くの場合、ファイルシステムは Syncable.hsync() の保証 (永続性と可視性) のみを提供します。これは、戻るまでの時間がさらに長くなる可能性があることを意味します。

アプリケーションコードは、すべての行の終わり、または WAL を書き込む場合を除き、すべてのレコードの終わりに hflush() または hsync() を呼び出してはなりません (MUST NOT)。注意して使用してください。

Syncable.hsync()

POSIX fsync() と同様に、この呼び出しは、クライアントのユーザーバッファ内のデータをすべてディスクデバイスに保存します (ただし、ディスクにはキャッシュがある場合があります)。

つまり、基盤となる FS は、すべてのデータをディスクハードウェア自体に保存する必要があり、そこで耐久性が期待されます。

前提条件

hasCapability(Stream, "hsync")
Stream.open else raise IOException

事後条件

FS' = FS where data(path) == buffer

実装は、その書き込みがストアによって確認されるまでブロックする必要があります。

これにより、呼び出し元は、呼び出しが正常に返されたら、データが書き込まれたことを確信できます。

インターフェース StreamCapabilities

@InterfaceAudience.Public
@InterfaceStability.Evolving

org.apache.hadoop.fs.StreamCapabilities インターフェースは、呼び出し元がストリームの動作を動的に判断できるようにするために存在します。

  public boolean hasCapability(String capability) {
    switch (capability.toLowerCase(Locale.ENGLISH)) {
      case StreamCapabilities.HSYNC:
      case StreamCapabilities.HFLUSH:
        return supportFlush;
      default:
        return false;
    }
  }

ストリームが閉じられた後、hasCapability() 呼び出しは次のいずれかを実行する必要があります。

  • 開いているストリームの機能を返します。
  • false を返します。

つまり、ファイルが閉じられたことに関する例外を発生させてはなりません。

PathCapabilities API の詳細については、pathcapabilities を参照してください。要件は似ています。ストリームは、サポートされていない機能に対して true を返してはなりません。これは、以下のいずれかが原因です。

  • 機能が不明である。
  • 機能は既知であるが、サポートされていないことがわかっている。

標準的なストリーム機能は StreamCapabilities で定義されています。オプションの完全なセットについては、javadoc を参照してください。

名前 サポートのプローブ
dropbehind CanSetDropBehind.setDropBehind()
hsync Syncable.hsync()
hflush Syncable.hflush()。非推奨:HSYNCのみをプローブしてください。
in:readahead CanSetReadahead.setReadahead()
in:unbuffer" CanUnbuffer.unbuffer()
in:readbytebuffer ByteBufferReadable#read(ByteBuffer)
in:preadbytebuffer ByteBufferPositionedReadable#read(long, ByteBuffer)

ストリームの実装は、独自のカスタムオプションを追加できます。これらは fs.SCHEMA. というプレフィックスで始まる必要があります。ここで、SCHEMA はファイルシステムのスキーマです。

インターフェース CanSetDropBehind

@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface CanSetDropBehind {
  /**
   * Configure whether the stream should drop the cache.
   *
   * @param dropCache     Whether to drop the cache.  null means to use the
   *                      default value.
   * @throws IOException  If there was an error changing the dropBehind
   *                      setting.
   *         UnsupportedOperationException  If this stream doesn't support
   *                                        setting the drop-behind.
   */
  void setDropBehind(Boolean dropCache)
      throws IOException, UnsupportedOperationException;
}

このインターフェースを使用すると、呼び出し元は HDFS 内部で使用されるポリシーを変更できます。

実装は、次の呼び出しに対して true を返す必要があります。

StreamCapabilities.hasCapability("dropbehind");

ストリーム出力の耐久性、並行性、整合性、可視性。

これらは、この(非常に単純な)ファイルシステムモデルでは直接カバーされていないが、本番環境で可視化されるシステム動作の側面です。

耐久性

  1. OutputStream.write() は、データを同期または非同期で永続化する可能性があります。
  2. OutputStream.flush() は、データを宛先にフラッシュします。厳密な永続性の要件はありません。
  3. Syncable.hflush() は、未処理のすべてのデータを宛先ファイルシステムに同期的に送信します。呼び出し元に戻った後、データは他のリーダーから見える必要があります。永続化される可能性があります。つまり、永続化する必要はなく、パスでデータを読み取る新しいストリームを開こうとするすべてのクライアントに一貫して見えることが保証されるだけです。
  4. Syncable.hsync() は、hflush と同じようにデータを送信し、そのデータを基礎となる耐久性のあるストレージに永続化する必要があります。
  5. close() 最初の close() の呼び出しでは、バッファに残っているすべてのデータをフラッシュし、hsync() の呼び出しと同様に永続化する必要があります。

多くのアプリケーションは、書き込まれたすべての行の末尾など、flush() を頻繁に呼び出しすぎます。これにより、永続ストレージ内のデータと付随するメタデータの更新がトリガーされた場合、分散ストアはすぐに過負荷になります。したがって、flush() は、ネットワークバッファにデータをフラッシュする合図として扱われることがよくありますが、データの書き込みをコミットするわけではありません。

保証を提供するインターフェースは Syncable だけです。

2 つの Syncable 操作 hsync()hflush() は、hsync() の追加の保証によってのみ異なります。データは永続化する必要があります。hsync() が実装されている場合、hflush()hsync() を呼び出すだけで簡単に実装できます。

public void hflush() throws IOException {
  hsync();
}

これは実装として完全に許容できます。hflush() のセマンティクスは満たされています。許容できないのは、耐久性の保証が満たされなくなるため、hsync()hflush() にダウングレードすることです。

並行性

  1. 複数のプロセスが同じファイルに書き込む結果は未定義です。

  2. ファイルが書き込み用に開かれる前にファイルを読むために開かれた入力ストリームは、OutputStream への書き込みによって更新されたデータをフェッチする可能性があります。バッファリングとキャッシュのため、これは必須ではありません。また、入力ストリームが更新されたデータを取得する場合、更新されたデータが読み取られるポイントは未定義です。これは、seek() 呼び出しで接続が閉じられて再度開かれると、更新されたデータが取得される可能性があるが、フォワードストリームの読み取りでは取得されないオブジェクトストアで表面化します。同様に、ブロック指向のファイルシステムでは、データは一度に 1 ブロックずつキャッシュされる可能性があり、別のブロックが読み取られた場合にのみ変更が取得されます。

  3. ファイルシステムは、ストリームが書き込み中に宛先パスを操作することを許可する場合があります。たとえば、パスまたは親の rename()、パスまたは親の delete() などです。このような場合、出力ストリームでの将来の書き込み操作の結果は未定義です。一部のファイルシステムは、競合を防ぐためにロックを実装する場合があります。ただし、これらは、文献でよく知られている理由により、分散ファイルシステムではまれな傾向があります。

  4. java.io.OutputStream の Java API 仕様では、クラスのインスタンスがスレッドセーフであることを要求していません。ただし、org.apache.hadoop.hdfs.DFSOutputStream には、より強力なスレッドセーフモデルがあります(意図しない可能性があります)。この事実は、HADOOP-11708 で発見されたように、Apache HBase で依存しています。実装はスレッドセーフである必要があります。: DFSOutputStream の同期モデルでさえ、hsync() 操作でデータノードまたはネームノードへの書き込みからの確認応答を待機中に、出力ストリームで close() が呼び出されることを許可しています。

整合性と可視性

データが他のアプリケーションにすぐに表示される必要はありません。バッファをフラッシュするか、基礎となるストレージメディアに永続化する特定の呼び出しが行われるまでは、表示されません。

FileSystem.create(path, overwrite==true) で出力ストリームが作成され、パスに既存のファイルがある場合、つまり exists(FS, path) が保持されている場合、既存のデータはすぐに利用できなくなります。パスの末尾のデータは、一貫したメタデータを持つ空のバイトシーケンス [] で構成されている必要があります。

exists(FS, path)
(Stream', FS') = create(FS, path)
exists(FS', path)
getFileStatus(FS', path).getLen() = 0

ファイルのメタデータ(特に length(FS, path))は、flush() および sync() 後にファイルの内容と一致する必要があります。

(Stream', FS') = create(FS, path)
(Stream'', FS'') = write(Stream', data)
(Stream''', FS''') hsync(Stream'')
exists(FS''', path)
getFileStatus(FS''', path).getLen() = len(data)

HDFS は、書き込みがブロック境界を越える場合を除き、これを行いません。そうしないと、ネームノードが過負荷になる可能性があります。他のストアは、この動作をコピーする場合があります。

その結果、ファイルの書き込み中に length(Filesystem, Path)data(Filesystem, Path) の長さより小さくなる場合があります。

メタデータは、close() 操作後のファイルの内容と一致する必要があります。

出力ストリームの内容が永続化された後 (hflush()/hsync())、すべての新しい open(FS, Path) 操作は、更新されたデータを返す必要があります。

出力ストリームで close() が呼び出された後、getFileStatus(path) の呼び出しは、長さや変更時刻など、書き込まれたファイルの最終的なメタデータを返す必要があります。FileSystem の list 操作のいずれかで返されるファイルのメタデータは、このメタデータと一致する必要があります。

getFileStatus(path).getModificationTime() の値は、ストリームが書き込まれている間は定義されていません。タイムスタンプは、ファイルの書き込み中、特に Syncable.hsync() 呼び出し後に更新される可能性があります。タイムスタンプは、ファイルが閉じられた後、close() 呼び出し中にサーバーによって観測されたクロックの値に更新される必要があります。これは、クライアントではなく、ファイルシステムの時間とタイムゾーンである可能性が高いです。

正式には、close() 操作がサーバー側の時刻 t1 で開始し、t2 の時刻でファイルが正常に書き込まれて完了したサーバーとのやり取りをトリガーする場合、最後の変更時刻は t1 <= t <= t2 である時間 t である必要があります。

Hadoop 出力ストリームモデルの問題。

Hadoop が提供する出力ストリームモデルには、特にデータが書き込まれて永続化されるタイミングや、メタデータが同期されるタイミングに関する保証に関して、いくつかの既知の問題があります。これらは、HDFS と「ローカル」ファイルシステムの実装側面が、この仕様で使用されるファイルシステムの単純なモデルに従っていない場合です。

HDFS

HDFS: hsync() は最新のブロックのみを同期します

参照実装である DFSOutputStream は、データノードから確認応答を受信するまでブロックします。つまり、レプリカ書き込みチェーン内のすべてのホストがファイルへの書き込みに成功したということです。

つまり、呼び出し元が持つ可能性のある期待は、メソッド呼び出しの戻り値には、他の実装が維持する必要がある可視性と耐久性の保証が含まれているということです。

ただし、参照 DFSOutputStream.hsync() の呼び出しは、実際には現在のブロックのみを永続化することに注意してください。最後の同期以降に一連の書き込みがあり、ブロック境界が交差している場合。hsync() の呼び出しは、最新の書き込みのみを要求します。

DFSOutputStream.hsync(EnumSet<SyncFlag> syncFlags) の javadoc から

現在のブロックのみがディスクデバイスにフラッシュされることに注意してください。ブロック境界を越えて永続的な同期を保証するには、ストリームを {@link CreateFlag#SYNC_BLOCK} で作成する必要があります。

これは重要な HDFS 実装の詳細であり、HDFS に Write-Ahead-Log またはその他のデータベース構造を提供することを依存している人が無視してはなりません。アプリケーションの要件は、「WAL のコミットフラグがフラッシュされる前に、前のすべてのバイトが永続化されている必要がある」ことです。

このトピックに関する議論については、[Stonebraker81]、Michael Stonebraker、データベース管理用のオペレーティングシステムサポート、1981 を参照してください。

非常に大きな書き込みですべてのブロックを同期させた hsync() が必要な場合は、定期的に呼び出してください。

HDFS: メタデータ更新の遅延した可視性。

HDFS ファイルメタデータは、書き込まれているファイルの内容に遅れることがよくありますが、これは誰もが予想していることでも、書き込まれているファイルで更新されたデータを取得しようとするプログラムにとって都合の良いことでもありません。最も目に見えるのは、さまざまな list コマンドと getFileStatus で返されるファイルの長さです。これは多くの場合、古くなっています。

HDFS は出力操作でファイルの増加のみをサポートしているため、メタデータにリストされているファイルのサイズは、利用可能なバイト数以下であり、決して大きくならないことを意味します。これは保持されている保証でもあります。

HDFS でファイルが更新されたかどうかを判断する 1 つのアルゴリズムは次のとおりです。

  1. ファイル内の最後の読み取り位置 pos を記憶します。これが最初の読み取りの場合は 0 を使用します。
  2. getFileStatus(FS, Path) を使用して、メタデータに記録されたファイルの更新された長さをクエリします。
  3. Status.length &gt; pos の場合、ファイルは大きくなっています。
  4. 数が変わらない場合は、
    1. ファイルを再度開きます。
    2. seek(pos) でその位置に移動します。
    3. read() != -1 の場合、新しいデータが存在します。

このアルゴリズムは、メタデータとデータが一致しているファイルシステムと HDFS で機能します。重要なのは、オープンされたファイルに対して getFileStatus(FS, path).getLen() == 0 であっても、data(FS, path) が空であるとは限らないということです。

HDFS の出力ストリームが閉じられると、HDFS が dfs.datanode.synconclose を true に設定してデプロイされていない限り、新しく書き込まれたデータはすぐにディスクに書き込まれません。そうでない場合は、キャッシュされて後でディスクに書き込まれます。

ローカルファイルシステム、file:

LocalFileSystem, file: (または ChecksumFileSystem に基づくその他の FileSystem 実装)には、異なる問題があります。create() から出力ストリームを取得し、FileSystem.setWriteChecksum(false) がファイルシステムで呼び出されていない場合、ストリームは、フルチェックサムされたデータのブロックに書き込むことができるだけのローカルデータのみをフラッシュします。

つまり、hsync/hflush 操作は、ファイルが最終的に閉じられるまで、保留中のすべてのデータを書き込むことを保証しません。

このため、file:// URL を介してアクセスされるローカルファイルシステムは、setWriteChecksum(false) がその FileSystem インスタンスで呼び出され、チェックサムの作成が無効化されていない限り、Syncable をサポートしません。その後、明らかに、チェックサムはどのファイルにも生成されません。

チェックサムされた出力ストリーム

org.apache.hadoop.fs.FSOutputSummer および org.apache.hadoop.fs.ChecksumFileSystem.ChecksumFSOutputSummer は、HDFS およびその他のファイルシステムで使用される基盤となるチェックサム付き出力ストリームを実装しているため、出力ストリームの動作の中核となるセマンティクスの一部を提供します。

  1. close() 呼び出しは非同期であり、再入可能で、ストリームを複数回閉じようとする場合があります。
  2. 閉じられたストリームで write(int) を呼び出すことは可能です(ただし、write(byte[], int, int) は呼び出せません)。
  3. 閉じられたストリームで flush() を呼び出すことは可能です。

動作 1 と 2 は、注意して修正する必要があるバグと考える必要があります。

動作 3 は、他の実装がコピーするための事実上の標準と考える必要があります。

オブジェクトストア

オブジェクトストアストリームは、最終的な close() 操作がデータの単一の PUT と最終出力の具体化をトリガーするまで、ストリーム全体の出力をバッファリングする場合があります。

これにより、POSIX ファイルシステムやこのドキュメントで指定されているものと比較して、動作が大幅に変化します。

新しく作成されたオブジェクトの可視性

出力ストリームが作成された後、出力ストリームのパスでファイルが可視になるという保証はありません。

つまり、create(FS, path, boolean) は新しいストリームを返しますが

Stream' = (path, true, [])

操作のもう 1 つの後条件である data(FS', path) == [] は、保持されない場合があり、その場合

  1. exists(FS, p) は false を返す場合があります。
  2. ファイルが overwrite = True で作成された場合、既存のデータは引き続き表示される可能性があります。data(FS', path) = data(FS, path)
  3. overwrite=False を指定した create() 呼び出しでの既存データのチェックは、create() 呼び出し自体、書き込み前/書き込み中の close() 呼び出し、またはそれらの間のどこかの時点で行われる場合があります。オブジェクトストアがアトミックな PUT 操作をサポートしている特殊なケースでは、既存データの存在チェックと、それに続くパスでのデータ作成に競合状態が含まれます。他のクライアントが、存在チェックとそれに続く書き込みの間にパスにデータを作成する可能性があります。

  4. create(FS, Path, overwrite=false) の呼び出しは、別のストリームが開いていて宛先パスに書き込んでいる場合でも、成功し、新しい OutputStream を返す場合があります。

これにより、次の操作シーケンスが可能になります。これは、HDFS に対して呼び出された場合、2 番目の open() 呼び出しで例外が発生します。

Stream1 = open(FS, path, false)
sleep(200)
Stream2 = open(FS, path, false)
Stream.write('a')
Stream1.close()
Stream2.close()

クライアントが create() 呼び出しで 0 バイトのファイルを作成しない理由を疑問に思っている人のために説明すると、close() 後に問題が発生する可能性があります。マーカーファイルは、最終データの代わりに open() 呼び出しで返される可能性があります。

close() 後のストリーム出力の可視性

オブジェクトストアが満たすべき 1 つの保証は、POSIX ファイルシステムの保証と同じです。ストリームの close() 呼び出しが返された後、データは永続的に持続され、すべての呼び出し元に可視になる必要があります。残念ながら、その保証さえ常に満たされるわけではありません。

  1. パス上の既存のデータは、不定期間表示される場合があります。

  2. ストアに作成の不整合または存在の否定的なプローブのバッファリングの形式がある場合、ストリームの close() 操作が返された後でも、getFileStatus(FS, path) および open(FS, path)FileNotFoundException で失敗する可能性があります。

利点としては、ストアの PUT 操作のアトミシティは、独自の保証を提供します。新しく作成されたオブジェクトは、存在しないか、そのすべてのデータが存在します。オブジェクトのインスタンス化は、作成の不整合を示す可能性はありますが、アトミックです。アプリケーションはその事実を利用できる場合があります。

Abortable インターフェイスは、データが可視になる前に出力ストリームを中止するこの機能を公開しているため、チェックポイントや同様の操作に使用できます。

実装者向けの注意点。

常に Syncable を実装する - UnsupportedOperationException をスローするだけの場合でも

FSDataOutputStream は、Syncable.hflush() および Syncable.hsync()wrappedStream.flush() にサイレントにダウングレードするため、API の呼び出し元は、API をサポートしていないストリームに同期した後、データがフラッシュ/同期されたと誤解する可能性があります。

実装は API を実装する必要がありますが、UnsupportedOperationException をスローする必要があります。

StreamCapabilities

ファイルシステムクライアントの実装者は、StreamCapabilities インターフェイスとその hasCapabilities() メソッドを実装して、出力ストリームが Syncable の可視性と耐久性の保証を提供するかどうかを宣言する必要があります。

StreamCapabilities.hasCapabilities() の実装者は、これが真実ではないストリームで hflush および hsync 機能をサポートすると宣言してはなりません。

場合によっては、ストリームがデータをストアに渡すことがありますが、その終端はすべてをディスクに同期しない可能性があります。それはクライアントが判断できるものではありません。ここで、クライアントコードが hflush/hsync を行っている場合、これらのリクエストを分散 FS に渡す場合、それらをサポートしていると宣言する必要があります。

メタデータの更新

実装者は、すべての hsync() 呼び出し後にファイルのメタデータ (長さ、日付など) を更新しない場合があります。HDFS は、書き込まれたデータがブロック境界を越える場合を除き、更新しません。

close() はデータを同期して永続化しますか?

デフォルトでは、HDFS はストリームが閉じられたときにデータをすぐにディスクに書き込みません。ディスクに非同期的に保存されます。

これは、ユーザーがそれを期待しないという意味ではありません。

実装されている動作は、NFS の キャッシュ のライトバックの側面に似ています。DFSClient.close() は、すべてのデータをデータノードにアップロードするために、クライアントへの hflush() を実行しています。

  1. close() は、hflush() の保証が満たされると、つまりデータが他のユーザーに可視になると、返されます。
  2. 耐久性の保証については、最初に hsync() を呼び出す必要があります。