Hadoop: YARNアプリケーションの作成

目的

このドキュメントでは、YARN用の新しいアプリケーションを実装する方法を大まかに説明します。

概念とフロー

一般的な概念は、アプリケーション送信クライアントがYARNのResourceManager(RM)にアプリケーションを送信することです。これは、YarnClientオブジェクトをセットアップすることで実行できます。YarnClientが起動されると、クライアントはアプリケーションコンテキストをセットアップし、ApplicationMaster(AM)を含むアプリケーションの最初のコンテナを準備し、アプリケーションを送信できます。アプリケーションを実行するために必要なローカルファイル/jarの詳細、実行する必要のある実際のコマンド(必要なコマンドライン引数付き)、OS環境設定(オプション)などの情報を提供する必要があります。実際には、ApplicationMasterのために起動する必要があるUnixプロセスを記述する必要があります。

次に、YARN ResourceManagerは、割り当てられたコンテナ上でApplicationMaster(指定されたとおり)を起動します。ApplicationMasterはYARNクラスターと通信し、アプリケーションの実行を処理します。非同期的に操作を実行します。アプリケーションの起動時、ApplicationMasterの主なタスクは、a)将来のコンテナのリソースをネゴシエートして割り当てるためにResourceManagerと通信すること、およびb)コンテナ割り当て後、YARN NodeManager(NM)と通信して、それらにアプリケーションコンテナを起動することです。タスクa)は、AMRMClientAsyncオブジェクトを介して非同期的に実行でき、AMRMClientAsync.CallbackHandlerタイプのイベントハンドラーで指定されたイベント処理メソッドを使用します。イベントハンドラーは、クライアントに明示的に設定する必要があります。タスクb)は、コンテナが割り当てられたときにコンテナを起動する実行可能なオブジェクトを起動することによって実行できます。このコンテナの起動の一部として、AMは、コマンドライン仕様、環境などの起動情報を持つContainerLaunchContextを指定する必要があります。

アプリケーションの実行中、ApplicationMasterはNMClientAsyncオブジェクトを介してNodeManagerと通信します。すべてのコンテナイベントは、NMClientAsyncに関連付けられたNMClientAsync.CallbackHandlerによって処理されます。一般的なコールバックハンドラーは、クライアントの開始、停止、ステータスの更新、およびエラーを処理します。ApplicationMasterは、AMRMClientAsync.CallbackHandlergetProgress()メソッドを処理することにより、ResourceManagerに実行の進捗状況を報告します。

非同期クライアント以外に、特定のワークフロー用の同期バージョン(AMRMClientおよびNMClient)があります。(主観的に)よりシンプルな使用法のため、非同期クライアントをお勧めします。この記事では主に非同期クライアントについて説明します。同期クライアントの詳細については、AMRMClientおよびNMClientを参照してください。

インターフェース

以下は、重要なインターフェースです。

  • クライアント<-->ResourceManager

    YarnClientオブジェクトを使用します。

  • ApplicationMaster<-->ResourceManager

    AMRMClientAsyncオブジェクトを使用し、AMRMClientAsync.CallbackHandlerによってイベントを非同期的に処理します。

  • ApplicationMaster<-->NodeManager

    コンテナを起動します。NMClientAsyncオブジェクトを使用してNodeManagerと通信し、NMClientAsync.CallbackHandlerによってコンテナイベントを処理します。

注意

  • YARNアプリケーションの3つの主要プロトコル(ApplicationClientProtocol、ApplicationMasterProtocol、およびContainerManagementProtocol)は、引き続き保持されます。3つのクライアントは、これらの3つのプロトコルをラップして、YARNアプリケーションのよりシンプルなプログラミングモデルを提供します。

  • 非常にまれな状況では、プログラマーは3つのプロトコルを直接使用してアプリケーションを実装したい場合があります。ただし、このような動作は、一般的なユースケースでは推奨されなくなりました

シンプルなYARNアプリケーションの作成

