大数据阶段二的第二部分

抽取ds_db01库中product_info的全量数据进入Hive的ods库中表product_info,字段名称、类型不变。根据ods.product_info表中production_date,同时添加动态分区,分区字段为year、month(将production_date拆分为year和month),类型为int。使用 hive cli执行 show partitions ods.product_info命令;

首先将product_info这个表修改一下字段

将表明修改为对应的product_info

将`db_dbs`.`Untitled`修改为product_info
  1. 将char修改为string
    char修改为string
  2. 将smallint修改为int
  3. 将tinyint修改为int
  4. 将decimal修改为double
  5. 将enum修改为string
  6. 将datetime修改为timestamp
  7. 将text修改为string
  8. 在结尾添加分区
    partitioned by (year int,month int) row format delimited fields terminated by ',' lines terminated by '\n';

剩下没有说到的就原封不动

CREATE TABLE product_info (
product_id int,
product_core string,
product_name string,
bar_code string,
brand_id int,
one_category_id int,
two_category_id int,
three_category_id int,
supplier_id int,
price double,
average_cost double,
publish_status int,
audit_status int,
weight float,
length float,
height float,
width float,
color_type string,
production_date timestamp,
shelf_life int,
descript string,
indate timestamp,
modified_time timestamp
) partitioned by (year int,month int) row format delimited fields terminated by ',' lines terminated by '\n';
  1. 没有修改过的表
    OIP _3_.jpg
  2. 修改过的表
    OIP _3_.jpg

master工作

  1. 首先启动一下hadoop
    start-all.sh
  2. 进入hive里面
    hive
  3. 进到hive里面先use到ods你都库里面
    use ods;
  4. 将你修改好product_info的表放进来运行一下
    CREATE TABLE product_info (
    product_id int,
    product_core string,
    product_name string,
    bar_code string,
    brand_id int,
    one_category_id int,
    two_category_id int,
    three_category_id int,
    supplier_id int,
    price double,
    average_cost double,
    publish_status int,
    audit_status int,
    weight float,
    length float,
    height float,
    width float,
    color_type string,
    production_date timestamp,
    shelf_life int,
    descript string,
    indate timestamp,
    modified_time timestamp
    ) partitioned by (year int,month int) row format delimited fields terminated by ',' lines terminated by '\n';
  5. 可以用desc product_info查看一下字段
    看到有year和month就得了

代码部分

package org.jnds

import org.apache.spark.sql.SparkSession

object task02 {


def main(args: Array[String]): Unit = {
// 声明一个变量,创建一个spark的会话
val sparkSession=SparkSession.builder()
// 随机起一个任务名字
.appName("Task2")
// 这个是要启动到hive
.enableHiveSupport()
// 启用动态分区的
.config("hive.exec.dynamic.partition","true")
.config("hive.exec.dynamic.partition.mode","nonstrict")
// 将获取现有的SparkSession,或者,如果没有现有的a,则基于构建器中设置的选项创建一个新的a。
.getOrCreate()
// 创建一个到mysql的连接(数据源)
val customerDF=sparkSession.read.format("jdbc")
// user
.option("user","root")
// password
.option("password","Mysql123...")
// url
.option("url","jdbc:mysql://172.16.7.220/db_dbs")
// 告诉它从哪里拿数据
.option("dbtable","product_info")
// 读取
.load()
// 建立一个零时表,将数据放这个数据放到这个零时表里面,名字随意
customerDF.createOrReplaceTempView("product_info_tmp")
// 把数据放到hive里面的ods库里面
sparkSession.sql("use ods")
// 查数据,product_info表,
sparkSession.sql("INSERT INTO product_info partition (year,month) SELECT *,YEAR(production_date),MONTH(production_date) FROM product_info_tmp")
// 链接关闭
sparkSession.stop()

}
}

弄完这次构建一下,然后scp拷贝到master里面

在master运行一下

直接键入即可,注意看文件名,这个文件名每个人不一样

spark-submit --master yarn --class org.jnds.task02 /tmp/unnamed.jar 

然后进入到hive里面验证一下

  1. 直接键入
    hive
  2. use到ods库里面
    use ods;
  3. select * from 查看数据
    select * from product_info
  4. 用这条 show partitions ods.product_info 命令然后截图即可
    show partitions ods.product_info

OIP _3_.jpg

最后你还可以通过图像界面来查看

  1. 你直接到你的浏览器输入你的ip地址后面加上9870即可
    例如:172.16.1.1:9870
  2. 看到页面之后点击Utilities下面的(Browse the file system)
  3. 点击步骤:
    1. use
    2. hive
    3. warehouse
    4. ods.db
    5. product_info
    6. year=2022
  4. 最后就可以看到你的数据了

成功界面
OIP _3_.jpg