首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >spark-sql 批量增量抽取MySQL数据至hive ODS层

spark-sql 批量增量抽取MySQL数据至hive ODS层

原创
作者头像
码农GT038527
修改2024-11-23 15:00:40
修改2024-11-23 15:00:40
7720
举报
文章被收录于专栏:sparkspark

环境准备

  • 搭建好Hadoop、spark、hive、mysql等组件
  • mysql基础数据源,hive基本分层
  • Maven 配置文件
代码语言:xml
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.smy</groupId>
    <artifactId>national_competition_sample_paper_2023</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.12.0</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hive.version>3.1.2</hive.version>
        <mysqlconnect.version>5.1.37</mysqlconnect.version>
        <clickhouse.version>0.3.2</clickhouse.version>
        <hdfs.version>3.1.3</hdfs.version>
        <spark.version>3.1.1</spark.version>
        <hbase.version>2.2.3</hbase.version>
        <kafka.version>2.4.1</kafka.version>
        <hudi.version>0.12.0</hudi.version>
        <lang3.version>3.9</lang3.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${lang3.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysqlconnect.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-exec</artifactId>
                </exclusion>
            </exclusions>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-exec</artifactId>
                </exclusion>
            </exclusions>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hdfs.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hdfs.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <!--该pom文件依赖已过时-->
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>${clickhouse.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!--hudi-->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark3-bundle_${scala.binary.version}</artifactId>
            <version>${hudi.version}</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/scala</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <configuration>
                    <recompileMode>incremental</recompileMode>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
  • 在ide写好代码,粘贴至spark-shell运行,也可使用spark提交命令进行运行,这里展示使用spark-shell运行

需求

代码语言:shell
复制
1、抽取ds_db库中order_master的增量数据进入Hive的ods库中表order_master。
根据ods.order_master表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)

2、抽取ds_db库中order_detail的增量数据进入Hive的ods库中表order_detail。
根据ods.order_detail表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)

3、抽取ds_db库中coupon_info的增量数据进入Hive的ods库中表coupon_info。
根据ods.coupon_info表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)

4、抽取ds_db库中coupon_use的增量数据进入Hive的ods库中表coupon_use。
根据ods.coupon_use表中get_time、used_time、pay_time中的最大者作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)

5、抽取ds_db库中product_browse的增量数据进入Hive的ods库中表product_browse。
根据ods.product_browse表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)

6、抽取ds_db01库中product_info的增量数据进入Hive的ods库中表product_info。
根据ods.product_info表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)

7、抽取ds_db库中customer_inf的增量数据进入Hive的ods库中表customer_inf。
根据ods.customer_inf表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)

8、抽取ds_db01库中customer_login_log的增量数据进入Hive的ods库中表customer_login_log。
根据ods.customer_login_log表中login_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)

9、抽取ds_db库中order_cart的增量数据进入Hive的ods库中表order_cart。
根据ods.order_cart表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)

10、抽取ds_db库中customer_addr的增量数据进入Hive的ods库中表customer_addr。
根据ods.customer_addr表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)

11、抽取ds_db库中customer_level_inf增量数据进入Hive的ods库中表customer_level_inf。
根据ods.customer_level_inf表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)

代码

代码语言:scala
复制
import org.apache.spark.sql.{Dataset, SparkSession}
import java.time.LocalDate

