第六套题:

一、Python部分

1、数据处理

1.根据 distribution.csv 文件统计单条数据缺失字段计 数的最大值,将结果输出到控制台,输出格式如下: ===单条数据缺失字段计数的最大值为***===

import pandas as pd

df = pd.read_csv('../data/distribution.csv')
missing_counts = df.isnull().sum()
max_missing_count = missing_counts.max()

print("===单条数据缺失字段计数的最大值为{}===".format(max_missing_count))

2、数据标注

在末尾新增一列数据为“当日是否解冻”,若当日 最高温大于 0,并且风力小于等于 2 级,打标签为‘是’;否 则打标签为‘否’。标记完成后保存到当前目录,文件命名 为“annotation.xlsx”,并将数据截图粘贴到答题报告对应位置。

# 导入库
import pandas as pd
# 读取数据
df = pd.read_excel('../data/长春天气信息.xlsx')

# 将hightest_tem转换成浮点数
df['hightest_tem'] = df['hightest_tem'].astype(float)
# 将wind_level部分数据转换成成整数类型
df['wind_level'] = df['wind_level'].str.extract('(\d+)', expand=False).astype(int)

# 判断
df.loc[(df['hightest_tem'] > 0) & (df['wind_level'] <= 2), '当日是否解冻'] = '是'
df.loc[(df['hightest_tem'] <= 0) | (df['wind_level'] > 2), '当日是否解冻'] = '否'

print(df)

二、数据统计

子任务二:HDFS 操作

本任务需要使用 Hadoop,HDFS 命令,已安装 Hadoop 及 需要配置前置环境,具体要求如下:

(1)在 master 节点的 hadoop 环境中,使用 HDFS 命令 列出 HDFS 的文件和目录,将完整命令及结果截图粘贴到对 应答题报告中;

hdfs dfs -ls /

(2)使用 HDFS 命令创建一个名为 bigdata 目录,将完 整命令及结果截图粘贴到对应答题报告中;

hdfs dfs -mkdir /bigdata

(3)使用 HDFS 命令将/opt/eurasia_mainland.csv 文 件上传到 HDFS 文件系统的/bigdata 目录下,将完整命令及 结果截图粘贴到对应答题报告中;

hdfs dfs -put /opt/eurasia_mainland.csv /bigdata

(4)使用 HDFS 命令将/bigdata/eurasia_mainland.csv 12 12 文件下载到/root 目录下,将完整命令及结果截图粘贴到对 应答题报告中;

hdfs dfs -get /bigdata/eurasia_mainland.csv /root/

(5)使用 HDFS 令查看/bigdata/eurasia_mainland.csv 文件的数据内容,将完整命令及结果截图粘贴到对应答题报 告中。

hdfs dfs -cat /bigdata/eurasia_mainland.csv

修改mapred-site.xml 文件

1.cd到hadoop下

cd usr/local/src/hadoop-3.1.3/etc/hadoop/

2.vi编辑一下mapred-site.xml

​ 将下面的复制到mapred-site.xml 里面

​ /usr/local/src/hadoop-3.1.3这是是你自己安装在哪个的文件目录

<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>

3.重启一下hadoop

stop-all.sh

4.将mapred-site.xml 拷贝到两台机子

​ /usr/local/src/hadoop-3.1.3/etc/hadoop/ (这个是你自己的文件存放目录

# slave1
scp mapred-site.xml slave1:/usr/local/src/hadoop-3.1.3/etc/hadoop/

# slave2
scp mapred-site.xml slave2:/usr/local/src/hadoop-3.1.3/etc/hadoop/

5.最后启动一下hadoop

start-all.sh

要是运行出错可能Hadoop没有启动完,稍微等一下就好了

1.第一题

编写 MapReduce 程序,实现以下功能:清除年份、国家 区 域 为 空 的 数 据 , 将 清 理 后 的 数 据 保 存 到 HDFS 中 /clean_data 目录下,若目录不存在,请自行创建,使用命 令查看该文件的大小,将完整命令及结果截图粘贴到对应答 题报告中。

package org.January;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class task07 {

// 定义一个taskMapper 继承Mapper
public static class taskMapper extends Mapper<LongWritable, Text, Text, Text> {
// 定义一个Text key
private Text one = new Text();
// value
private Text status = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split(",");
String year = line[0].trim();
String country = line[1].trim();
if (!year.isEmpty() && !country.isEmpty()) {
one.set(year);
status.set(value);
context.write(one, status);
}
}
}
// 定义一个 Reduce 继承 Reducer
public static class Reduce extends Reducer<Text, Text, Text, Text> {
// 定义一个reduce 用来接收
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(key, value); // 直接输出键值对,无需进行处理
}
}
}

