首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >每当发生事件时触发火花作业

每当发生事件时触发火花作业
EN

Stack Overflow用户
提问于 2016-09-12 13:12:18
回答 1查看 2.6K关注 0票数 2

我有一个火花应用程序,当它收到有关某个主题的kafka消息时,它应该运行。

我不会收到超过5-6条信息,所以我不想采取火花流的方法。相反,我尝试使用SparkLauncher提交应用程序,但我不喜欢这种方法,因为我必须在代码中以编程方式设置spark和Java,以及所有必要的星星之火属性,如执行器内核、执行器内存等。

如何触发火花应用程序从spark-submit运行,但让它等待直到收到消息?

任何指示都是非常有用的。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-09-12 13:28:46

您可以使用shell脚本方法和nohup命令提交这样的作业.

"nohup spark-submit shell script <parameters> 2>&1 < /dev/null &

无论何时,您都会收到消息,然后可以轮询该事件并调用此shell脚本。

下面是这样做的代码片段..。进一步看一看https://en.wikipedia.org/wiki/Nohup

-使用RunTime

代码语言:javascript
复制
/**
     * This method is to spark submit
     * <pre> You can call spark-submit or mapreduce job on the fly like this.. by calling shell script... </pre>
     * @param commandToExecute String 
     */
    public static Boolean executeCommand(final String commandToExecute) {
        try {
            final Runtime rt = Runtime.getRuntime();
            // LOG.info("process command -- " + commandToExecute);
            final String[] arr = { "/bin/sh", "-c", commandToExecute};
            final Process proc = rt.exec(arr);
            // LOG.info("process started ");
            final int exitVal = proc.waitFor();
            LOG.trace(" commandToExecute exited with code: " + exitVal);
            proc.destroy();

        } catch (final Exception e) {
            LOG.error("Exception occurred while Launching process : " + e.getMessage());
            return Boolean.FALSE;
        }
             return Boolean.TRUE;
    }

-使用ProcessBuilder -另一种方式

代码语言:javascript
复制
private static void executeProcess(Operation command, String database) throws IOException,
            InterruptedException {

        final File executorDirectory = new File("src/main/resources/");

private final static String shellScript = "./sparksubmit.sh";
ProcessBuilder processBuilder = new ProcessBuilder(shellScript, command.getOperation(), "argument-one");

        processBuilder.directory(executorDirectory);
          Process process = processBuilder.start();
          try {
            int shellExitStatus = process.waitFor();
            if (shellExitStatus != 0) {
                logger.info("Successfully executed the shell script");
            }
        } catch (InterruptedException ex) {
            logger.error("Shell Script process was interrupted");
        }
      }

-第三条路: jsch

使用JSch在SSH上运行一个命令

- YarnClient 类-fourth方法

我最喜欢的一本书Data algorithms使用了这种方法

代码语言:javascript
复制
// import required classes and interfaces
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;

public class SubmitSparkJobToYARNFromJavaCode {

   public static void main(String[] arguments) throws Exception {

       // prepare arguments to be passed to 
       // org.apache.spark.deploy.yarn.Client object
       String[] args = new String[] {
           // the name of your application
           "--name",
           "myname",

           // memory for driver (optional)
           "--driver-memory",
           "1000M",

           // path to your application's JAR file 
           // required in yarn-cluster mode      
           "--jar",
           "/Users/mparsian/zmp/github/data-algorithms-book/dist/data_algorithms_book.jar",

           // name of your application's main class (required)
           "--class",
           "org.dataalgorithms.bonus.friendrecommendation.spark.SparkFriendRecommendation",

           // comma separated list of local jars that want 
           // SparkContext.addJar to work with      
           "--addJars",
           "/Users/mparsian/zmp/github/data-algorithms-book/lib/spark-assembly-1.5.2-hadoop2.6.0.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/log4j-1.2.17.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/junit-4.12-beta-2.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/jsch-0.1.42.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/JeraAntTasks.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/jedis-2.5.1.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/jblas-1.2.3.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/hamcrest-all-1.3.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/guava-18.0.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-math3-3.0.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-math-2.2.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-logging-1.1.1.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-lang3-3.4.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-lang-2.6.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-io-2.1.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-httpclient-3.0.1.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-daemon-1.0.5.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-configuration-1.6.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-collections-3.2.1.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/commons-cli-1.2.jar,/Users/mparsian/zmp/github/data-algorithms-book/lib/cloud9-1.3.2.jar",

           // argument 1 to your Spark program (SparkFriendRecommendation)
           "--arg",
           "3",

           // argument 2 to your Spark program (SparkFriendRecommendation)
           "--arg",
           "/friends/input",

           // argument 3 to your Spark program (SparkFriendRecommendation)
           "--arg",
           "/friends/output",

           // argument 4 to your Spark program (SparkFriendRecommendation)
           // this is a helper argument to create a proper JavaSparkContext object
           // make sure that you create the following in SparkFriendRecommendation program
           // ctx = new JavaSparkContext("yarn-cluster", "SparkFriendRecommendation");
           "--arg",
           "yarn-cluster"
       };

       // create a Hadoop Configuration object
       Configuration config = new Configuration();

       // identify that you will be using Spark as YARN mode
       System.setProperty("SPARK_YARN_MODE", "true");

       // create an instance of SparkConf object
       SparkConf sparkConf = new SparkConf();

       // create ClientArguments, which will be passed to Client
       ClientArguments cArgs = new ClientArguments(args, sparkConf); 

       // create an instance of yarn Client client
       Client client = new Client(cArgs, config, sparkConf); 

       // submit Spark job to YARN
       client.run(); 
   }
}
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39451176

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档