第五套题:

一、Python部分

1.数据处理

1.读取已经爬取到的 distribution.csv 数据文件,根据 表头字段名统计每一列缺失值个数

# 导入库
import pandas as pd

# 读取数据
df = pd.read_csv("../data/distribution.csv")

# 读取已经爬取到的 distribution.csv 数据文件,根据 表头字段名统计每一列缺失值个数
df = df.isnull().sum()
print(f"===统计每一列缺失值个数为\n{df}===")

2.数据标注

使用 Pandas 读取数据后,将数据按日期列升序排列, 在末尾新增一列数据为“是否适合出行游玩”,若当日为周 六周日,气温大于等于 18 度小于等于 30 度,并且不下雨, 打标签为‘是’;否则打标签为‘否’。标记完成后将标记数 据集保存到项目下的“taged_data.xlsx”的文件中

import pandas as pd

# 读取数据
df = pd.read_excel("../data/鞍山.xlsx")

# 将日期列转换成日期类型
df['日期'] = pd.to_datetime(df['date'], errors='coerce')
# 按照日期列升序排列
df = df.sort_values('日期')
# 判断
def clean(row):
# 定义日期
day = {'星期一': 1, '星期二': 2, '星期三': 3, '星期四': 4, '星期五': 5, '星期六': 6, '星期天': 7}
# 将日期转换成数字
wekkday = day[row['weekday']]
# 获取最高气温转换成浮点数
hightest_tem = float(row['hightest_tem'])
# 获取最低气温转换成浮点数
lowest_tem = float(row['lowest_tem'])
# 获取天气
weather =row['weather']

# 开始判断
if wekkday == 6 and 18 <= hightest_tem <= 30 and 18 <= lowest_tem <= 30 and weather != '小雨':
return '是'
else:
return '否'

df['适合游玩'] = df.apply(clean,axis=1)

# 保存
df.to_excel('taged_data.xlsx')
print(df)

二、数据统计:

hdfs操作

本任务需要使用 Hadoop,HDFS 命令,已安装 Hadoop 及 需要配置前置环境,具体要求如下:

1.在 master 节点 HDFS 根目录下创建 student 目录, 将完整命令及结果截图粘贴到对应答题报告中;

hdfs dfs -mkdir /student

2.使用命令将/root/clean_month.csv 文件上传到 HDFS 文件系统的/student 目录下,将完整命令及结果截图 粘贴到对应答题报告中;

hdfs dfs -put /root/clean_month.csv /student

3.使用命令查看 HDFS 中/student/clean_month.csv 文件的后 5 条数据,将完整命令及结果截图粘贴到对应答题 报告中;

hdfs dfs -cat /student/clean_month.csv | tail -n 5

4.使用命令查看 HDFS 中/student 目录下每个文件所 占磁盘空间,人性化显示文件大小,将完整命令及结果截图 粘贴到对应答题报告中

hdfs dfs -du -h /student

修改mapred-site.xml 文件

1.cd到hadoop下

cd usr/local/src/hadoop-3.1.3/etc/hadoop/

2.vi编辑一下mapred-site.xml

​ 将下面的复制到mapred-site.xml 里面

​ /usr/local/src/hadoop-3.1.3这是是你自己安装在哪个的文件目录

<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>

3.重启一下hadoop

stop-all.sh

4.将mapred-site.xml 拷贝到两台机子

​ /usr/local/src/hadoop-3.1.3/etc/hadoop/ (这个是你自己的文件存放目录

# slave1
scp mapred-site.xml slave1:/usr/local/src/hadoop-3.1.3/etc/hadoop/

# slave2
scp mapred-site.xml slave2:/usr/local/src/hadoop-3.1.3/etc/hadoop/

5.最后启动一下hadoop

start-all.sh

要是运行出错可能Hadoop没有启动完,稍微等一下就好了

1.第一题

编写 MapReduce 程序,实现以下功能:清除月份为空 的数据,将清理后的数据输出到 HDFS 中/clean 目录下,若 目录不存在,请自行创建,使用命令查看该文件的大小,将 完整命令及结果截图粘贴到对应答题报告中。

package org.January;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;

public class task04 {
// 定义一个taskMapper 继承Mapper
public static class taskMapper extends Mapper<LongWritable, Text, Text, Text> {
// 定义一个Text key
private Text one = new Text();
// value
private Text status = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 将value转换成字符串,并且以逗号分割
String[] line = value.toString().split(",");
// 清除两端空格
String fields = line[1].trim();
// 判断月份是否为空
if (!fields.isEmpty()) {
// 或月份
one.set(fields);
// 获取整行
status.set(value);
context.write(one, status);
}
}
}
// 定义一个 Reduce 继承 Reducer
public static class Reduce extends Reducer<Text, Text, Text, Text> {
// 定义一个reduce 用来接收
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(key, value); // 直接输出键值对,无需进行处理
}
}
}

