第三套题

一、python部分

( 1 ) 删 除 hotel.csv 中 商 圈 为 空 的 数 据 并 且 存 入 hotel2_c1_N.csv,N 为删除的数据条数;

  1. shopping:这个是商圈的英文名称  
    
  2. dropna:删除是否为空  
    
  3. subset:判断哪一个字段
    
  4. len:用于返回一个对象的长度或元素的数量。它可以用于字符串、列表、元组、字典、集合等可迭代对象。
    
  5. to_csv:保存的csv文件
    
# 导入库
import pandas as pd

# 读取数据
df = pd.read_csv("文件路径")

# 删除商圈为空的数据
df_1 = df.dropna(subset=['shopping'])
# 用len判断元素数量,然后用原数据减去删除的数据
df_2 = len(df) - len(df_1)

# 保存数据 encoding='utf-8':字符格式
df.to_csv(f'hotel2_c1_{df_2}.csv', encoding='utf-8')

(2)删除 hotel.csv 中缺失值大于 3 个的数据列并且存 入 hotel2_c2_N.csv,N 为删除的数据列变量名,多列时用下划 线“_”间隔无顺序要求;

  1. columns: 判断每一列
  2. isnull:判断哪个数据为空
  3. sum:求和
  4. join:是一个字符串方法用于元素拼接,并返回新的字符串
# 导入库
import pandas as pd

# 读取数据
df = pd.read_csv("文件路径")

# 删除缺失值大于 3 个的数据
df_1 = df.columns[df.isnull().sum() > 3]
# 列时用下划 线“_”间隔无顺序要求;
df_2 = '_'.join(df_1)
# 用len判断元素数量,然后用原数据减去删除的数据
df_3 = len(df) - len(df_2)

# 保存数据
df.to_csv(f'hotel2_c2_{drop_3}.csv')

(3)将 hotel.csv 中评分为空的数据设置为 0 并且存入 hotel2_c3.csv;

  1. fillna:用来填数据中的缺失值
# 导入库
import pandas as pd

# 读取数据
df = pd.read_csv("文件路径")

# 评分为空的数据设置为 0
score = df['score'].fillna(0)
# 保存数据
df.to_csv('hotel2_c3.csv')

(4)将 hotel.csv 中评分为空的数据设置为总平均评分 并且存入 hotel2_c4_N.csv,N 为总平均评分保留一位小数。

  1. round:用来对数字进行四舍五入后面逗号的 1 是总平均评分保留一位小数
  2. mea():计算平均数
# 导入库
import pandas as pd

# 读取数据
df = pd.read_csv("文件路径")

# 中评分为空的数据设置为总平均评分
score = round(df['score'].mean(), 1)
# 保存数据
df.to_csv(f'hotel2_c4_{score}.csv')

二、数据统计MapReduce

hdfs操作

1.首先你要将user_info.csv拷贝到root下和hdfs上面

1.本地命令拷贝

scp -r 你存放文件的地方目录 root:你的机子ip地址:/root

# 完整的语句
scp -r user_info.csv [email protected]:/root

2、创建文件夹

1.在 HDFS 目录下新建目录/file2_1

hdfs dfs -mkdir /file2_1

2.查看目录命令截图

hdfs dfs -ls /

3、修改权限

1.修改权限,赋予目录/file2_1 最高 777 权限

hdfs dfs -chmod 777 /file2_1

2.查看 目录权限截图

hdfs dfs -ls /

4、下载 HDFS 新建目录/file2_1,到本地容器 Master 指定目录/root/下

  1. -get是下载
  2. -put是上传
hdfs dfs -get  /file2-1 /root/

查看一下root是否成功下载下来file2_1

ls

6.如果遇到:

mkdir: Cannot create directory /input. Name node is in safe mode.(意思是hadoop安全模式正在开启,要关掉才可以新建)

hadoop强制关掉安全模式:

hdfs dfsadmin -safemode forceExit

出现:Safe mode is OFF 就代表关掉成功了

修改mapred-site.xml 文件

1.cd到hadoop下

cd usr/local/src/hadoop-3.1.3/etc/hadoop/

2.vi编辑一下mapred-site.xml

​ 将下面的复制到mapred-site.xml 里面

​ /usr/local/src/hadoop-3.1.3这是是你自己安装在哪个的文件目录

<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>

3.重启一下hadoop

stop-all.sh

4.将mapred-site.xml 拷贝到两台机子

