我需要一个应用程序,使工人之间的通信。假设worker 1正在处理job 1,它将生成其他works所依赖的数据输出。此外,这个过程应该重复多次,这意味着每当worker 1生成新的数据集时,其他worker都应该开始输入该数据集并完成自己的工作。spark能做到吗?到目前为止,我已经看到了spark流实时处理,但流通信似乎没有发生在工人之间?任何方向或建议都将不胜感激。
发布于 2015-07-07 15:10:47
您必须在1个Spark Streaming Job中逐个定义所需的操作。
虽然我还没有尝试过,但是你也可以尝试使用一些工作流组件,比如Oozie来配置你的标准Spark批处理作业(而不是流)。
最近,Spring XD还引入了与Spark Jobs的集成。这也可能行得通-- http://www.slideshare.net/mark_fisher/spark-meets-spring。
发布于 2020-06-11 05:49:51
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import org.json4s.jackson.Json;
public class lotWeather {
public static void main(String[] args) throws StreamingQueryException {
System.setProperty("hadoop.home.dir", "C:\\hadoop-common-2.2.0-bin-master");
SparkSession sparkSession = SparkSession.builder().appName("SparkStreamingMessageListener").master("local").getOrCreate();
enter code here
StructType weatherType= new StructType().add("quarter","String").add("heatType", "string").add("heat","integer")
.add("windType","string").add("wind","integer");
Dataset<Row> rawData = sparkSession.readStream().schema(weatherType).option("sep", ",")
.csv("C:\\Users\\sorun\\OneDrive\\Masaüstü\\bigdata\\sparkstreaming\\*");
Dataset<Row> heatData = rawData.select("quarter", "heat").where("heat>29");
StreamingQuery start = heatData.writeStream().outputMode("append").format("console").start();
start.awaitTermination();
}
}https://stackoverflow.com/questions/31250214
复制相似问题