シンプルなクライアントの作成

  • クライアントが最初に行う必要があるのは、YarnClientを初期化して起動することです。

      YarnClient yarnClient = YarnClient.createYarnClient();
      yarnClient.init(conf);
      yarnClient.start();
    
  • クライアントがセットアップされたら、クライアントはアプリケーションを作成し、そのアプリケーションIDを取得する必要があります。

      YarnClientApplication app = yarnClient.createApplication();
      GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
    
  • 新しいアプリケーションに対するYarnClientApplicationからの応答には、クラスターの最小/最大リソース機能など、クラスターに関する情報も含まれています。これは、ApplicationMasterが起動されるコンテナの仕様を正しく設定できるようにするために必要です。詳細については、GetNewApplicationResponseを参照してください。

  • クライアントの主な要点は、RMがAMを起動するために必要なすべての情報を定義するApplicationSubmissionContextをセットアップすることです。クライアントは、コンテキストに次のものを設定する必要があります。

  • アプリケーション情報:ID、名前

  • キュー、優先度情報:アプリケーションが送信されるキュー、アプリケーションに割り当てる優先度。

  • ユーザー:アプリケーションを送信するユーザー

  • ContainerLaunchContext:AMが起動および実行されるコンテナを定義する情報。前述のように、ContainerLaunchContextは、ローカルResources(バイナリ、jar、ファイルなど)、Environment設定(CLASSPATHなど)、実行するCommand、セキュリティTokens(RECT)など、アプリケーションの実行に必要なすべての情報を定義します。

// set the application submission context
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();

appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);

// set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

LOG.info("Copy App Master jar from local filesystem and add to local environment");
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
FileSystem fs = FileSystem.get(conf);
addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
    localResources, null);

// Set the log4j properties if needed
if (!log4jPropFile.isEmpty()) {
  addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
      localResources, null);
}

// The shell script has to be made available on the final container(s)
// where it will be executed.
// To do this, we need to first copy into the filesystem that is visible
// to the yarn framework.
// We do not need to set this as a local resource for the application
// master as the application master does not need it.
String hdfsShellScriptLocation = "";
long hdfsShellScriptLen = 0;
long hdfsShellScriptTimestamp = 0;
if (!shellScriptPath.isEmpty()) {
  Path shellSrc = new Path(shellScriptPath);
  String shellPathSuffix =
      appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
  Path shellDst =
      new Path(fs.getHomeDirectory(), shellPathSuffix);
  fs.copyFromLocalFile(false, true, shellSrc, shellDst);
  hdfsShellScriptLocation = shellDst.toUri().toString();
  FileStatus shellFileStatus = fs.getFileStatus(shellDst);
  hdfsShellScriptLen = shellFileStatus.getLen();
  hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
}

if (!shellCommand.isEmpty()) {
  addToLocalResources(fs, null, shellCommandPath, appId.toString(),
      localResources, shellCommand);
}

if (shellArgs.length > 0) {
  addToLocalResources(fs, null, shellArgsPath, appId.toString(),
      localResources, StringUtils.join(shellArgs, " "));
}

// Set the env variables to be setup in the env where the application master will be run
LOG.info("Set the environment for the application master");
Map<String, String> env = new HashMap<String, String>();

// put location of shell script into env
// using the env info, the application master will create the correct local resource for the
// eventual containers that will be launched to execute the shell scripts
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));

// Add AppMaster.jar location to classpath
// At some point we should not be required to add
// the hadoop specific classpaths to the env.
// It should be provided out of the box.
// For now setting all required classpaths including
// the classpath to "." for the application jar
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
  .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
for (String c : conf.getStrings(
    YarnConfiguration.YARN_APPLICATION_CLASSPATH,
    YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
  classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
  classPathEnv.append(c.trim());
}
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
  "./log4j.properties");

// Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);

// Set java executable command
LOG.info("Setting up app master command");
vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
// Set class name
vargs.add(appMasterMainClass);
// Set params for Application Master
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));

