Spark数据抽取3

阶段三实施数据抽取
数据库要用到的表
在工作之前要先保证‘zookeeper’和‘kafka’可以正常启动
- 这个顺序是要先启动zookeeper
- 注意要启动三台机子才可以
zkServer.sh start
- 检查状态确保运行正常 三台机子启动完两个follower和一个leader
zkServer.sh status
启动 zookeeper
启动kafka
- 启动kafka (注意三台机子都要)
- 只要启动什么都没有就是成功了
- 最后jps看一下有kafka就可以了这个是停掉kafka的命令
kafka-server-start.sh -daemon /usr/local/src/kafka/config/server.properties
kafka-server-stop.sh
创建主题 (可以理解为数据库)
- test是看比赛题目(这些名字到时候比赛会明确给出)
- –replication-factor副本
- –partition分区
第一个第二个kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic test
第三个kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic dim_class
第四个kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic dim_score
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic dim_student
成功标识出现Created topic test.
代码部分
- 用idea来做,写完代码直接运行即可
import com.google.gson.JsonParser
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.util.Collector
import java.util.Properties
object Task4 {
def main(args: Array[String]): Unit = {
// 准备运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 数据来源
val prop=new Properties()
// kafka的地址
prop.setProperty("bootstrap.servers","172.16.7.220:9092,172.16.7.221:9092,172.16.7.222:9092")
// 随意起个名字
prop.setProperty("group.id","aaa")
// 添加数据源
val dataStream=env.addSource(new FlinkKafkaConsumer[String]("test",new SimpleStringSchema(),prop))
dataStream.print()
// 分流
// 标签
val classTag = new OutputTag[String]("class")
val scoreTag = new OutputTag[String]("score")
val studentTag= new OutputTag[String]("student")
// 数据跟标签定义关系匹配
val myStream = dataStream.process(new ProcessFunction[String,String] {
override def processElement(i: String, context: ProcessFunction[String, String]#Context, collector: Collector[String]): Unit = {
val obj=JsonParser.parseString(i).getAsJsonObject
val table=obj.get("table").getAsString // 获取表明
val data=obj.getAsJsonObject("data").toString //获取它的数据
if(table.equals("tb_class")){
context.output(classTag,data)
}
else if(table.equals("tb_score")){
context.output(scoreTag,data)
}
else if(table.equals("tb_student")){
context.output(studentTag,data)
}
else{
collector.collect(data)
}
}
})
// 获取输出流
val classStream = myStream.getSideOutput(classTag)
val scoreStream = myStream.getSideOutput(scoreTag)
val studentStream = myStream.getSideOutput(studentTag)
// 写入
classStream.addSink(new FlinkKafkaProducer[String]("dim_class",new SimpleStringSchema(),prop))
scoreStream.addSink(new FlinkKafkaProducer[String]("dim_score",new SimpleStringSchema(),prop))
studentStream.addSink(new FlinkKafkaProducer[String]("dim_student",new SimpleStringSchema(),prop))
env.execute("Task4")
}
}
成功标识就是像这样子卡着不动就可以了,要是出现一大堆东西就是报错了
关于import选择
如果没有那就统一默认选择第一个
1.
2.
3.
4.
5.
6.
7.
8.
查看
- 查看安装的库(也叫主题)
kafka-topics.sh --zookeeper master:2181 --list
如果有报错的方法解决
- 方法一:把/tmp/kafka-logs整个删掉就行了(每台机子都要删)
对于kafka 如果有问题要诊断的时候,把启动命令中的-daemon去掉,这样就会在控制台输出详细的日志了
- 方法二:
- 停掉所有的kafka
kafka-server-stop.sh
- master运行
zkCli.sh
- 输入下面命令回车即可
deleteall /brokers/topics/__consumer_offsets
- 退出
quit
- 重启kafka
kafka-server-start.sh -daemon /usr/local/src/kafka/config/server.properties
- 停掉所有的kafka
报错二,如果上面这个方法还不可以试一下这个(注意:要是两个都不得多半是你安装服务安装错了)
- 清理错误的kafka安装残留信息:
进到zkCli.sh里面清理安装残留在ZOOKEEPER命令行依次执行zkCli.sh
rmr /brokers
rmr /consumers
rmr /admin
rmr /config
rmr /controller_epoch
退出rmr /controller
然后删除kafka-logs文件quit
最后在重启一下rm -rf /tmp/kafka-logs
kafka-server-start.sh -daemon /usr/local/src/kafka/config/server.properties
写入数据(发送端)
- 在master里面模拟一个生产者生产变动
- 成功标记就是出来一个箭头标记就表示成功了
kafka-console-producer.sh --broker-list master:9092 --topic test
多开一两个master来当消费者(接收端)
- 再开一台机子然后直接键入即可
- 成功标识就是什么都没有得了
dim_classdim_scorekafka-console-consumer.sh --bootstrap-server master:9092 --topic dim_class --from-beginning
dim_studentkafka-console-consumer.sh --bootstrap-server master:9092 --topic dim_score --from-beginning
kafka-console-consumer.sh --bootstrap-server master:9092 --topic dim_student --from-beginning
临时数据
“注意有一些是class和score还有student的”
- 这数据要一条一条的插入
{"database":"shtd_student","table":"tb_class","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"cid":1,"cname":"Class#1","specialty":"人工智能","school":"武汉职业技术学院"}}
{"database":"shtd_student","table":"tb_class","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"cid":2,"cname":"Class#2","specialty":"人工智能","school":"武汉职业技术学院"}}
{"database":"shtd_student","table":"tb_class","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"cid":3,"cname":"Class#3","specialty":"人工智能","school":"广西职业技术学院"}}
{"database":"shtd_student","table":"tb_score","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"cid":4,"cname":"Class#4","specialty":"人工智能","school":"武汉职业技术学院"}}
{"database":"shtd_student","table":"tb_score","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"cid":5,"cname":"Class#5","specialty":"人工智能","school":"武汉职业技术学院"}}
{"database":"shtd_student","table":"tb_student","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"sid":1,"sname":"jack","sex":0,"birthday":"2000-12-13","phone":"134343434444","address":"襄阳","scid":2,"reg_date":"2022-09-13"}}
{"database":"shtd_student","table":"tb_student","type":"insert","ts":1634004537,"xid":1231232,"commit":true,"data":{"sid":2,"sname":"rose","sex":0,"birthday":"2000-12-13","phone":"134343434444","address":"襄阳","scid":2,"reg_date":"2022-09-13"}}
修改mySQL配置文件
- 编辑MySQL配置文件 /etc/my.cnf,在结尾添加即可
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=shtd_student - 然后重启mysql
systemctl restart mysqld
安装maxwell
先到mysort下在安装
tar -zxvf maxwell-1.29.0.tar.gz -C /usr/local/src/ |
配置环境变量
- 使用vi /root/.bash_profile
- [键盘Shift+G处于最底部、键盘O回车一行] 末尾处添加
#MAXWELL
export MAXWELL_HOME=/usr/local/src/maxwell
export PATH=:$PATH:$MAXWELL_HOME/bin
刷新环境变量
- 使用source /root/.bash_profile
- 或者. /root/.bash_profile 即可刷新系统变量使其生效
配置maxwell里面的config.properties.example文件
- 先cp一下
cp config.properties.example config.properties
- vi进去修改配置文件这地方一共有两个地方要配
vi config.properties
- kafka.bootstrap.servers=localhost:9092改为每一台机子的主机名
kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092
- 还要新增一个topic(在kafka.bootstrap.servers 下面新增即可)
test是你之前设置的那个主题名称kafka_topic=test
- mysql的用户和密码
use:root
mysql里面的host因为我是在本机操作所以不用修改,如果mysql在另一台机子上就要改为相应的IP地址password:你设置的密码
没修过的
修改后的
启动maxwell
maxwell --config /usr/local/src/maxwell/config.properties -daemon |
如果出现报错用这个试一下
- 首先你先进入msyql里面执行
mysql -uroot -pMysql123...
- 然后在mysql中设置时区,默认为SYSTEM最后在启动一次maxwell看看
set global time_zone='+8:00';
成功标识是在最后一行出行一( Binlog connected.)就是表示成功了
在mysql中设置时区
报错部分
成功表示
最后执行生产者和消费者的步骤就ok了,还有去Navicat里面插入数据看看(注意:每条数据对应每一台机子)
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 MENGLAN!