org.apache.hadoop.fs.MultipartUploader
MultipartUploader
は、Hadoop がサポートするファイルシステムに複数の部分をまとめてファイルをアップロードできます。マルチパートアップロードの利点は、複数のクライアントまたはプロセスから並列でファイルをアップロードでき、complete
関数が呼び出されるまで、その結果は他のクライアントには表示されないことです。
オブジェクトストアによって実装されている場合、アップロードされたデータは、ファイルシステムに表示される前であっても、ストレージ料金が発生する可能性があります。このAPIのユーザーは注意深く、常にアップロードの完了または中断を最善を尽くして実行する必要があります。abortUploadsUnderPath(path)
操作はここで役立ちます。
有効なMultipartUploader
のすべての要件は、暗黙的な条件と事後条件と見なされます。
単一マルチパートアップロードの操作は、異なるマルチパートアップローダーのインスタンス、異なるプロセス、およびホストにまたがって実行される可能性があります。したがって、次のことが必須です。
パートのアップロード、アップロードの完了、またはアップロードの中断に必要なすべての状態は、アップロードハンドル内に含まれているか、またはアップロードハンドルから取得できる必要があります。
そのハンドルはシリアライズ可能でなければなりません。Hadoopのまったく同じバージョンを実行している異なるプロセスで逆シリアライズ可能でなければなりません。
異なるホスト/プロセスは、シーケンシャルまたは同時に異なる部分をアップロードできます。それらがファイルシステムにアップロードされる順序は、データが最終ファイルに保存される順序を制約してはなりません。
アップロードは、パーツをアップロードしたインスタンスとは異なるインスタンスで完了できます。
アップロードの出力は、アップロードが完了するまで、最終的な宛先では表示されません。
単一のマルチパートアップローダーインスタンスが、ストアが同時アップロードをサポートしているかどうかに関係なく、同じ宛先に複数のファイルをシーケンシャルに開始または完了してもエラーではありません。
複数のプロセスがマルチパートアップロードのパーツを同時にアップロードできます。
アクティブなアップロードが進行中の宛先にstartUpload(path)
が呼び出された場合、実装は2つの操作のいずれかを実行する必要があります。
どちらのアップロードが成功するかは未定義です。ユーザーは、ファイルシステム間、ファイルシステムインスタンス間、*または異なるリクエスト間でさえ*一貫した動作を期待してはなりません。
パーツのアップロード中にマルチパートアップロードが完了または中断された場合、進行中のアップロードは、完了していない場合は、全体的にも部分的にも最終ファイルに含まれてはなりません。実装は、putPart()
操作でエラーを発生させるべきです。
ユーザーは、シリアライズされたPathHandleバージョンが*異なるマルチパートアップローダーの実装間で、*同じ実装の異なるバージョン間で互換性があるとは期待してはなりません。
つまり、すべてのクライアントはHadoopのまったく同じバージョンを使用する必要があります。
マルチパートアップロードをサポートするFileSystem / FileContextは、既存のモデル(ディレクトリ、ファイル、シンボリックリンク)
を(ディレクトリ、ファイル、シンボリックリンク、アップロード)
のアップロード
タイプMap[UploadHandle -> Map[PartHandle -> UploadPart]
に拡張します。
状態タプルのアップロード
要素は、すべてのアクティブなアップロードのマップです。
Uploads: Map[UploadHandle -> Map[PartHandle -> UploadPart]`
UploadHandleは、空ではないバイトのリストです。
UploadHandle: List[byte] len(UploadHandle) > 0
クライアントはこれを不透明なデータとして扱う必要があります。この機能設計の中核となるのは、ハンドルがクライアント間で有効であることです。ハンドルはホストhostA
でシリアライズされ、hostB
で逆シリアライズされ、それでもアップロードの拡張または完了に使用できます。
UploadPart = (Path: path, parts: Map[PartHandle -> byte[]])
同様に、PartHandle
型も空ではない不透明なバイトのリストであり、これもホスト間でマーシャリング可能です。
PartHandle: List[byte]
FS.Uploads
内の各UploadHandle
が一意であることは暗黙的です。同様に、[PartHandle -> UploadPart]
のマップ内の各PartHandle
も一意である必要があります。
abort(FS, uploadHandle)
が、同じUpload Handleを使用した後続の操作を意図せずキャンセルするリスクがあります。すべての操作はCompletableFuture<>
型を返し、その戻り値を取得するために後で評価する必要があります。
これは、実装が高速なファイルシステム/ストアと対話する場合、ファイルの存在を含むすべての事前条件が早期に評価される可能性がある一方、プローブが遅いリモートオブジェクトストアと対話する実装は、非同期フェーズで事前条件、特にリモートストアと対話するものを検証する可能性があることを意味します。
Java CompletableFuturesは、チェック済み例外と連携しません。Hadoopのコードベースはまだ、非同期APIの使用方法が増えるにつれて、ここの例外処理の詳細を進化させています。IOException
が発生する必要があると宣言する事前条件の失敗は、将来で評価された場合、何らかの形式のRuntimeException
にラップされる可能性があることを想定してください。これは、操作中に発生する他のIOException
にも当てはまります。
close()
アプリケーションは、アップローダーを使用した後にclose()
を呼び出す必要があります。これは、他のオブジェクトを解放したり、統計情報を更新したりするためです。
CompletableFuture<UploadHandle> startUpload(Path)
マルチパートアップロードを開始し、最終的に後続の操作で使用するためのUploadHandle
を返します。
if path == "/" : raise IOException if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
ファイルシステムが宛先への同時アップロードをサポートしていない場合、次の事前条件が追加されます。
if path in values(FS.Uploads) raise PathExistsException, IOException
初期化操作が完了すると、ファイルシステムの状態は、新しいアクティブなアップロードと新しいハンドルで更新され、このハンドルが呼び出し元に返されます。
handle' = UploadHandle where not handle' in keys(FS.Uploads) FS' = FS where FS'.Uploads(handle') == {} result = handle'
CompletableFuture<PartHandle> putPart(UploadHandle uploadHandle, int partNumber, Path filePath, InputStream inputStream, long lengthInBytes)
特定のマルチパートアップロードのパーツをアップロードし、最終的に指定されたアップロードのこの部分を表す不透明なパーツハンドルを返します。
uploadHandle in keys(FS.Uploads) partNumber >= 1 lengthInBytes >= 0 len(inputStream) >= lengthInBytes
data' = inputStream(0..lengthInBytes) partHandle' = byte[] where not partHandle' in keys(FS.uploads(uploadHandle).parts) FS' = FS where FS'.uploads(uploadHandle).parts(partHandle') == data' result = partHandle'
データは、完了するまでファイルシステムに保存されます。宛先パスでは表示されません。ファイルシステムのどこかの一時パスに表示される場合があります。これは実装固有であり、依存してはなりません。
CompletableFuture<PathHandle> complete(UploadHandle uploadId, Path filePath, Map<Integer, PartHandle> handles)
マルチパートアップロードを完了します。
ファイルシステムは、アップロードされた最後の部分を除く各部分の最小サイズを適用する場合があります。
パーツがこの範囲外の場合、IOException
が発生する必要があります。
uploadHandle in keys(FS.Uploads) else raise FileNotFoundException FS.Uploads(uploadHandle).path == path if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException parts.size() > 0 forall k in keys(parts): k > 0 forall k in keys(parts): not exists(k2 in keys(parts)) where (parts[k] == parts[k2])
すべてのキーは0より大きく、同じparthandleへの重複参照があってはなりません。これらの検証は、操作の任意の時点で実行できます。失敗後、有効なパスのマップを持つこのアップロードのcomplete()
呼び出しが完了する保証はありません。このような失敗の後、クリーンアップを確実に実行するために、呼び出し元はabort()
を呼び出す必要があります。
このuploadHandle
に対してputPart()
操作が実行されたが、そのPathHandle
ハンドルがこのリクエストに含まれていない場合、省略された部分は結果のファイルの一部にはなりません。
MultipartUploader は、そのような未処理のエントリをすべてクリーンアップする必要があります。
ディレクトリをサポートするバックエンドストレージ(ローカルファイルシステム、HDFSなど)の場合、完了時点で宛先にディレクトリが存在する場合は、PathIsDirectoryException
またはその他のIOException
をスローする必要があります。
UploadData' == ordered concatention of all data in the map of parts, ordered by key exists(FS', path') and result = PathHandle(path') FS' = FS where FS.Files(path) == UploadData' and not uploadHandle in keys(FS'.uploads)
PathHandle
は完了操作によって返されるため、後続の操作では、その間にデータが変更されていないことを確認できます。
アップロードされたファイル内のパーツの順序は、マップ内でのパーツの自然な順序です。パート1はパート2より前など。
CompletableFuture<Void> abort(UploadHandle uploadId, Path filePath)
マルチパートアップロードを中止します。ハンドルは無効になり、再利用できません。
uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
アップロードハンドルは認識されなくなります。
FS' = FS where not uploadHandle in keys(FS'.uploads)
ハンドルが再利用されない限り、同じハンドルでabort()
を再度呼び出すと失敗します。
CompletableFuture<Integer> abortUploadsUnderPath(Path path)
パス下のすべてのアップロードを可能な限りクリーンアップします。
解決するFutureを返します。
-1 if unsuppported >= 0 if supported
ベストエフォートであるため、厳密な事後条件を設定することはできません。理想的な事後条件は、パス下のすべてのアップロードが中止され、カウントが中止されたアップロードの数であることです。
FS'.uploads forall upload in FS.uploads: not isDescendant(FS, path, upload.path) return len(forall upload in FS.uploads: isDescendant(FS, path, upload.path))