Exam Set 6:
Part I: Python
1. Data Processing
1. Based on the distribution.csv file, compute the maximum number of missing fields in any single record and print the result to the console in the following format: ===单条数据缺失字段计数的最大值为***===
import pandas as pd
df = pd.read_csv('../data/distribution.csv')
# Count missing fields per record (row-wise), then take the maximum
missing_counts = df.isnull().sum(axis=1)
max_missing_count = missing_counts.max()
print("===单条数据缺失字段计数的最大值为{}===".format(max_missing_count))
2. Data Annotation
Append a new column named "当日是否解冻" (thawed that day). If the day's highest temperature is above 0 and the wind force is level 2 or below, label the record '是'; otherwise label it '否'. When labelling is done, save the file to the current directory as "annotation.xlsx", and paste a screenshot of the data into the corresponding place in the answer report.
import pandas as pd
df = pd.read_excel('../data/长春天气信息.xlsx')
df['hightest_tem'] = df['hightest_tem'].astype(float)
# Extract the numeric wind level (e.g. "3级" -> 3)
df['wind_level'] = df['wind_level'].str.extract(r'(\d+)', expand=False).astype(int)
df.loc[(df['hightest_tem'] > 0) & (df['wind_level'] <= 2), '当日是否解冻'] = '是'
df.loc[(df['hightest_tem'] <= 0) | (df['wind_level'] > 2), '当日是否解冻'] = '否'
print(df)
# Save the labelled data as required by the task
df.to_excel('annotation.xlsx', index=False)
Part II: Data Statistics
Subtask 2: HDFS Operations
This task uses Hadoop and HDFS commands. Hadoop is already installed, and the prerequisite environment needs to be configured. The specific requirements are as follows:
(1) In the Hadoop environment on the master node, use an HDFS command to list the files and directories in HDFS, and paste a screenshot of the complete command and its result into the corresponding answer report;
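A command that satisfies this requirement (listing from the HDFS root) is, for example:
hdfs dfs -ls /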
(2) Use an HDFS command to create a directory named bigdata, and paste a screenshot of the complete command and its result into the corresponding answer report;
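Assuming the directory should sit directly under the HDFS root, so that it matches the /bigdata path used in step (3), the command would be:
hdfs dfs -mkdir /bigdata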
(3) Use an HDFS command to upload the /opt/eurasia_mainland.csv file to the /bigdata directory of the HDFS file system, and paste a screenshot of the complete command and its result into the corresponding answer report;
hdfs dfs -put /opt/eurasia_mainland.csv /bigdata
(4) Use an HDFS command to download the /bigdata/eurasia_mainland.csv file to the /root directory, and paste a screenshot of the complete command and its result into the corresponding answer report;
hdfs dfs -get /bigdata/eurasia_mainland.csv /root/
(5) Use an HDFS command to view the contents of the /bigdata/eurasia_mainland.csv file, and paste a screenshot of the complete command and its result into the corresponding answer report.
hdfs dfs -cat /bigdata/eurasia_mainland.csv
Modify the mapred-site.xml file
1. cd into the Hadoop configuration directory
cd /usr/local/src/hadoop-3.1.3/etc/hadoop/
2. Edit mapred-site.xml with vi
(/usr/local/src/hadoop-3.1.3 is simply where Hadoop is installed here; adjust the path to your own installation directory.)
Copy the block below into mapred-site.xml, inside the <configuration> element.
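For reference, assuming you are already in the configuration directory from step 1, the edit command is simply:
vi mapred-site.xml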
<property>
    <name>yarn.app.mapreduce.am.env</name>
    <value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
    <name>mapreduce.map.env</name>
    <value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
    <name>mapreduce.reduce.env</name>
    <value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
3. Restart Hadoop
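Assuming Hadoop's sbin scripts are on the PATH, a simple way to restart everything is, for example:
stop-all.sh
start-all.sh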
4. Copy mapred-site.xml to the two slave nodes
(/usr/local/src/hadoop-3.1.3/etc/hadoop/ is the configuration directory used here; adjust it to your own layout.)
# slave1
scp mapred-site.xml slave1:/usr/local/src/hadoop-3.1.3/etc/hadoop/
# slave2
scp mapred-site.xml slave2:/usr/local/src/hadoop-3.1.3/etc/hadoop/
5. Finally, start Hadoop
If a job fails to run, Hadoop may not have finished starting yet; wait a moment and try again.
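A quick way to confirm that all daemons are up before submitting jobs is:
jps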
1. Question 1
Write a MapReduce program that removes records whose year or country/region field is empty and saves the cleaned data to the /clean_data directory in HDFS (create the directory if it does not exist). Then use a command to check the size of that file, and paste the complete command and result screenshot into the corresponding answer report.
package org.January;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class task07 {
    public static class taskMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text one = new Text();
        private Text status = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split(",");
            // Guard against malformed rows that lack a year or country column
            if (line.length < 2) {
                return;
            }
            String year = line[0].trim();
            String country = line[1].trim();
            // Keep only records whose year and country/region fields are both non-empty
            if (!year.isEmpty() && !country.isEmpty()) {
                one.set(year);
                status.set(value);
                context.write(one, status);
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Write every cleaned record straight through to the output
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "task07");
        job.setJarByClass(task07.class);
        job.setMapperClass(taskMapper.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Remove the output directory if it already exists so the job can be rerun
        Path outpath = new Path(args[1]);
        FileSystem fileSystem = outpath.getFileSystem(conf);
        if (fileSystem.exists(outpath)) {
            fileSystem.delete(outpath, true);
        }
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
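For the last part of this question (checking the size of the cleaned file), an HDFS command such as the following works once the job has finished:
hdfs dfs -du -h /clean_data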
2. Question 2
Write a MapReduce program that, for each country across the different years, finds the highest economic damage from disasters of the climate-disaster type, and prints to the console the 10 countries with the highest climate-disaster economic damage.
package org.exercise1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.TreeMap;

public class task06 {
    public static class taskMapper extends Mapper<Object, Text, Text, DoubleWritable> {
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Skip the CSV header line (byte offset 0)
            if (key.toString().equals("0")) return;
            String[] fields = value.toString().split(",");
            // Guard against malformed rows that lack the economic-damage column (index 7)
            if (fields.length < 8) return;
            // NOTE: this assumes the input already contains only climate-disaster ("气候灾害") records;
            // otherwise an extra filter on the disaster-type column would be needed here.
            String country = fields[1];
            double economic = Double.parseDouble(fields[7]);
            context.write(new Text(country), new DoubleWritable(economic));
        }
    }

    public static class Reduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        // Sorted map from a country's maximum damage to the country name, used to pick the top 10 in cleanup()
        private TreeMap<Double, String> result = new TreeMap<>();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            String country = key.toString();
            double economicMax = Double.MIN_VALUE;
            for (DoubleWritable value : values) {
                economicMax = Math.max(economicMax, value.get());
            }
            result.put(economicMax, country);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the 10 countries with the highest maximum damage, in descending order
            int count = 0;
            for (Double temp : result.descendingKeySet()) {
                if (count < 10) {
                    context.write(new Text(result.get(temp)), new DoubleWritable(temp));
                    count++;
                } else {
                    break;
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "task06");
        job.setJarByClass(task06.class);
        job.setMapperClass(taskMapper.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Final step: run the packaged JAR
hadoop jar <jar file> <fully qualified class name from the IDEA project> /<input csv file> /<output directory of your choosing>
# Complete example
hadoop jar /root/file2_1.jar org.example.task01 /user_info.csv /file2_1_out
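To read the job's result back after it finishes, the reducer output file can be viewed with an HDFS command; assuming a single reducer and the output directory from the example above, that would be:
hdfs dfs -cat /file2_1_out/part-r-00000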