首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何通过Spark WORKER节点从MySQL读取数据?

如何通过Spark WORKER节点从MySQL读取数据?
EN

Stack Overflow用户
提问于 2021-09-16 17:14:26
回答 1查看 25关注 0票数 0

我是spark的新手。我们有从mysql读取数据的遗留java spark代码。但是,它在“主”节点加载数据,然后根据划分的组将数据广播到工作节点。代码如下所示:

代码语言:javascript
复制
    Map<Integer, ObjectModel> allGroupInputModels = loadAllDataByGroups();
    Broadcast<Map<Integer, ObjectModel>> partialObjectModel = sc.broadcast(allGroupInputObjectModels);
    
    eventDF = sparkHelper.getEventPresenterDataFrame(groupIds, minDate, maxDate);
    eventProductDF = sparkHelper.getEventProductDataFrame(groupIds, minDate, maxDate);
    JavaPairRDD<Integer, List<Event>> eventsPairRDD = sparkHelper.getCombinedEventRDD(eventDF, eventProductDF).repartition(numPartition);
    
    Map<Integer, ObjectModel> allGroupInputModels = loadAllDataByGroups();
    Broadcast<Map<Integer, ObjectModel>> partialModel = sc.broadcast(allGroupInputObjectModels);
    
    eventDF = sparkHelper.getEventPresenterDataFrame(groupIds, minDate, maxDate);
    eventProductDF = sparkHelper.getEventProductDataFrame(groupIds, minDate, maxDate);
    JavaPairRDD<Integer, List<Event>> eventsPairRDD = sparkHelper.getCombinedEventRDD(eventDF, eventProductDF).repartition(numPartition);
        
    JavaRDD<ResultObject> resultJavaRDD = eventsPairRDD.map(r -> {
   
                        Integer groupId = r._1;
                        System.out.println("Processing Group: " + groupId);
        
                        List<Event> groupEvents = r._2;
        
                        Map<Integer, ObjectModel> allGroupModel = partialModel.getValue();
        
                        ObjectModel groupModel = allGroupModel.get(groupId);
                
                        groupModel.setEvents(groupEvents);
        
                        // process to get the results using the groupModel
                        .....
        
                        return result;
                    });

请注意,我们在map函数的外部加载所有组的数据,我认为这意味着所有数据加载都在主节点上完成,并将广播发送到工作节点进行计算。我的理解正确吗?如果是,那么我担心数据太大,所以主内存将不够用。有什么方法可以将这个数据加载步骤移到worker节点中吗?请给我建议。谢谢。

EN

回答 1

Stack Overflow用户

发布于 2021-09-16 20:11:01

我不认为有太多东西可以从代码中拯救出来。传统的JDBC摄取看起来像这样:

代码语言:javascript
复制
SparkSession spark = SparkSession.builder()
    .appName(
        "MySQL to Dataframe using a JDBC Connection")
    .master("local")
    .getOrCreate();

// Using properties
Properties props = new Properties();
props.put("user", "root");
props.put("password", "Spark<3Java");
props.put("useSSL", "false");

Dataset<Row> df = spark.read().jdbc(
    "jdbc:mysql://localhost:3306/sakila?serverTimezone=EST",
    "actor", props);
df = df.orderBy(df.col("last_name"));

// Displays the dataframe and some of its metadata
df.show(5);
df.printSchema();
System.out.println("The dataframe contains " + df
    .count() + " record(s).");

一旦您在数据帧(Dataset<Row>)中摄取了数据,您就可以执行所有操作。这里的司机什么也不做。不需要广播。

与其关联的pox.xml包含:

代码语言:javascript
复制
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <java.version>1.8</java.version>
  <scala.version>2.12</scala.version>
  <spark.version>3.0.0</spark.version>
  <mysql.version>8.0.16</mysql.version>
  ...
 </properties>

 <dependencies>
  <!-- Spark -->
  <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_${scala.version}</artifactId>
   <version>${spark.version}</version>
  </dependency>

  <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_${scala.version}</artifactId>
   <version>${spark.version}</version>
   <exclusions>
    <exclusion>
     <groupId>org.slf4j</groupId>
     <artifactId>slf4j-simple</artifactId>
    </exclusion>
   </exclusions>
  </dependency>

  <dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>${mysql.version}</version>
  </dependency>
...
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69212605

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档