阶段三实施数据抽取

数据库要用到的表

shtu_student

在工作之前要先保证‘zookeeper’和‘kafka’可以正常启动

  1. 这个顺序是要先启动zookeeper
  2. 注意要启动三台机子才可以
    zkServer.sh start
  3. 检查状态确保运行正常 三台机子启动完两个follower和一个leader
    zkServer.sh status

启动 zookeeper
OIP _3_.jpg
OIP _3_.jpg

启动kafka

  1. 启动kafka (注意三台机子都要)
  2. 只要启动什么都没有就是成功了
  3. 最后jps看一下有kafka就可以了
    kafka-server-start.sh -daemon /usr/local/src/kafka/config/server.properties
    这个是停掉kafka的命令
    kafka-server-stop.sh

创建主题 (可以理解为数据库)

  1. test是看比赛题目(这些名字到时候比赛会明确给出)
  2. –replication-factor副本
  3. –partition分区
    第一个
    kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic test
    第二个
    kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic dim_class
    第三个
    kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic dim_score
    第四个
    kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic dim_student

成功标识出现Created topic test.
OIP _3_.jpg

代码部分

  1. 用idea来做,写完代码直接运行即可
    import com.google.gson.JsonParser
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
    import org.apache.flink.util.Collector

    import java.util.Properties

    object Task4 {
    def main(args: Array[String]): Unit = {
    // 准备运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 数据来源
    val prop=new Properties()
    // kafka的地址
    prop.setProperty("bootstrap.servers","172.16.7.220:9092,172.16.7.221:9092,172.16.7.222:9092")
    // 随意起个名字
    prop.setProperty("group.id","aaa")
    // 添加数据源
    val dataStream=env.addSource(new FlinkKafkaConsumer[String]("test",new SimpleStringSchema(),prop))
    dataStream.print()

    // 分流
    // 标签
    val classTag = new OutputTag[String]("class")
    val scoreTag = new OutputTag[String]("score")
    val studentTag= new OutputTag[String]("student")

    // 数据跟标签定义关系匹配
    val myStream = dataStream.process(new ProcessFunction[String,String] {
    override def processElement(i: String, context: ProcessFunction[String, String]#Context, collector: Collector[String]): Unit = {
    val obj=JsonParser.parseString(i).getAsJsonObject
    val table=obj.get("table").getAsString // 获取表明
    val data=obj.getAsJsonObject("data").toString //获取它的数据


    if(table.equals("tb_class")){
    context.output(classTag,data)
    }
    else if(table.equals("tb_score")){
    context.output(scoreTag,data)
    }
    else if(table.equals("tb_student")){
    context.output(studentTag,data)
    }
    else{
    collector.collect(data)
    }
    }
    })

    // 获取输出流
    val classStream = myStream.getSideOutput(classTag)
    val scoreStream = myStream.getSideOutput(scoreTag)
    val studentStream = myStream.getSideOutput(studentTag)


    // 写入
    classStream.addSink(new FlinkKafkaProducer[String]("dim_class",new SimpleStringSchema(),prop))
    scoreStream.addSink(new FlinkKafkaProducer[String]("dim_score",new SimpleStringSchema(),prop))
    studentStream.addSink(new FlinkKafkaProducer[String]("dim_student",new SimpleStringSchema(),prop))


    env.execute("Task4")

    }

    }

成功标识就是像这样子卡着不动就可以了,要是出现一大堆东西就是报错了
OIP _3_.jpg

关于import选择

如果没有那就统一默认选择第一个

1.
OIP _3_.jpg
2.
OIP _3_.jpg
3.
OIP _3_.jpg
4.
OIP _3_.jpg
5.
OIP _3_.jpg
6.
OIP _3_.jpg
7.
OIP _3_.jpg
8.
OIP _3_.jpg

查看

  1. 查看安装的库(也叫主题)
    kafka-topics.sh --zookeeper master:2181 --list

如果有报错的方法解决

  1. 方法一:把/tmp/kafka-logs整个删掉就行了(每台机子都要删)
    对于kafka 如果有问题要诊断的时候,把启动命令中的-daemon去掉,这样就会在控制台输出详细的日志了
  2. 方法二:
    1. 停掉所有的kafka
      kafka-server-stop.sh
    2. master运行
      zkCli.sh
    3. 输入下面命令回车即可
      deleteall /brokers/topics/__consumer_offsets
    4. 退出
      quit
    5. 重启kafka
      kafka-server-start.sh -daemon /usr/local/src/kafka/config/server.properties

