Practice Set 5
I. Python
1. Data Processing
1. Read the previously scraped data file distribution.csv and, using the header field names, count the number of missing values in each column.
import pandas as pd

df = pd.read_csv("../data/distribution.csv")
# isnull().sum() yields the missing-value count per column, keyed by header name
missing_counts = df.isnull().sum()
print(f"=== Missing values per column ===\n{missing_counts}")
2. Data Labeling
After reading the data with Pandas, sort it by the date column in ascending order and append a new column, "是否适合出行游玩" (suitable for an outing). If the day is a Saturday or Sunday, the temperature is between 18 and 30 degrees inclusive, and it is not raining, label the row '是' (yes); otherwise label it '否' (no). When labeling is complete, save the labeled dataset to a file named "taged_data.xlsx" under the project directory.
import pandas as pd

df = pd.read_excel("../data/鞍山.xlsx")
df['日期'] = pd.to_datetime(df['date'], errors='coerce')
df = df.sort_values('日期')

def clean(row):
    day = {'星期一': 1, '星期二': 2, '星期三': 3, '星期四': 4,
           '星期五': 5, '星期六': 6, '星期天': 7}
    weekday = day[row['weekday']]
    highest_tem = float(row['hightest_tem'])  # column name as spelled in the source data
    lowest_tem = float(row['lowest_tem'])
    weather = row['weather']
    # Saturday (6) or Sunday (7), both temperatures within [18, 30], and no rain of any kind
    if weekday in (6, 7) and 18 <= highest_tem <= 30 and 18 <= lowest_tem <= 30 and '雨' not in weather:
        return '是'
    return '否'

df['是否适合出行游玩'] = df.apply(clean, axis=1)
df.to_excel('taged_data.xlsx', index=False)
print(df)
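A quick way to sanity-check the labeling before saving (a minimal sketch, assuming the script above has just run) is to look at the label distribution:

print(df['是否适合出行游玩'].value_counts())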
II. Data Statistics
HDFS Operations
This task uses Hadoop and HDFS commands; Hadoop must already be installed and the prerequisite environment configured. The specific requirements are as follows:
1. On the master node, create a student directory under the HDFS root, then paste the complete command and a screenshot of the result into the answer report (a suitable command is shown below);
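A minimal command for this step (assuming the HDFS daemons are already running):

hdfs dfs -mkdir /student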
2. Upload the file /root/clean_month.csv to the /student directory of the HDFS file system, then paste the complete command and a screenshot of the result into the answer report;
hdfs dfs -put /root/clean_month.csv /student
3. View the last 5 records of the /student/clean_month.csv file in HDFS, then paste the complete command and a screenshot of the result into the answer report;
hdfs dfs -cat /student/clean_month.csv | tail -n 5
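Note that the pipe above streams the file to the client, where the local tail selects the last 5 lines. HDFS also ships a built-in tail subcommand, but it prints the last kilobyte of the file rather than a fixed number of lines, so it only satisfies this task if those bytes happen to cover 5 records:

hdfs dfs -tail /student/clean_month.csv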
4. View the disk space occupied by each file under the /student directory in HDFS, displaying file sizes in human-readable form, then paste the complete command and a screenshot of the result into the answer report (a suitable command is shown below).
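A command that fits this requirement (-du reports per-file usage; -h renders the sizes human-readable):

hdfs dfs -du -h /student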
Modifying the mapred-site.xml file
1. cd into the Hadoop configuration directory:
cd /usr/local/src/hadoop-3.1.3/etc/hadoop/
2. Edit mapred-site.xml with vi and copy the block below into it. /usr/local/src/hadoop-3.1.3 is the directory where you installed Hadoop; substitute your own path. Without these properties, MapReduce jobs on Hadoop 3.x commonly fail because the application master cannot locate the MapReduce classes.
<property>
    <name>yarn.app.mapreduce.am.env</name>
    <value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
    <name>mapreduce.map.env</name>
    <value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
    <name>mapreduce.reduce.env</name>
    <value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
3. Restart Hadoop.
4. Copy mapred-site.xml to the two slave machines (/usr/local/src/hadoop-3.1.3/etc/hadoop/ is again your own installation directory):

# slave1
scp mapred-site.xml slave1:/usr/local/src/hadoop-3.1.3/etc/hadoop/
# slave2
scp mapred-site.xml slave2:/usr/local/src/hadoop-3.1.3/etc/hadoop/
5. Finally, start Hadoop again.
If a job errors out right away, Hadoop may not have finished starting; wait a moment and retry.
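One quick way to confirm the daemons are actually up before submitting a job (a suggestion beyond the original notes) is to run jps on each node and check that the expected processes, such as NameNode, DataNode, ResourceManager, and NodeManager, are listed:

jps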
1. Question 1
Write a MapReduce program that removes records whose month field is empty and writes the cleaned data to the /clean directory in HDFS, creating the directory if it does not exist. Use a command to check the size of the output file, and paste the complete command and a screenshot of the result into the answer report (a suitable command follows the program below).
package org.January;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class task04 {

    public static class taskMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text one = new Text();
        private Text status = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split(",");
            // Keep only records whose month field (column 1) is present and non-empty
            if (line.length > 1) {
                String month = line[1].trim();
                if (!month.isEmpty()) {
                    one.set(month);
                    status.set(value);
                    context.write(one, status);
                }
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Pass every surviving record straight through
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "task04");
        job.setJarByClass(task04.class);
        job.setMapperClass(taskMapper.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Delete the output directory if it already exists so the job can rerun
        Path outpath = new Path(args[1]);
        FileSystem fileSystem = outpath.getFileSystem(conf);
        if (fileSystem.exists(outpath)) {
            fileSystem.delete(outpath, true);
        }
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outpath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
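Once the job has finished, the size of the cleaned output can be checked with, for example:

hdfs dfs -du -h /clean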
2. Question 2
Write a MapReduce program that computes the highest temperature for each city and outputs the 5 cities with the highest temperatures, together with those temperatures, to the console; paste a screenshot of the output into the answer report.
package org.February;

import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class task03 {

    public static class CityTempMapper extends Mapper<Object, Text, Text, DoubleWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Skip the header row, which starts at byte offset 0
            if (key.toString().equals("0")) return;
            String[] fields = value.toString().split(",");
            String city = fields[0];
            double extremeHighTemp = Double.parseDouble(fields[4]);
            context.write(new Text(city), new DoubleWritable(extremeHighTemp));
        }
    }

    public static class CityTempReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        // Sorted by temperature; note that two cities sharing the same maximum
        // would overwrite each other under this keying scheme
        private TreeMap<Double, String> cityTempMap = new TreeMap<>();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // NEGATIVE_INFINITY rather than MIN_VALUE (the smallest positive
            // double), so sub-zero temperatures are handled correctly
            double maxTemp = Double.NEGATIVE_INFINITY;
            for (DoubleWritable value : values) {
                maxTemp = Math.max(maxTemp, value.get());
            }
            cityTempMap.put(maxTemp, key.toString());
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the 5 highest temperatures in descending order
            int count = 0;
            for (Double temp : cityTempMap.descendingKeySet()) {
                if (count >= 5) break;
                context.write(new Text(cityTempMap.get(temp)), new DoubleWritable(temp));
                count++;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "task03");
        job.setJarByClass(task03.class);
        job.setMapperClass(CityTempMapper.class);
        job.setReducerClass(CityTempReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
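The reducer actually writes the top-5 list to the job's output directory rather than literally to the console, so to display it one can cat the single reducer output file (/top5_out here is a placeholder for whatever output directory was passed as the second argument):

hdfs dfs -cat /top5_out/part-r-00000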
The final step is to run the packaged JAR:

hadoop jar <jar file> <fully qualified class name from IDEA> <input csv path> <output directory of your choice>

# Complete example
hadoop jar /root/file2_1.jar org.example.task01 /user_info.csv /file2_1_out