首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >你能以编程方式使用spark-shell吗?

你能以编程方式使用spark-shell吗?
EN

Stack Overflow用户
提问于 2015-11-03 13:22:15
回答 2查看 2.4K关注 0票数 3

可以从java或scala程序运行spark-shell吗?换句话说,在java程序中启动一个spark-shell会话,将spark代码传递给它并读回响应,然后在代码中继续交互。

EN

回答 2

Stack Overflow用户

发布于 2015-11-05 20:36:55

如果你想使用spark-shell,你总是可以从java中调用它,然后捕获它的stdin和stdout来传递文本和获取响应。

代码语言:javascript
复制
OutputStream stdin = null;
InputStream stderr = null;
InputStream stdout = null;

Process process = Runtime.getRuntime ().exec ("spark-shell");
stdin = process.getOutputStream ();
stderr = process.getErrorStream ();
stdout = process.getInputStream ();

但实际上没有理由这样做。Spark-Shell主要用于学习和测试。您可以从shell中执行的所有操作都可以从Java应用程序中执行,甚至可以交互方式执行。

考虑以下示例:您希望计算错误数,如果错误数超过100,则询问用户是否希望在控制台中显示这些错误。如果它们小于100,无论如何都要显示它们:

代码语言:javascript
复制
JavaRDD<String> lines = sc.textFile("hdfs://log.txt").filter(s -> s.contains("error"));
if(lines.count() > 100)
{
    System.out.println("Errors are more than 100 do you wish to display them? (y/n)");

    BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
    if(br.readLine().equals("y"))
    {
        List<String> errors = lines.collect();
        for(String s : errors)
            System.out.println(s);
    }
}
else
{
    List<String> errors = lines.collect();
    for(String s : errors)
        System.out.println(s);
}
票数 1
EN

Stack Overflow用户

发布于 2016-07-19 21:10:49

这是一个基于Spark 1.6.0Scala 2.10的有效解决方案。使用Settings创建SparkIMain,并对与类型关联的变量和值执行bind操作。

代码语言:javascript
复制
import org.apache.spark.repl.SparkIMain
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.tools.nsc.GenericRunnerSettings
class TestMain {
  def exec(): Unit = {
    val settings = new GenericRunnerSettings( println _ )
        settings.usejavacp.value = true
    val interpreter = new SparkIMain(settings)

    val conf = new SparkConf().setAppName("TestMain").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val methodChain =
      """
        val df = sqlContext.read
              .format("com.databricks.spark.csv")
              .option("header", "false")
              .option("inferSchema", "true")
              .option("treatEmptyValuesAsNulls", "true")
              .option("parserLib", "univocity")
              .load("example-data.csv")

        df.show()

      """
    interpreter.bind("sqlContext" ,"org.apache.spark.sql.SQLContext", sqlContext)
    val resultFlag = interpreter.interpret(methodChain)
  }
}

object TestInterpreter{

    def main(args: Array[String]) {
      val testMain = new TestMain()
      testMain.exec()
      System.exit(0)
    }}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33491887

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档