public static void main(String[] args) throws Exception {
// 设置配置对象
Configuration conf = new Configuration();
// 设置 Job 类
Job job = Job.getInstance(conf, "task07");
// 设置 运行类
job.setJarByClass(task07.class);
// 设置 Map 类
job.setMapperClass(taskMapper.class);
// 设置 Reduce类
job.setReducerClass(Reduce.class);
// 设置 key 类
job.setOutputKeyClass(Text.class);
// 设置 value 类
job.setOutputValueClass(Text.class);
// 设置 输入路径
Path outpath = new Path(args[1]);
// 获取hdfs路径
FileSystem fileSystem = outpath.getFileSystem(conf);
// 判断
if (fileSystem.exists(outpath)){
fileSystem.delete(outpath, true);
}
// 设置输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 设置输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

2.第二题

编写 MapReduce 程序,实现以下功能:统计每个国家不 同年份基于灾害类型为气候灾害受损经济最高的国家,并在 控制台输出打印出气候灾害受损经济最高的 10 个国家

package org.exercise1;

import jdk.nashorn.internal.scripts.JO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.TreeMap;

public class task06 {
// 定义一个 taskMapper 继承 Mapper
public static class taskMapper extends Mapper<Object, Text,Text, DoubleWritable>{
// 重写map
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
// 过滤第一行表头
if (key.toString().equals("0")) return;
// 将value转换成字符串
String line = value.toString();
// 逗号分割
String[] fields = line.split(",");
// 获取国家
String country = fields[1];
// 获取总经济损失
Double economic = Double.parseDouble(fields[7]);
// 输出
context.write(new Text(country), new DoubleWritable(economic));
}
}

// 定义一个 Reducer 处理Mapper
public static class Reduce extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
// 定义一个 Treemap
private TreeMap<Double, String> result = new TreeMap<>();
// 重写reduce
@Override
protected void reduce(Text key, Iterable<DoubleWritable> values, Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
// 获取国家
String country = key.toString();
// 获取受损经济最高的
Double economicMax = Double.MIN_VALUE;
// 遍历
for (DoubleWritable value : values){
economicMax = Math.max(economicMax, value.get());
}
// 设置 result 为 economicMax和country
result.put(economicMax,country);
}
// 定义一个 cleanup
@Override
protected void cleanup(Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
// 定义一个 count
int count = 0;
// 遍历得出最高
for (Double temp : result.descendingKeySet()){
// if判断
if (count <= 10){
// 获取国家
String country = result.get(temp);
// 输出
context.write(new Text(country), new DoubleWritable(temp));
count++;
}else {
break;
}
}
}
}

// 设置配置对象
public static void main(String[] args) throws Exception{
// 配置对象
Configuration conf = new Configuration();
// 设置 Job 类
Job job = Job.getInstance(conf,"task06");
// 设置 运行类
job.setJarByClass(taskMapper.class);
// 设置 Map 类
job.setMapperClass(taskMapper.class);
// 设置 Reducer类
job.setReducerClass(Reduce.class);
// 设置key类
job.setOutputKeyClass(Text.class);
// 设置Value类
job.setOutputValueClass(DoubleWritable.class);
// 设置 输入路径
FileInputFormat.addInputPath(job,new Path(args[0]));
// 设置 输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

最后一步运行JAR环境包

hadoop jar jar包名 idea里面java类名 /csv文件 /随意起一个输出结果文件名

# 完整的语句
hadoop jar /root/file2_1.jar org.example.task01 /user_info.csv /file2_1_out