
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.smy</groupId>
<artifactId>national_competition_sample_paper_2023</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scala.version>2.12.0</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<hive.version>3.1.2</hive.version>
<mysqlconnect.version>5.1.37</mysqlconnect.version>
<clickhouse.version>0.3.2</clickhouse.version>
<hdfs.version>3.1.3</hdfs.version>
<spark.version>3.1.1</spark.version>
<hbase.version>2.2.3</hbase.version>
<kafka.version>2.4.1</kafka.version>
<hudi.version>0.12.0</hudi.version>
<lang3.version>3.9</lang3.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${lang3.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysqlconnect.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
</exclusion>
</exclusions>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
</exclusion>
</exclusions>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hdfs.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hdfs.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<!--该pom文件依赖已过时-->
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--hudi-->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark3-bundle_${scala.binary.version}</artifactId>
<version>${hudi.version}</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/scala</directory>
</resource>
<resource>
<directory>src/main/java</directory>
</resource>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<recompileMode>incremental</recompileMode>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>1、抽取ds_db库中order_master的增量数据进入Hive的ods库中表order_master。
根据ods.order_master表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)
2、抽取ds_db库中order_detail的增量数据进入Hive的ods库中表order_detail。
根据ods.order_detail表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)
3、抽取ds_db库中coupon_info的增量数据进入Hive的ods库中表coupon_info。
根据ods.coupon_info表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)
4、抽取ds_db库中coupon_use的增量数据进入Hive的ods库中表coupon_use。
根据ods.coupon_use表中get_time、used_time、pay_time中的最大者作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)
5、抽取ds_db库中product_browse的增量数据进入Hive的ods库中表product_browse。
根据ods.product_browse表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)
6、抽取ds_db01库中product_info的增量数据进入Hive的ods库中表product_info。
根据ods.product_info表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)
7、抽取ds_db库中customer_inf的增量数据进入Hive的ods库中表customer_inf。
根据ods.customer_inf表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)
8、抽取ds_db01库中customer_login_log的增量数据进入Hive的ods库中表customer_login_log。
根据ods.customer_login_log表中login_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)
9、抽取ds_db库中order_cart的增量数据进入Hive的ods库中表order_cart。
根据ods.order_cart表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)
10、抽取ds_db库中customer_addr的增量数据进入Hive的ods库中表customer_addr。
根据ods.customer_addr表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)
11、抽取ds_db库中customer_level_inf增量数据进入Hive的ods库中表customer_level_inf。
根据ods.customer_level_inf表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,
同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)import org.apache.spark.sql.{Dataset, SparkSession}
import java.time.LocalDate
object AddExtract {
def main(args: Array[String]): Unit = {
// TODO 创建SparkSession对象;设置Spark运行模式为本地模式,使用所有可用的核心;
// TODO 设置Spark SQL的存储分配策略为LEGACY模式;设置应用程序的名称为"Input";用于与Spark进行交互启用对Hive的支持
val spark = SparkSession.builder()
.master("local[*]")
.config("spark.sql.storeAssignmentPolicy", "LEGACY")
.appName("Input")
.enableHiveSupport()
.getOrCreate()
// TODO 设置Spark上下文的日志级别为ERROR,只显示错误信息,减少日志输出量
spark.sparkContext.setLogLevel("ERROR")
// TODO 创建一个包含连接MySQL数据库所需信息的映射
val jdbcMap = Map(
"user" -> "root",
"password" -> "000000",
"url" -> "jdbc:mysql://master:3306/ds_db?useSSL=false",
"driver" -> "com.mysql.cj.jdbc.Driver"
)
// TODO 定义一个包含多个MySQL表名的数组
val MysqlTables = Array("order_master", "order_detail", "coupon_info", "coupon_use",
"product_browse", "product_info", "customer_inf", "customer_login_log",
"order_cart", "customer_level_inf", "customer_addr")
// TODO Hive中的表名与MySQL中的表名相同
val HiveTables = MysqlTables
// TODO 获取当前日期(例如:20241122)
val date = LocalDate.now().plusDays(-1).toString.replace("-", "")
// TODO zip使将Hive表名和MySQL表名进行一一配对
for ((hiveTable, mysqlTable) <- HiveTables.zip(MysqlTables)) {
// TODO 读取MySQL数据
val mysqlDF = spark.read.format("jdbc").options(jdbcMap).option("dbtable", mysqlTable).load()
// TODO 创建一个临时视图
mysqlDF.createOrReplaceTempView("df")
val insertQuery: String = mysqlTable match {
case "customer_login_log" =>
// TODO 获取hive的customer_login_log表中的最大登录时间戳(增量字段),并将结果转换为字符串
val maxTimeQuery = s"select max(cast(login_time as TIMESTAMP)) from ods.${hiveTable}"
val max_time = spark.sql(maxTimeQuery).collect()(0).get(0).toString
// TODO 增量抽取数据至hive
s"""
|insert overwrite table ods.${hiveTable} partition (etl_date="${date}")
|select * from df where cast(login_time as TIMESTAMP) > cast("${max_time}" as TIMESTAMP)
""".stripMargin
case "coupon_use" =>
// TODO 获取hive的coupon_use表中的最大时间(取get_time、pay_time、used_time中的最大值)(增量字段),并将结果转换为字符串
val maxTimeQuery = s"select max(greatest(get_time, pay_time, used_time)) from ods.${hiveTable}"
val max_time = spark.sql(maxTimeQuery).collect()(0).get(0).toString
// TODO 增量抽取数据至hive
s"""
|insert overwrite table ods.${hiveTable} partition (etl_date="${date}")
|select * from df
|where get_time > "${max_time}" or
|pay_time > "${max_time}" or
|used_time > "${max_time}"
""".stripMargin
case _ =>
// TODO 获取hive表中modified_time字段的最大时间(增量字段),并将结果转换为字符串
val maxTimeQuery = s"select max(cast(modified_time as TIMESTAMP)) from ods.${hiveTable}"
val max_time = spark.sql(maxTimeQuery).collect()(0).get(0).toString
// TODO 增量抽取数据至hive
s"""
|insert overwrite table ods.${hiveTable} partition (etl_date="${date}")
|select * from df where cast(modified_time as TIMESTAMP) > cast("${max_time}" as TIMESTAMP)
""".stripMargin
}
try {
// TODO 执行插入操作
spark.sql(insertQuery)
println(s"=======================成功抽取 ${hiveTable} 表=======================")
} catch {
case e: Exception =>
println(s"=======================抽取 ${hiveTable} 表失败=======================")
e.printStackTrace()
}
}
// TODO 停止SparkSession,释放相关资源
spark.stop()
}
}:paste
Ctrl + D
AddExtract.main(Array.empty[String])

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。