object AddExtract {
  def main(args: Array[String]): Unit = {
    // TODO 创建SparkSession对象;设置Spark运行模式为本地模式,使用所有可用的核心;
    // TODO 设置Spark SQL的存储分配策略为LEGACY模式;设置应用程序的名称为"Input";用于与Spark进行交互启用对Hive的支持
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.storeAssignmentPolicy", "LEGACY")
      .appName("Input")
      .enableHiveSupport()
      .getOrCreate()

    // TODO 设置Spark上下文的日志级别为ERROR,只显示错误信息,减少日志输出量
    spark.sparkContext.setLogLevel("ERROR")

    // TODO 创建一个包含连接MySQL数据库所需信息的映射
    val jdbcMap = Map(
      "user" -> "root",
      "password" -> "000000",
      "url" -> "jdbc:mysql://master:3306/ds_db?useSSL=false",
      "driver" -> "com.mysql.cj.jdbc.Driver"
    )

    // TODO 定义一个包含多个MySQL表名的数组
    val MysqlTables = Array("order_master", "order_detail", "coupon_info", "coupon_use",
      "product_browse", "product_info", "customer_inf", "customer_login_log",
      "order_cart", "customer_level_inf", "customer_addr")
    // TODO Hive中的表名与MySQL中的表名相同
    val HiveTables = MysqlTables

    // TODO 获取当前日期(例如:20241122)
    val date = LocalDate.now().plusDays(-1).toString.replace("-", "")

    // TODO zip使将Hive表名和MySQL表名进行一一配对
    for ((hiveTable, mysqlTable) <- HiveTables.zip(MysqlTables)) {
      // TODO 读取MySQL数据
      val mysqlDF = spark.read.format("jdbc").options(jdbcMap).option("dbtable", mysqlTable).load()
      // TODO 创建一个临时视图
      mysqlDF.createOrReplaceTempView("df")

      val insertQuery: String = mysqlTable match {
        case "customer_login_log" =>
          // TODO 获取hive的customer_login_log表中的最大登录时间戳(增量字段),并将结果转换为字符串
          val maxTimeQuery = s"select max(cast(login_time as TIMESTAMP)) from ods.${hiveTable}"
          val max_time = spark.sql(maxTimeQuery).collect()(0).get(0).toString

          // TODO 增量抽取数据至hive
          s"""
             |insert overwrite table ods.${hiveTable} partition (etl_date="${date}")
             |select * from df where cast(login_time as TIMESTAMP) > cast("${max_time}" as TIMESTAMP)
             """.stripMargin
          
        case "coupon_use" =>
          // TODO 获取hive的coupon_use表中的最大时间(取get_time、pay_time、used_time中的最大值)(增量字段),并将结果转换为字符串
          val maxTimeQuery = s"select max(greatest(get_time, pay_time, used_time)) from ods.${hiveTable}"
          val max_time = spark.sql(maxTimeQuery).collect()(0).get(0).toString

          // TODO 增量抽取数据至hive
          s"""
             |insert overwrite table ods.${hiveTable} partition (etl_date="${date}")
             |select * from df
             |where get_time  > "${max_time}" or
             |pay_time > "${max_time}"  or
             |used_time > "${max_time}"
             """.stripMargin
          
        case _ =>
          // TODO 获取hive表中modified_time字段的最大时间(增量字段),并将结果转换为字符串
          val maxTimeQuery = s"select max(cast(modified_time as TIMESTAMP)) from ods.${hiveTable}"
          val max_time = spark.sql(maxTimeQuery).collect()(0).get(0).toString

          // TODO 增量抽取数据至hive
          s"""
             |insert overwrite table ods.${hiveTable} partition (etl_date="${date}")
             |select * from df where cast(modified_time as TIMESTAMP) > cast("${max_time}" as TIMESTAMP)
             """.stripMargin
      }

      try {
        // TODO 执行插入操作
        spark.sql(insertQuery)
        println(s"=======================成功抽取 ${hiveTable} 表=======================")
      } catch {
        case e: Exception =>
          println(s"=======================抽取 ${hiveTable} 表失败=======================")
          e.printStackTrace()
      }
    }

    // TODO 停止SparkSession,释放相关资源
    spark.stop()
  }
}

执行

  • 打开spark-shell,输入:paste
  • 然后直接粘贴代码,之后按住快捷键Ctrl + D
  • 执行AddExtract.main(Array.empty[String])

结果

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 环境准备
  • 需求
  • 代码
  • 执行
  • 结果
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档