インターフェース org.apache.hadoop.fs.MultipartUploader

MultipartUploader は、Hadoop がサポートするファイルシステムに複数の部分をまとめてファイルをアップロードできます。マルチパートアップロードの利点は、複数のクライアントまたはプロセスから並列でファイルをアップロードでき、complete関数が呼び出されるまで、その結果は他のクライアントには表示されないことです。

オブジェクトストアによって実装されている場合、アップロードされたデータは、ファイルシステムに表示される前であっても、ストレージ料金が発生する可能性があります。このAPIのユーザーは注意深く、常にアップロードの完了または中断を最善を尽くして実行する必要があります。abortUploadsUnderPath(path)操作はここで役立ちます。

不変条件

有効なMultipartUploaderのすべての要件は、暗黙的な条件と事後条件と見なされます。

単一マルチパートアップロードの操作は、異なるマルチパートアップローダーのインスタンス、異なるプロセス、およびホストにまたがって実行される可能性があります。したがって、次のことが必須です。

  1. パートのアップロード、アップロードの完了、またはアップロードの中断に必要なすべての状態は、アップロードハンドル内に含まれているか、またはアップロードハンドルから取得できる必要があります。

  2. そのハンドルはシリアライズ可能でなければなりません。Hadoopのまったく同じバージョンを実行している異なるプロセスで逆シリアライズ可能でなければなりません。

  3. 異なるホスト/プロセスは、シーケンシャルまたは同時に異なる部分をアップロードできます。それらがファイルシステムにアップロードされる順序は、データが最終ファイルに保存される順序を制約してはなりません。

  4. アップロードは、パーツをアップロードしたインスタンスとは異なるインスタンスで完了できます。

  5. アップロードの出力は、アップロードが完了するまで、最終的な宛先では表示されません。

  6. 単一のマルチパートアップローダーインスタンスが、ストアが同時アップロードをサポートしているかどうかに関係なく、同じ宛先に複数のファイルをシーケンシャルに開始または完了してもエラーではありません。

並行性

複数のプロセスがマルチパートアップロードのパーツを同時にアップロードできます。

アクティブなアップロードが進行中の宛先にstartUpload(path)が呼び出された場合、実装は2つの操作のいずれかを実行する必要があります。

  • 呼び出しを重複として拒否する。
  • 両方の処理を許可し、ファイルの最終出力は*2つのアップロードのいずれか1つのみ*のものになるようにする。

どちらのアップロードが成功するかは未定義です。ユーザーは、ファイルシステム間、ファイルシステムインスタンス間、*または異なるリクエスト間でさえ*一貫した動作を期待してはなりません。

パーツのアップロード中にマルチパートアップロードが完了または中断された場合、進行中のアップロードは、完了していない場合は、全体的にも部分的にも最終ファイルに含まれてはなりません。実装は、putPart()操作でエラーを発生させるべきです。

シリアライゼーションの互換性

ユーザーは、シリアライズされたPathHandleバージョンが*異なるマルチパートアップローダーの実装間で、*同じ実装の異なるバージョン間で互換性があるとは期待してはなりません。

つまり、すべてのクライアントはHadoopのまったく同じバージョンを使用する必要があります。

モデル