public static void main(String[] args) throws Exception {
// 设置配置对象
Configuration conf = new Configuration();
// 设置 Job 类
Job job = Job.getInstance(conf, "task04");
// 设置 运行类
job.setJarByClass(task04.class);
// 设置 Map 类
job.setMapperClass(taskMapper.class);
// 设置 Reduce类
job.setReducerClass(Reduce.class);
// 设置 key 类
job.setOutputKeyClass(Text.class);
// 设置 value 类
job.setOutputValueClass(Text.class);
// 设置 输入路径
Path outpath = new Path(args[1]);
// 获取hdfs路径
FileSystem fileSystem = outpath.getFileSystem(conf);
// 判断
if (fileSystem.exists(outpath)){
fileSystem.delete(outpath, true);
}
// 设置输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 设置输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

2.第二题

编写 MapReduce 程序,实现以下功能:统计每个城市最 高温度,并在控制台输出温度最高的 5 个城市以及最高的温 度,将输出结果截图粘贴到对应答题报告中。

package org.February;
// 导入需要的类
import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class task03 {
// 定义一个 Mapper 类来处理输入的数据
public static class CityTempMapper extends Mapper<Object, Text, Text, DoubleWritable> {
// 重写 map 方法
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
if(key.toString().equals("0")) return;
// 将输入的数据转换成 String 类型
String line = value.toString();
// 将数据按逗号分隔
String[] fields = line.split(",");
// 获取城市名称和最高温度
String city = fields[0];
double extremeHighTemp = Double.parseDouble(fields[4]);
// 将城市名称和最高温度作为键值对输出
context.write(new Text(city), new DoubleWritable(extremeHighTemp));
}
}

// 定义一个 Reducer 类来处理 Mapper 的输出
public static class CityTempReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

// 定义一个 TreeMap 来存储城市和最高温度,按温度降序排序
private TreeMap<Double, String> cityTempMap = new TreeMap<>();

// 重写 reduce 方法
@Override
protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
// 获取城市名称
String city = key.toString();
// 遍历温度值,找出最高的一个
double maxTemp = Double.MIN_VALUE;
for (DoubleWritable value : values) {
maxTemp = Math.max(maxTemp, value.get());
}
// 将城市和最高温度存入 TreeMap 中,以温度为键,城市为值
cityTempMap.put(maxTemp, city);
}

// 重写 cleanup 方法,在任务结束时输出结果
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// 输出温度最高的 5 个城市和最高温度
int count = 0;
for (Double temp : cityTempMap.descendingKeySet()) {
if (count < 5) {
String city = cityTempMap.get(temp);
context.write(new Text(city), new DoubleWritable(temp));
count++;
} else {
break;
}
}
}
}

// 定义一个主方法来运行任务
public static void main(String[] args) throws Exception {
// 创建一个 Configuration 对象
Configuration conf = new Configuration();
// 创建一个 Job 对象
Job job = Job.getInstance(conf, "task03");
// 设置运行类
job.setJarByClass(task03.class);
// 设置 Mapper 类
job.setMapperClass(CityTempMapper.class);
// 设置输出的 key 类型
job.setOutputKeyClass(Text.class);
// 设置输出的 value 类型
job.setOutputValueClass(DoubleWritable.class);
// 设置 Reducer 类
job.setReducerClass(CityTempReducer.class);
// 设置输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 设置输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

最后一步运行JAR环境包

hadoop jar jar包名 idea里面java类名 /csv文件 /随意起一个输出结果文件名

# 完整的语句
hadoop jar /root/file2_1.jar org.example.task01 /user_info.csv /file2_1_out