报错二,如果上面这个方法还不可以试一下这个(注意:要是两个都不得多半是你安装服务安装错了)

  1. 清理错误的kafka安装残留信息:
    进到zkCli.sh里面清理安装残留
    zkCli.sh
    在ZOOKEEPER命令行依次执行
    rmr /brokers
    rmr /consumers
    rmr /admin
    rmr /config
    rmr /controller_epoch
    rmr /controller
    退出
    quit
    然后删除kafka-logs文件
    rm -rf /tmp/kafka-logs
    最后在重启一下
    kafka-server-start.sh -daemon /usr/local/src/kafka/config/server.properties

写入数据(发送端)

  1. 在master里面模拟一个生产者生产变动
  2. 成功标记就是出来一个箭头标记就表示成功了
    kafka-console-producer.sh --broker-list master:9092 --topic test

多开一两个master来当消费者(接收端)

  1. 再开一台机子然后直接键入即可
  2. 成功标识就是什么都没有得了
    dim_class
    kafka-console-consumer.sh --bootstrap-server master:9092 --topic dim_class --from-beginning
    dim_score
    kafka-console-consumer.sh --bootstrap-server master:9092 --topic dim_score --from-beginning
    dim_student
    kafka-console-consumer.sh --bootstrap-server master:9092 --topic dim_student --from-beginning

临时数据

“注意有一些是class和score还有student的”

  1. 这数据要一条一条的插入
    {"database":"shtd_student","table":"tb_class","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"cid":1,"cname":"Class#1","specialty":"人工智能","school":"武汉职业技术学院"}}
    {"database":"shtd_student","table":"tb_class","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"cid":2,"cname":"Class#2","specialty":"人工智能","school":"武汉职业技术学院"}}
    {"database":"shtd_student","table":"tb_class","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"cid":3,"cname":"Class#3","specialty":"人工智能","school":"广西职业技术学院"}}
    {"database":"shtd_student","table":"tb_score","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"cid":4,"cname":"Class#4","specialty":"人工智能","school":"武汉职业技术学院"}}
    {"database":"shtd_student","table":"tb_score","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"cid":5,"cname":"Class#5","specialty":"人工智能","school":"武汉职业技术学院"}}
    {"database":"shtd_student","table":"tb_student","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"sid":1,"sname":"jack","sex":0,"birthday":"2000-12-13","phone":"134343434444","address":"襄阳","scid":2,"reg_date":"2022-09-13"}}
    {"database":"shtd_student","table":"tb_student","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"sid":2,"sname":"rose","sex":0,"birthday":"2000-12-13","phone":"134343434444","address":"襄阳","scid":2,"reg_date":"2022-09-13"}}

修改mySQL配置文件

  1. 编辑MySQL配置文件 /etc/my.cnf,在结尾添加即可
    server-id=1
    log-bin=mysql-bin
    binlog_format=row
    binlog-do-db=shtd_student
  2. 然后重启mysql
    systemctl restart mysqld

安装maxwell

先到mysort下在安装

tar -zxvf maxwell-1.29.0.tar.gz -C /usr/local/src/
cd /usr/local/src
mv maxwell-1.29.0/ maxwell

配置环境变量

  1. 使用vi /root/.bash_profile
  2. [键盘Shift+G处于最底部、键盘O回车一行] 末尾处添加
    #MAXWELL
    export MAXWELL_HOME=/usr/local/src/maxwell
    export PATH=:$PATH:$MAXWELL_HOME/bin

刷新环境变量

  1. 使用source /root/.bash_profile
  2. 或者. /root/.bash_profile 即可刷新系统变量使其生效

配置maxwell里面的config.properties.example文件

  1. 先cp一下
    cp config.properties.example config.properties
  2. vi进去修改配置文件
    vi config.properties
    这地方一共有两个地方要配
  3. kafka.bootstrap.servers=localhost:9092改为每一台机子的主机名
    kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092

  4. 还要新增一个topic(在kafka.bootstrap.servers 下面新增即可)
    test是你之前设置的那个主题名称
    kafka_topic=test
  5. mysql的用户和密码
    use:root
    password:你设置的密码
    mysql里面的host因为我是在本机操作所以不用修改,如果mysql在另一台机子上就要改为相应的IP地址

没修过的
OIP _3_.jpg
修改后的
OIP _3_.jpg

启动maxwell

maxwell --config /usr/local/src/maxwell/config.properties -daemon

如果出现报错用这个试一下

  1. 首先你先进入msyql里面执行
    mysql -uroot -pMysql123...
  2. 然后在mysql中设置时区,默认为SYSTEM
    set global time_zone='+8:00';
    最后在启动一次maxwell看看
    成功标识是在最后一行出行一( Binlog connected.)就是表示成功了

在mysql中设置时区
OIP _3_.jpg
报错部分
OIP _3_.jpg
成功表示
OIP _3_.jpg

最后执行生产者和消费者的步骤就ok了,还有去Navicat里面插入数据看看(注意:每条数据对应每一台机子)