
spark-sql: Batch Full Extraction of MySQL Data into the Hive ODS Layer

Author: 码农GT038527 · Original · Last modified 2024-11-28

Environment Setup

  • Hadoop, Spark, Hive, MySQL, and the other required components are already installed
  • The MySQL source database is loaded and the basic Hive layers (ods, etc.) are created
  • Maven configuration file
Code language: xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.smy</groupId>
    <artifactId>national_competition_sample_paper_2023</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.12.0</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hive.version>3.1.2</hive.version>
        <!-- Connector/J 8.x is needed for the com.mysql.cj.jdbc.Driver class used in the code below -->
        <mysqlconnect.version>8.0.33</mysqlconnect.version>
        <clickhouse.version>0.3.2</clickhouse.version>
        <hdfs.version>3.1.3</hdfs.version>
        <spark.version>3.1.1</spark.version>
        <hbase.version>2.2.3</hbase.version>
        <kafka.version>2.4.1</kafka.version>
        <hudi.version>0.12.0</hudi.version>
        <lang3.version>3.9</lang3.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${lang3.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysqlconnect.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-exec</artifactId>
                </exclusion>
            </exclusions>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-exec</artifactId>
                </exclusion>
            </exclusions>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hdfs.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hdfs.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <!-- This ru.yandex.clickhouse artifact is deprecated; the ClickHouse JDBC driver now lives under the com.clickhouse groupId -->
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>${clickhouse.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!--hudi-->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark3-bundle_${scala.binary.version}</artifactId>
            <version>${hudi.version}</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/scala</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <configuration>
                    <recompileMode>incremental</recompileMode>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
  • Write the code in an IDE and paste it into spark-shell to run; you can also submit it with spark-submit (see Execution below). This article demonstrates the spark-shell approach, which only needs the MySQL driver jar added at launch, as sketched below.
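Because spark-shell already bundles the Spark, Hadoop, and Hive classes declared in the pom, at run time you normally only need to put the MySQL JDBC driver on the shell's classpath. A minimal launch sketch (the jar path and version are assumptions; adjust them to your environment):

Code language: txt
spark-shell --jars /opt/jars/mysql-connector-java-8.0.33.jar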

Requirements

  • Fully extract the following MySQL tables into Hive tables of the same name (a DDL sketch for creating the target ods tables follows the list):
Code language: txt
order_master, 
order_detail, 
coupon_info, 
coupon_use,
product_browse, 
product_info, 
customer_inf, 
customer_login_log,
order_cart, 
customer_level_inf, 
customer_addr
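The insert overwrite statement used below requires each target ods table to already exist and to be partitioned by etl_date. If the tables have not been created yet, one way to derive them from the MySQL schema is sketched here; customer_inf stands in for any table in the list, and the storage format is left at the Hive default (an assumption, adjust to your warehouse conventions):

Code language: scala
// One-off sketch: create an ods table whose columns mirror the MySQL schema,
// partitioned by etl_date; run once per source table before the extraction job
val srcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://master:3306/ds_db?useSSL=false")
  .option("user", "root")
  .option("password", "000000")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("dbtable", "customer_inf")
  .load()
spark.sql("CREATE DATABASE IF NOT EXISTS ods")
// StructType.toDDL renders the schema as "`col1` TYPE,`col2` TYPE,..."
spark.sql(s"CREATE TABLE IF NOT EXISTS ods.customer_inf (${srcDF.schema.toDDL}) PARTITIONED BY (etl_date STRING)")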

Code

Code language: scala
import org.apache.spark.sql._
import java.time.LocalDate

object AllExtract {
  def main(args: Array[String]): Unit = {
    // Build the SparkSession: local mode with all available cores,
    // LEGACY store-assignment policy (permissive type casts on insert overwrite),
    // and Hive support enabled so the ods database can be written to
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.storeAssignmentPolicy", "LEGACY")
      .appName("Input")
      .enableHiveSupport()
      .getOrCreate()

    // Log only errors to keep the console output readable
    spark.sparkContext.setLogLevel("ERROR")

    // JDBC connection options for the source MySQL database
    val jdbcMap = Map(
      "user" -> "root",
      "password" -> "000000",
      "url" -> "jdbc:mysql://master:3306/ds_db?useSSL=false",
      "driver" -> "com.mysql.cj.jdbc.Driver"
    )

    // MySQL tables to extract
    val MysqlTables = Array("order_master", "order_detail", "coupon_info", "coupon_use",
      "product_browse", "product_info", "customer_inf", "customer_login_log",
      "order_cart", "customer_level_inf", "customer_addr")
    // The Hive tables use the same names as the MySQL tables
    val HiveTables = MysqlTables

    // Partition value: yesterday's date formatted as yyyyMMdd (e.g. 20241121)
    val date = LocalDate.now().minusDays(1).toString.replace("-", "")

    // zip pairs each Hive table name with its MySQL table name
    for ((hiveTable, mysqlTable) <- HiveTables.zip(MysqlTables)) {
      // Read the full MySQL table
      val mysqlDF = spark.read.format("jdbc").options(jdbcMap).option("dbtable", mysqlTable).load()
      // Expose it as a temporary view so it can be queried with SQL
      mysqlDF.createOrReplaceTempView("df")

      // Overwrite the target Hive partition with the full extract
      spark.sql(
        s"""
           |insert overwrite table ods.${hiveTable} partition (etl_date="${date}")
           |select * from df
          """.stripMargin)

      println(s"======================= successfully extracted ${hiveTable} =======================")
    }

    spark.stop()
  }
}
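After the loop finishes, it is worth spot-checking that each partition was actually written. A small verification sketch, to be run in a fresh spark-shell (the job's final spark.stop() also stops the shell's session) and recomputing date the same way the job does:

Code language: scala
// Recompute yesterday's partition value and inspect one target table
val date = java.time.LocalDate.now().minusDays(1).toString.replace("-", "")
spark.sql("show partitions ods.order_master").show(false)
spark.sql(s"select count(*) from ods.order_master where etl_date = '$date'").show()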

Execution

  • Open spark-shell and enter :paste to switch to paste mode
  • Paste the code, then press Ctrl + D to compile it
  • Run AllExtract.main(Array.empty[String])
  • Note: inside spark-shell, getOrCreate() returns the shell's existing SparkSession (the master and config settings in the code are ignored), and the final spark.stop() stops that session too, so restart the shell before running the job again
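To run the job with spark-submit instead, package the project and submit the jar. Note that the pom above declares <packaging>pom</packaging>, which produces no jar; change it to jar if you want mvn package to build a runnable artifact. A hypothetical invocation (jar and driver paths are assumptions):

Code language: txt
spark-submit \
  --master local[*] \
  --class AllExtract \
  --jars /opt/jars/mysql-connector-java-8.0.33.jar \
  target/national_competition_sample_paper_2023-1.0-SNAPSHOT.jar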

Original statement: This article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

If there is any infringement, please contact cloudcommunity@tencent.com for removal.