for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
  vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
}
if (debugFlag) {
  vargs.add("--debug");
}

vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
  command.append(str).append(" ");
}

LOG.info("Completed setting up app master command " + command.toString());
List<String> commands = new ArrayList<String>();
commands.add(command.toString());

// Set up the container launch context for the application master
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
  localResources, env, commands, null, null, null);

// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);

// Service data is a binary blob that can be passed to the application
// Not needed in this scenario
// amContainer.setServiceData(serviceData);

// Setup security tokens
if (UserGroupInformation.isSecurityEnabled()) {
  // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
  Credentials credentials = new Credentials();
  String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
  if (tokenRenewer == null | | tokenRenewer.length() == 0) {
    throw new IOException(
      "Can't get Master Kerberos principal for the RM to use as renewer");
  }

  // For now, only getting tokens for the default file-system.
  final Token<?> tokens[] =
      fs.addDelegationTokens(tokenRenewer, credentials);
  if (tokens != null) {
    for (Token<?> token : tokens) {
      LOG.info("Got dt for " + fs.getUri() + "; " + token);
    }
  }
  DataOutputBuffer dob = new DataOutputBuffer();
  credentials.writeTokenStorageToStream(dob);
  ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
  amContainer.setTokens(fsTokens);
}

appContext.setAMContainerSpec(amContainer);
  • セットアッププロセスが完了すると、クライアントは指定された優先度とキューでアプリケーションを送信する準備が整います。
// Set the priority for the application master
Priority pri = Priority.newInstance(amPriority);
appContext.setPriority(pri);

// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(amQueue);

// Submit the application to the applications manager
// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);

yarnClient.submitApplication(appContext);
  • この時点で、RMはアプリケーションを受け入れ、バックグラウンドで、必要な仕様でコンテナを割り当てるプロセスを実行し、最終的に割り当てられたコンテナでAMをセットアップして起動します。

  • クライアントが実際のタスクの進捗状況を追跡する方法は複数あります。

  • RMと通信し、YarnClientgetApplicationReport()メソッドを介してアプリケーションのレポートを要求できます。
// Get application report for the appId we are interested in
ApplicationReport report = yarnClient.getApplicationReport(appId);

RMから受信したApplicationReportは、次のもので構成されています。

  • 一般的なアプリケーション情報:アプリケーションID、アプリケーションが送信されたキュー、アプリケーションを送信したユーザー、およびアプリケーションの開始時間。

  • ApplicationMasterの詳細:AMが実行されているホスト、クライアントからのリクエストをリッスンしているRPCポート(存在する場合)、およびクライアントがAMと通信するために必要なトークン。

  • アプリケーション追跡情報:アプリケーションが何らかの形式の進捗状況追跡をサポートしている場合、クライアントが監視できるApplicationReportgetTrackingUrl()メソッドを介して利用可能な追跡URLを設定できます。

  • アプリケーションステータス:ResourceManagerによって表示されるアプリケーションの状態は、ApplicationReport#getYarnApplicationStateを介して利用できます。YarnApplicationStateFINISHEDに設定されている場合、クライアントはApplicationReport#getFinalApplicationStatusを参照して、アプリケーションタスク自体の実際の成功/失敗を確認する必要があります。失敗した場合、ApplicationReport#getDiagnosticsが失敗に関する詳細情報を提供するのに役立つ場合があります。

  • ApplicationMasterがサポートしている場合、クライアントはアプリケーションレポートから取得したhost:rpcport情報を介して、進捗状況の更新についてAM自体を直接クエリできます。レポートから取得した追跡URL(利用可能な場合)を使用することもできます。
  • 特定の状況では、アプリケーションの時間がかかりすぎたり、その他の要因により、クライアントがアプリケーションをkillしたい場合があります。YarnClientは、クライアントがResourceManagerを介してAMにkillシグナルを送信できるようにするkillApplication呼び出しをサポートしています。ApplicationMasterは、そのRPCレイヤーを介してクライアントが活用できる中止呼び出しをサポートする場合もあります。
      yarnClient.killApplication(appId);
    