マルチパートアップロードをサポートするFileSystem / FileContextは、既存のモデル(ディレクトリ、ファイル、シンボリックリンク)(ディレクトリ、ファイル、シンボリックリンク、アップロード)アップロードタイプMap[UploadHandle -> Map[PartHandle -> UploadPart]に拡張します。

状態タプルのアップロード要素は、すべてのアクティブなアップロードのマップです。

Uploads: Map[UploadHandle -> Map[PartHandle -> UploadPart]`

UploadHandleは、空ではないバイトのリストです。

UploadHandle: List[byte]
len(UploadHandle) > 0

クライアントはこれを不透明なデータとして扱う必要があります。この機能設計の中核となるのは、ハンドルがクライアント間で有効であることです。ハンドルはホストhostAでシリアライズされ、hostBで逆シリアライズされ、それでもアップロードの拡張または完了に使用できます。

UploadPart = (Path: path, parts: Map[PartHandle -> byte[]])

同様に、PartHandle型も空ではない不透明なバイトのリストであり、これもホスト間でマーシャリング可能です。

PartHandle: List[byte]

FS.Uploads内の各UploadHandleが一意であることは暗黙的です。同様に、[PartHandle -> UploadPart]のマップ内の各PartHandleも一意である必要があります。

  1. Part Handleがアップロード間で一意であるという要件はありません。
  2. Upload Handleが時間的に一意であるという要件はありません。ただし、Part Handleが急速にリサイクルされる場合、名目上べき等な操作abort(FS, uploadHandle)が、同じUpload Handleを使用した後続の操作を意図せずキャンセルするリスクがあります。

非同期API

すべての操作はCompletableFuture<>型を返し、その戻り値を取得するために後で評価する必要があります。

  1. 操作の実行は、呼び出しスレッドでブロック操作になる可能性があります。
  2. そうでない場合は、別々のスレッドで実行され、将来の評価が戻るまでに完了する必要があります。
  3. 一部/すべての事前条件は、最初の呼び出し時に評価される場合があります。
  4. その時点で評価されないものはすべて、将来の実行中に評価する必要があります。

これは、実装が高速なファイルシステム/ストアと対話する場合、ファイルの存在を含むすべての事前条件が早期に評価される可能性がある一方、プローブが遅いリモートオブジェクトストアと対話する実装は、非同期フェーズで事前条件、特にリモートストアと対話するものを検証する可能性があることを意味します。

Java CompletableFuturesは、チェック済み例外と連携しません。Hadoopのコードベースはまだ、非同期APIの使用方法が増えるにつれて、ここの例外処理の詳細を進化させています。IOExceptionが発生する必要があると宣言する事前条件の失敗は、将来で評価された場合、何らかの形式のRuntimeExceptionにラップされる可能性があることを想定してください。これは、操作中に発生する他のIOExceptionにも当てはまります。

close()

アプリケーションは、アップローダーを使用した後にclose()を呼び出す必要があります。これは、他のオブジェクトを解放したり、統計情報を更新したりするためです。

状態変更操作

CompletableFuture<UploadHandle> startUpload(Path)

マルチパートアップロードを開始し、最終的に後続の操作で使用するためのUploadHandleを返します。

事前条件

if path == "/" : raise IOException

if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException

ファイルシステムが宛先への同時アップロードをサポートしていない場合、次の事前条件が追加されます。

if path in values(FS.Uploads) raise PathExistsException, IOException

事後条件

初期化操作が完了すると、ファイルシステムの状態は、新しいアクティブなアップロードと新しいハンドルで更新され、このハンドルが呼び出し元に返されます。

handle' = UploadHandle where not handle' in keys(FS.Uploads)
FS' = FS where FS'.Uploads(handle') == {}
result = handle'

CompletableFuture<PartHandle> putPart(UploadHandle uploadHandle, int partNumber, Path filePath, InputStream inputStream, long lengthInBytes)

特定のマルチパートアップロードのパーツをアップロードし、最終的に指定されたアップロードのこの部分を表す不透明なパーツハンドルを返します。

事前条件

uploadHandle in keys(FS.Uploads)
partNumber >= 1
lengthInBytes >= 0
len(inputStream) >= lengthInBytes

事後条件

data' = inputStream(0..lengthInBytes)
partHandle' = byte[] where not partHandle' in keys(FS.uploads(uploadHandle).parts)
FS' = FS where FS'.uploads(uploadHandle).parts(partHandle') == data'
result = partHandle'

データは、完了するまでファイルシステムに保存されます。宛先パスでは表示されません。ファイルシステムのどこかの一時パスに表示される場合があります。これは実装固有であり、依存してはなりません。

CompletableFuture<PathHandle> complete(UploadHandle uploadId, Path filePath, Map<Integer, PartHandle> handles)

マルチパートアップロードを完了します。

ファイルシステムは、アップロードされた最後の部分を除く各部分の最小サイズを適用する場合があります。

パーツがこの範囲外の場合、IOExceptionが発生する必要があります。

事前条件

uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
FS.Uploads(uploadHandle).path == path
if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
parts.size() > 0
forall k in keys(parts): k > 0
forall k in keys(parts):
  not exists(k2 in keys(parts)) where (parts[k] == parts[k2])

すべてのキーは0より大きく、同じparthandleへの重複参照があってはなりません。これらの検証は、操作の任意の時点で実行できます。失敗後、有効なパスのマップを持つこのアップロードのcomplete()呼び出しが完了する保証はありません。このような失敗の後、クリーンアップを確実に実行するために、呼び出し元はabort()を呼び出す必要があります。

このuploadHandleに対してputPart()操作が実行されたが、そのPathHandleハンドルがこのリクエストに含まれていない場合、省略された部分は結果のファイルの一部にはなりません。

MultipartUploader は、そのような未処理のエントリをすべてクリーンアップする必要があります。

ディレクトリをサポートするバックエンドストレージ(ローカルファイルシステム、HDFSなど)の場合、完了時点で宛先にディレクトリが存在する場合は、PathIsDirectoryExceptionまたはその他のIOExceptionをスローする必要があります。

事後条件

UploadData' == ordered concatention of all data in the map of parts, ordered by key
exists(FS', path') and result = PathHandle(path')
FS' = FS where FS.Files(path) == UploadData' and not uploadHandle in keys(FS'.uploads)

PathHandleは完了操作によって返されるため、後続の操作では、その間にデータが変更されていないことを確認できます。

アップロードされたファイル内のパーツの順序は、マップ内でのパーツの自然な順序です。パート1はパート2より前など。

CompletableFuture<Void> abort(UploadHandle uploadId, Path filePath)

マルチパートアップロードを中止します。ハンドルは無効になり、再利用できません。

事前条件

uploadHandle in keys(FS.Uploads) else raise FileNotFoundException

事後条件

アップロードハンドルは認識されなくなります。

FS' = FS where not uploadHandle in keys(FS'.uploads)

ハンドルが再利用されない限り、同じハンドルでabort()を再度呼び出すと失敗します。

CompletableFuture<Integer> abortUploadsUnderPath(Path path)

パス下のすべてのアップロードを可能な限りクリーンアップします。

解決するFutureを返します。

-1 if unsuppported
>= 0 if supported

ベストエフォートであるため、厳密な事後条件を設定することはできません。理想的な事後条件は、パス下のすべてのアップロードが中止され、カウントが中止されたアップロードの数であることです。

FS'.uploads forall upload in FS.uploads:
    not isDescendant(FS, path, upload.path)
return len(forall upload in FS.uploads:
               isDescendant(FS, path, upload.path))