大数据阶段二-1

阶段二-任务一的要求

任务一:数据抽取
使用Scala编写spark工程代码,将MySQL的ds_db01库中表customer_inf、order_detail、order_master、product_info的数据增量抽取到Hive的ods库
(需自建)中对应表customer_inf、order_detail、order_master、product_info中。

1、抽取ds_db01库中customer_inf的增量数据进入Hive的ods库中表customer_inf。根据ods.customer_inf表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用 hive cli执行 show partitions ods.customer_inf命令;

2、抽取ds_db01库中product_info的全量数据进入Hive的ods库中表product_info,字段名称、类型不变。根据ods.product_info表中production_date,同时添加动态分区,分区字段为year、month(将production_date拆分为year和month),类型为int。使用 hive cli执行 show partitions ods.product_info命令;

3、抽取ds_db01库中order_detail的全量数据进入Hive的ods库中表order_detail。字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用 hive cli执行 show partitions ods.order_detail命令;

4、抽取ds_db01库中order_master的部分数据进入Hive的ods库中表order_master。根据ods.order_master表中province作为判断字段,只将上海市和浙江省的数据抽入,字段名称、类型不变,同时添加动态分区,分区字段为province,类型为String。使用 hive cli执行 show partitions ods.order_master命令;

任务二:
请使用Flume采集18888端口的实时数据流并存入Kafka中名为flume的Topic中。运行数据生成脚本后,使用kafka-console-consumer.sh显示接收到的数据。

修改customer_inf表

你可以先将它复制到记事本 然后记得把所有的”`”去掉

  1. 修改里面的varchar和char类型
    把varchar和char改为 string
  2. 修改里面的tinyint类型
    把tinyint改为 int
  3. 修改里面的datatime类型
    把datatime改为 timestamp
  4. 修改里面的decimal类型
    把decima改为 double
  5. 新增字段
    partitioned by (etl_date string) row format delimited fields terminated by ',' lines terminated by "\n";

没修改的样子

OIP _3_.jpg

修改过的效果

OIP _3_.jpg

将修改过的customer_inf表放进hive里面执行

  1. 首先启动master并且查看一些hadoop有没有启动
  2. 之后直接:hive
  3. 然后在hive里面创建ods的库,创建完出现个ok就行了
    create database ods;
  4. 切换到ods的库里面
    use ods;
  5. 然后可以使用 hive cli执行 show partitions ods.customer_inf命令; 出行一个ok即可
    show partitions ods.customer_inf;
  6. 最后将你修改好的customer_inf直接复制进去即可,出现ok就表示成功了
    CREATE TABLE customer_inf  (
    customer_inf_id int,
    customer_id int,
    customer_name string,
    identity_card_type int,
    identity_card_no string,
    mobile_phone string,
    customer_email string,
    gender string,
    customer_point int,
    register_time timestamp,
    birthday timestamp,
    customer_level int,
    customer_money double,
    modified_time timestamp
    ) partitioned by (etl_date string) row format delimited fields terminated by ',' lines terminated by '\n';
  7. 可是因为还没有导入数据所以里面什么也没有,但是你可以通过 desc customer_inf查看一下字段
    desc customer_inf
  1. 将customer_inf表放进新建的ods库里面
    OIP _3_.jpg
  2. 查看字段
    OIP _3_.jpg

代码段导入数据

数据抽取代码部分一

package com.jnds

import org.apache.spark.sql.SparkSession

object task1 {


def main(args: Array[String]): Unit = {
// 创建一个是spark的会话 sparkSession是个名字想起什么都可以你记得就得了
val sparkSession=SparkSession.builder()
// 任务的名字起啥都可以
.appName("Task1")
// 启动到hive里面
.enableHiveSupport()
// 将获取现有的SparkSession,或者,如果没有现有的a,则基于构建器中设置的选项创建一个新的a。
.getOrCreate()
// 创建一个到mysql的连接(数据源)
val customerDf=sparkSession.read.format("jdbc")
// 数据库名称
.option("user","root")
// 数据库密码 密码是你自己设置的
.option("password","Mysql123...")
// 这个比如master的ip
.option("url","jdbc:mysql://172.16.7.220/db")
// 告诉它从哪里读取数据
.option("dbtable","(SELECT * FROM customer_inf WHERE modified_time>'2022-05-01 00:00:00') as t1")
// 读取
.load()
// 建立一个临时表存储读出来的数据
customerDf.createOrReplaceTempView("customer_inf_tmp")
// 把数据放到hive中
sparkSession.sql("use ods")
// 插数据
sparkSession.sql("INSERT INTO customer_inf partition (etl_date='20230703') SELECT * FROM customer_inf_tmp")
// 连接关闭
sparkSession.stop()
}

}


从IDEA里面scp到master里面

scp .\task2.jar [email protected]:/tmp 

到msater里面跑任务

  1. 跑任务
    spark-submit --master yarn --class com.jnds.task1 /tmp/task2.jar 

成功表示
OIP _3_.jpg

然后hive进去查看有没有导入成功

  1. use 你键的库名
  2. 查看数据是否放成功

    select * from customer_inf
  3. 最后输入这条命令然后截图,看到有这个就是算完成了(etl_date=20230703)

    show partitions ods.customer_inf;

OIP _3_.jpg