ApplicationMaster(AM)の作成

  • AMはジョブの実際の所有者です。RMによって起動され、クライアントを介して、監督および完了を委任されたジョブに関する必要なすべての情報とリソースが提供されます。

  • AMは、他のコンテナと物理ホストを共有している可能性のある(おそらくそうなる)コンテナ内で起動されるため、マルチテナントの性質など、さまざまな問題により、リッスンできる事前構成済みのポートのようなものを想定することはできません。

  • AMが起動すると、いくつかのパラメーターが環境を介して利用可能になります。これには、AMコンテナのContainerId、アプリケーションの送信時間、およびApplicationMasterを実行しているNM(NodeManager)ホストに関する詳細が含まれます。パラメーター名については、ApplicationConstantsを参照してください。

  • RMとのすべての対話には、ApplicationAttemptIdが必要です(障害が発生した場合、アプリケーションごとに複数の試行が存在する可能性があります)。ApplicationAttemptIdは、AMのコンテナIDから取得できます。環境から取得した値をオブジェクトに変換するためのヘルパーAPIがあります。

Map<String, String> envs = System.getenv();
String containerIdString =
    envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
if (containerIdString == null) {
  // container id should always be set in the env by the framework
  throw new IllegalArgumentException(
      "ContainerId not set in the environment");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
  • AMが完全に初期化されたら、ResourceManagerとNodeManagerの2つのクライアントを開始できます。カスタマイズしたイベントハンドラーを設定します。これらのイベントハンドラーについては、この記事の後半で詳しく説明します。
  AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
  amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
  amRMClient.init(conf);
  amRMClient.start();

  containerListener = createNMCallbackHandler();
  nmClientAsync = new NMClientAsyncImpl(containerListener);
  nmClientAsync.init(conf);
  nmClientAsync.start();
  • AMは、AMが稼働中でまだ実行中であることをRMに知らせ続けるために、RMにハートビートを発行する必要があります。RMでのタイムアウト有効期限間隔は、YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MSを介してアクセス可能な構成設定によって定義され、デフォルトはYarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MSによって定義されます。ApplicationMasterは、ハートビートを開始するためにResourceManagerに自身を登録する必要があります。
// Register self with ResourceManager
// This will start heartbeating to the RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient
    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
        appMasterTrackingUrl);
  • 登録の応答には、含まれている場合は最大リソース能力が含まれます。これを使用して、アプリケーションのリクエストを確認できます。
// Dump out information about cluster capability as seen by the
// resource manager
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capability of resources in this cluster " + maxMem);

int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capability of resources in this cluster " + maxVCores);

// A resource ask cannot exceed the max.
if (containerMemory > maxMem) {
  LOG.info("Container memory specified above max threshold of cluster."
      + " Using max value." + ", specified=" + containerMemory + ", max="
      + maxMem);
  containerMemory = maxMem;
}

if (containerVirtualCores > maxVCores) {
  LOG.info("Container virtual cores specified above max threshold of  cluster."
    + " Using max value." + ", specified=" + containerVirtualCores + ", max="
    + maxVCores);
  containerVirtualCores = maxVCores;
}
List<Container> previousAMRunningContainers =
    response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size()
        + " previous AM's running containers on AM registration.");
  • タスクの要件に基づいて、AMはタスクを実行するための一連のコンテナを要求できます。必要なコンテナの数を計算し、それらのコンテナを要求できます。
List<Container> previousAMRunningContainers =
    response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size()
    + " previous AM's running containers on AM registration.");

