I'm new to Spark. We have legacy Java Spark code that reads data from MySQL. However, it loads all the data on the "master" node and then broadcasts it to the worker nodes according to the groups the data is divided into. The code looks like this:
Map<Integer, ObjectModel> allGroupInputModels = loadAllDataByGroups();
Broadcast<Map<Integer, ObjectModel>> partialModel = sc.broadcast(allGroupInputModels);
eventDF = sparkHelper.getEventPresenterDataFrame(groupIds, minDate, maxDate);
eventProductDF = sparkHelper.getEventProductDataFrame(groupIds, minDate, maxDate);
JavaPairRDD<Integer, List<Event>> eventsPairRDD = sparkHelper.getCombinedEventRDD(eventDF, eventProductDF).repartition(numPartition);
JavaRDD<ResultObject> resultJavaRDD = eventsPairRDD.map(r -> {
Integer groupId = r._1;
System.out.println("Processing Group: " + groupId);
List<Event> groupEvents = r._2;
Map<Integer, ObjectModel> allGroupModel = partialModel.getValue();
ObjectModel groupModel = allGroupModel.get(groupId);
groupModel.setEvents(groupEvents);
// process to get the results using the groupModel
.....
return result;
});

Note that we load the data for all groups outside the map function, which I believe means all the data loading happens on the master node, and the broadcast then ships it to the worker nodes for computation. Is my understanding correct? If so, I'm worried that the data is too large and the master's memory won't be enough. Is there any way to move this data-loading step onto the worker nodes? Please advise. Thanks.
Posted on 2021-09-16 20:11:01
I don't think there is much to salvage from that code. A classic JDBC ingestion looks like this:
SparkSession spark = SparkSession.builder()
.appName(
"MySQL to Dataframe using a JDBC Connection")
.master("local")
.getOrCreate();
// Using properties
Properties props = new Properties();
props.put("user", "root");
props.put("password", "Spark<3Java");
props.put("useSSL", "false");
Dataset<Row> df = spark.read().jdbc(
"jdbc:mysql://localhost:3306/sakila?serverTimezone=EST",
"actor", props);
df = df.orderBy(df.col("last_name"));
// Displays the dataframe and some of its metadata
df.show(5);
df.printSchema();
System.out.println("The dataframe contains " + df
    .count() + " record(s).");

Once you have ingested the data into a dataframe (Dataset&lt;Row&gt;), you can perform all your operations on it. The driver does nothing here; no broadcast is needed.
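If the table itself is too large for any one node, the same DataFrameReader API can split the read across the executors via its partitioned jdbc() overload. A minimal sketch against the same sakila.actor table; the partition column, bounds, and partition count below are assumptions for illustration:

```java
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PartitionedJdbcRead {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Partitioned MySQL read")
                .master("local")
                .getOrCreate();

        Properties props = new Properties();
        props.put("user", "root");
        props.put("password", "Spark<3Java");
        props.put("useSSL", "false");

        // This jdbc() overload splits the read: each of the numPartitions
        // tasks opens its own JDBC connection and fetches one slice of the
        // partition column's [lowerBound, upperBound] range, so the full
        // table is never materialized on a single node.
        Dataset<Row> df = spark.read().jdbc(
                "jdbc:mysql://localhost:3306/sakila?serverTimezone=EST",
                "actor",
                "actor_id", // partition column: must be numeric, date, or timestamp
                1L,         // lowerBound (assumed smallest actor_id)
                200L,       // upperBound (assumed largest actor_id)
                4,          // numPartitions
                props);

        System.out.println("Partitions: " + df.rdd().getNumPartitions());
    }
}
```

The bounds only control how the range is sliced into WHERE clauses; rows outside them are still read, just by the edge partitions.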
The associated pom.xml contains:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<spark.version>3.0.0</spark.version>
<mysql.version>8.0.16</mysql.version>
...
</properties>
<dependencies>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
...

https://stackoverflow.com/questions/69212605
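Applied to the original code: rather than loading allGroupInputModels on the driver and broadcasting it, each input can be read as a dataframe and combined with a join, which Spark executes on the workers. A hedged sketch only; the table names group_models and events and the column group_id are hypothetical stand-ins for the real schema:

```java
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JoinInsteadOfBroadcast {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Join instead of broadcast")
                .master("local")
                .getOrCreate();

        Properties props = new Properties();
        props.put("user", "root");
        props.put("password", "Spark<3Java");
        props.put("useSSL", "false");
        String url = "jdbc:mysql://localhost:3306/sakila?serverTimezone=EST";

        // Hypothetical tables: per-group model data and the event rows.
        Dataset<Row> groupModels = spark.read().jdbc(url, "group_models", props);
        Dataset<Row> events = spark.read().jdbc(url, "events", props);

        // The join is planned and executed on the executors; the driver
        // never needs to hold either table in memory.
        Dataset<Row> joined = events.join(groupModels, "group_id");
        joined.show(5);
    }
}
```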