​ /usr/local/src/hadoop-3.1.3/etc/hadoop/ (这个是你自己的文件存放目录

# slave1
scp mapred-site.xml slave1:/usr/local/src/hadoop-3.1.3/etc/hadoop/

# slave2
scp mapred-site.xml slave2:/usr/local/src/hadoop-3.1.3/etc/hadoop/

5.最后启动一下hadoop

start-all.sh

要是运行出错可能Hadoop没有启动完,稍微等一下就好了

1.第一题

编写 MapReduce 程序,实现以下功能:将 user_info.csv 数据的分隔符“,”转换为“|”,输出文件到 HDFS,然后 在在控制台按顺序打印输出前 10 条数据

MapReduce代码

package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;


/***
*编写 MapReduce 程序,实现以下功能:将 user_info.csv
* 数据的分隔符“,”转换为“|”
*
*/
public class task01 {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//todo 初始化config配置项
Configuration conf = new Configuration();
//todo 设置job任务,设置任务的名称
Job job = Job.getInstance(conf, "task01");
//todo 设置job任务主类
job.setJarByClass(task01.class);
//todo 设置mapper类
job.setMapperClass(taskMapper.class);
//todo 设置output key输出的类型
job.setOutputKeyClass(Text.class);
//todo 设置output value输出的类型
job.setOutputValueClass(NullWritable.class);
// todo 获取输出文件路径
Path outPath = new Path(args[1]);
// todo 获取hdfs文件路径
FileSystem fileSystem = outPath.getFileSystem(conf);
// todo 判断输出文件路径是否存在
if (fileSystem.exists(outPath)) {
// todo 直接删除文件目录
fileSystem.delete(outPath, true);
}
//todo 第一个参数为输入的文件路径
FileInputFormat.addInputPath(job, new Path(args[0]));
//todo 第二个参数为输出的文件路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
private static class taskMapper extends Mapper<Object, Text, Text, NullWritable>{
private static Text text = new Text();
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 将逗号修改为 |
text.set(value.toString().replaceAll(",", "|"));
context.write(text, NullWritable.get());
}
}
}

2.第二题

编写 MapReduce 程序,实现以下功能: 对于 order_status 这一字段统计每种状态的订单总数,将结果写 入 HDFS,在控制台读取 HDFS 文件

(步骤和上面一样,注意:类名不一样还有定义的名称)

MapReduce代码

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderStatusMR {

// 定义 Map 类,继承 Mapper
public static class Map extends Mapper<Object, Text, Text, IntWritable> {
// 定义常量 1
private final static IntWritable one = new IntWritable(1);
// 定义 Text 对象,用于存储 order_status
private Text status = new Text();

// 定义 map 方法,接收键值对,输出键值对
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 将 value 转换为字符串
String line = value.toString();
// 按照逗号分隔字符串,得到一个数组
String[] fields = line.split(",");
// 判断数组长度是否合法,至少应该有 20 个字段
if (fields.length >= 20) {
// 获取 order_status 字段,位于第 5 个位置
String order_status = fields[4];
// 设置 status 为 order_status
status.set(order_status);
// 输出键值对,键是 order_status,值是 1
context.write(status, one);
}
}
}

// 定义 Reduce 类,继承 Reducer
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
// 定义 IntWritable 对象,用于存储 count
private IntWritable result = new IntWritable();

// 定义 reduce 方法,接收键值对,输出键值对
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// 初始化 count 为 0
int count = 0;
// 遍历 values 集合,累加 count
for (IntWritable value : values) {
count += value.get();
}
// 设置 result 为 count
result.set(count);
// 输出键值对,键是 order_status,值是 count
context.write(key, result);
}
}

// 定义 Driver 类,设置输入和输出路径,以及 Map 和 Reduce 类
public static void main(String[] args) throws Exception {
// 创建配置对象
Configuration conf = new Configuration();
// 创建 Job 对象
Job job = Job.getInstance(conf, "OrderStatusMR");
// 设置运行类
job.setJarByClass(OrderStatusMR.class);
// 设置 Map 类
job.setMapperClass(Map.class);
// 设置 Reduce 类
job.setReducerClass(Reduce.class);
// 设置输出键的类型
job.setOutputKeyClass(Text.class);
// 设置输出值的类型
job.setOutputValueClass(IntWritable.class);
// 设置输入路径,从命令行参数获取
FileInputFormat.addInputPath(job, new Path(args[0]));
// 设置输出路径,从命令行参数获取
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

最后一步运行JAR环境包

hadoop jar jar包名 idea里面java类名 /csv文件 /随意起一个输出结果文件名

# 完整的语句
hadoop jar /root/file2_1.jar org.example.task01 /user_info.csv /file2_1_out