int numTotalContainersToRequest =
    numTotalContainers - previousAMRunningContainers.size();
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
for (int i = 0; i < numTotalContainersToRequest; ++i) {
  ContainerRequest containerAsk = setupContainerAskForRM();
  amRMClient.addContainerRequest(containerAsk);
}
  • setupContainerAskForRM()では、次の2つのものを設定する必要があります。
  • リソース能力:現在、YARNはメモリベースのリソース要件をサポートしているため、リクエストでは必要なメモリ量を定義する必要があります。値はMB単位で定義され、クラスターの最大能力より小さく、最小能力の正確な倍数でなければなりません。メモリリソースは、タスクコンテナに課せられる物理メモリ制限に対応します。コードに示すように、計算ベースのリソース(vCore)もサポートします。

  • 優先度:コンテナのセットを要求する場合、AMは各セットに異なる優先度を定義できます。たとえば、Map-Reduce AMは、Mapタスクに必要なコンテナにはより高い優先度を、Reduceタスクのコンテナにはより低い優先度を割り当てることができます。

private ContainerRequest setupContainerAskForRM() {
  // setup requirements for hosts
  // using * as any host will do for the distributed shell app
  // set the priority for the request
  Priority pri = Priority.newInstance(requestPriority);

  // Set up resource type requirements
  // For now, memory and CPU are supported so we set memory and cpu requirements
  Resource capability = Resource.newInstance(containerMemory,
    containerVirtualCores);

  ContainerRequest request = new ContainerRequest(capability, null, null,
      pri);
  LOG.info("Requested container ask: " + request.toString());
  return request;
}
  • コンテナ割り当て要求がアプリケーションマネージャーによって送信されると、コンテナはAMRMClientAsyncクライアントのイベントハンドラーによって非同期的に起動されます。ハンドラーは、AMRMClientAsync.CallbackHandlerインターフェースを実装する必要があります。
  • コンテナが割り当てられると、ハンドラーはコンテナを起動するコードを実行するスレッドを設定します。ここでは、LaunchContainerRunnableという名前を使用して説明します。LaunchContainerRunnableクラスについては、この記事の次の部分で説明します。
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
  LOG.info("Got response from RM for container ask, allocatedCnt="
      + allocatedContainers.size());
  numAllocatedContainers.addAndGet(allocatedContainers.size());
  for (Container allocatedContainer : allocatedContainers) {
    LaunchContainerRunnable runnableLaunchContainer =
        new LaunchContainerRunnable(allocatedContainer, containerListener);
    Thread launchThread = new Thread(runnableLaunchContainer);

    // launch and start the container on a separate thread to keep
    // the main thread unblocked
    // as all containers may not be allocated at one go.
    launchThreads.add(launchThread);
    launchThread.start();
  }
}
  • ハートビートでは、イベントハンドラーはアプリケーションの進行状況を報告します。
@Override
public float getProgress() {
  // set progress to deliver to RM on next heartbeat
  float progress = (float) numCompletedContainers.get()
      / numTotalContainers;
  return progress;
}
  • コンテナ起動スレッドは、実際にはNMでコンテナを起動します。コンテナがAMに割り当てられたら、割り当てられたコンテナで実行される最終的なタスクのためにContainerLaunchContextを設定する際に、クライアントが従ったのと同様のプロセスに従う必要があります。ContainerLaunchContextが定義されたら、AMはNMClientAsyncを通じて起動できます。
// Set the necessary command to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5);

// Set executable command
vargs.add(shellCommand);
// Set shell script path
if (!scriptPath.isEmpty()) {
  vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
    : ExecShellStringPath);
}

// Set args for the shell command if any
vargs.add(shellArgs);
// Add log redirect params
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");

// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
  command.append(str).append(" ");
}

List<String> commands = new ArrayList<String>();
commands.add(command.toString());

// Set up ContainerLaunchContext, setting local resource, environment,
// command and token for constructor.

// Note for tokens: Set up tokens for the container too. Today, for normal
// shell commands, the container in distribute-shell doesn't need any
// tokens. We are populating them mainly for NodeManagers to be able to
// download anyfiles in the distributed file-system. The tokens are
// otherwise also useful in cases, for e.g., when one is running a
// "hadoop dfs" command inside the distributed shell.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
  localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
  • NMClientAsyncオブジェクトは、そのイベントハンドラーとともに、コンテナの開始、停止、ステータス更新、およびエラー発生を含むコンテナイベントを処理します。

  • ApplicationMasterは作業が完了したと判断した後、AM-RMクライアントを通じて自身の登録を解除し、クライアントを停止する必要があります。

try {
  amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnException ex) {
  LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
  LOG.error("Failed to unregister application", e);
}

amRMClient.stop();

よくある質問

アプリケーションのJARを、YARNクラスター内で必要なすべてのノードに配布するにはどうすればよいですか?

LocalResourceを使用して、アプリケーションリクエストにリソースを追加できます。これにより、YARNはリソースをApplicationMasterノードに配布します。リソースがtgz、zip、またはjarの場合、YARNで解凍できます。次に、解凍されたフォルダーをクラスパスに追加するだけです。たとえば、アプリケーションリクエストを作成するとき

File packageFile = new File(packagePath);
URL packageUrl = ConverterUtils.getYarnUrlFromPath(
    FileContext.getFileContext().makeQualified(new Path(packagePath)));

packageResource.setResource(packageUrl);
packageResource.setSize(packageFile.length());
packageResource.setTimestamp(packageFile.lastModified());
packageResource.setType(LocalResourceType.ARCHIVE);
packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

resource.setMemory(memory);
containerCtx.setResource(resource);
containerCtx.setCommands(ImmutableList.of(
    "java -cp './package/*' some.class.to.Run "
    + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
    + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
containerCtx.setLocalResources(
    Collections.singletonMap("package", packageResource));
appCtx.setApplicationId(appId);
appCtx.setUser(user.getShortUserName);
appCtx.setAMContainerSpec(containerCtx);
yarnClient.submitApplication(appCtx);

ご覧のとおり、setLocalResourcesコマンドは、名前とリソースのマッピングを受け取ります。名前はアプリケーションのcwd内のシンボリックリンクになるため、./package/*を使用して内部の成果物を参照できます。

:Javaのクラスパス(cp)引数は非常に敏感です。構文が正確であることを確認してください。

パッケージがAMに配布されたら、AMが新しいコンテナを起動するたびに(リソースをコンテナに送信する場合)、同じプロセスに従う必要があります。このコードは同じです。AMにパッケージパス(HDFSまたはローカル)を与えるだけで、コンテナctxとともにリソースURLを送信できます。

ApplicationMasterのApplicationAttemptIdを取得するにはどうすればよいですか?

ApplicationAttemptIdは、環境を介してAMに渡され、環境からの値は、ConverterUtilsヘルパー関数を使用してApplicationAttemptIdオブジェクトに変換できます。

コンテナがNodeManagerによって強制終了されるのはなぜですか?

これは、リクエストしたコンテナメモリサイズを超えるメモリ使用量が多いことが原因である可能性があります。これにはいくつかの理由が考えられます。まず、NodeManagerがコンテナを強制終了するときにダンプするプロセスツリーを確認してください。関心のある2つの点は、物理メモリと仮想メモリです。物理メモリ制限を超えている場合は、アプリが物理メモリを使いすぎています。Javaアプリを実行している場合は、-hprofを使用してヒープ内の領域を占有しているものを確認できます。仮想メモリを超えている場合は、クラスター全体の構成変数yarn.nodemanager.vmem-pmem-ratioの値を増やす必要がある場合があります。

ネイティブライブラリを含めるにはどうすればよいですか?

コンテナの起動中にコマンドラインで-Djava.library.pathを設定すると、Hadoopで使用されるネイティブライブラリが正しくロードされず、エラーが発生する可能性があります。代わりにLD_LIBRARY_PATHを使用する方がクリーンです。

便利なリンク

サンプルコード

YARN分散シェル:開発環境を設定した後、hadoop-yarn-applications-distributedshellプロジェクト内にあります。