第十套题

一、python部分

1、数据处理

1 、删除 hotel.csv 中 商 圈 为 空 的 数 据 并 且 存 入 hotel2_c1_N.csv,N 为删除的数据条数;

import pandas as pd

# 读取数据
df = pd.read_csv('./data/hotel.csv')
# print(df.info())
# 删除 hotel.csv 中 商 圈 为 空 的 数 据 并 且 存 入 hotel2_c1_N.csv,N 为删除的数据条数;
df_1 = df.dropna(subset='shopping')
df_2 = len(df)-len(df_1)
print(df_2)
df_1.to_csv(f'hotel2_c1_{df_2}.csv')

2、删除 hotel.csv 中缺失值大于 3 个的数据列并且存 入 hotel2_c2_N.csv,N 为删除的数据列变量名,多列时用下 划线“_”间隔无顺序要求;

import pandas as pd

# 读取数据
df = pd.read_csv('./data/hotel.csv')

# 删除 hotel.csv 中缺失值大于 3 个的数据列并且存入 hotel2_c2_N.csv,N 为删除的数据列变量名,多列时用下 划线“_”间隔无顺序要求;

df_1 = df.columns[df.isnull().sum() > 3]
df_2 = '_'.join(df_1)
df_3 = len(df) - len(df_2)
print(df_3)
df.drop(df_3)
df.to_csv(F'hotel2_c2_{df_3}.csv')

3、将 hotel.csv 中评分为空的数据设置为 0 并且存入 hotel2_c3.csv;

import pandas as pd

# 读取数据
df = pd.read_csv('./data/hotel.csv')
print(df.info())
# 将 hotel.csv 中评分为空的数据设置为 0 并且存入 hotel2_c3.csv;
df_1 = df['score'].fillna(0)
print(df_1)
df_1.to_csv('hotel2_c3.csv')

4、将 hotel.csv 中评分为空的数据设置为总平均评分 并且存入 hotel2_c4_N.csv,N 为总平均评分保留一位小数。

import pandas as pd

# 读取数据
df = pd.read_csv('./data/hotel.csv')
# print(df.info())
# 将 hotel.csv 中评分为空的数据设置为总平均评分 并且存入 hotel2_c4_N.csv,N 为总平均评分保留一位小数

df_1 = round(df['score'].mean(), 1)
print('平均评分为:',df_1)

df_2 = df['score'].fillna(df_1)
df_2.to_csv(F'hotel2_c4_{df_1}.csv')

2、数据标注

使用 SnowNLP 对酒店评论数据 hotel_comment.csv 进行 标注,获取情感倾向评分(sentiments),具体的标注规则 如下: 对情感倾向分数大于等于 0.6 的评论数据标注为正向; 对情感倾向分数大于 0.4 小于 0.6 的评论数据标注为中 性; 对情感倾向分数小于等于 0.4 的评论数据标注为负向。 根据采集到的评论信息,给出三类标注好的数据。存入standard.csv中

from snownlp import SnowNLP
import pandas as pd

# 读取数据
df = pd.read_csv('./data/hotel_comment.csv', encoding='gbk')
# print(df)

# 判断得出情感值
def ananly(comment):
qing = SnowNLP(comment).sentiments
if qing >= 0.6 :
return '正向'
elif qing >= 0.4 or qing <= 0.6:
return '中性'
elif qing <= 0.4:
return '负向'

df['情感倾向'] = df['评论信息'].apply(ananly)
df['备注'] = ''

zheng = df[df['情感倾向'] == '正向']
zhong = df[df['情感倾向'] == '中性']
fu = df[df['情感倾向'] == '负向']

# 合并数据
df = pd.concat([zheng,zhong,fu], ignore_index=True)
df = df.sort_values(by='情感倾向')

print(df.sample(10))
df[['编号','酒店名称','评论信息','情感倾向','备注']].to_csv('standard.csv', index=False, encoding='gbk')

二、数据统计

hdfs操作

1.首先你要将user_info.csv拷贝到root下和hdfs上面

1.本地命令拷贝

scp -r 你存放文件的地方目录 root:你的机子ip地址:/root

# 完整的语句
scp -r user_info.csv [email protected]:/root

2、创建文件夹

1.在 HDFS 目录下新建目录/file2_1

hdfs dfs -mkdir /file2_1

2.查看目录命令截图

hdfs dfs -ls /

3、修改权限

1.修改权限,赋予目录/file2_1 最高 777 权限

hdfs dfs -chmod 777 /file2_1

2.查看 目录权限截图

hdfs dfs -ls /

4、下载 HDFS 新建目录/file2_1,到本地容器 Master 指定目录/root/下

  1. -get是下载
  2. -put是上传
hdfs dfs -get  /file2-1 /root/

查看一下root是否成功下载下来file2_1

ls

6.如果遇到:

mkdir: Cannot create directory /input. Name node is in safe mode.(意思是hadoop安全模式正在开启,要关掉才可以新建)

hadoop强制关掉安全模式:

hdfs dfsadmin -safemode forceExit

出现:Safe mode is OFF 就代表关掉成功了

修改mapred-site.xml 文件

1.cd到hadoop下

cd usr/local/src/hadoop-3.1.3/etc/hadoop/

2.vi编辑一下mapred-site.xml

​ 将下面的复制到mapred-site.xml 里面

​ /usr/local/src/hadoop-3.1.3这是是你自己安装在哪个的文件目录

<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>

3.重启一下hadoop

stop-all.sh

4.将mapred-site.xml 拷贝到两台机子

​ /usr/local/src/hadoop-3.1.3/etc/hadoop/ (这个是你自己的文件存放目录

# slave1
scp mapred-site.xml slave1:/usr/local/src/hadoop-3.1.3/etc/hadoop/

# slave2
scp mapred-site.xml slave2:/usr/local/src/hadoop-3.1.3/etc/hadoop/

5.最后启动一下hadoop

start-all.sh

要是运行出错可能Hadoop没有启动完,稍微等一下就好了

1.第一题

编 写 MapReduce 程 序 , 实 现 以 下 功 能 : 将 user_info.csv 数据的分隔符“,”转换为“|”,输出文件到 HDFS,然后在在控制台按顺序打印输出前 10 条数据,将结 果截图粘贴至客户端桌面【Release\任务 E 提交结果.docx】 中对应的任务序号下。

package org.January;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class task05 {
// 设置 输入输出路径
public static void main(String[] args) throws Exception{
// 设置 配置对象
Configuration conf = new Configuration();
// 设置Job类
Job job = Job.getInstance(conf, "task05");
// 设置 运行类
job.setJarByClass(task05.class);
// 设置 Map 类
job.setMapperClass(taskMapper.class);
// 设置 key 类
job.setOutputKeyClass(Text.class);
// 设置 value 类
job.setOutputValueClass(NullWritable.class);
// 设置 文件路径
Path outpath = new Path(args[1]);
// 设置 hdfs路径
FileSystem fileSystem = outpath.getFileSystem(conf);
// 判断文件是否存在
if(fileSystem.exists(outpath)){
fileSystem.delete(outpath, true);
}
// 设置输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 设置输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

// 定义一个 taskMapper 继承 Mapper
public static class taskMapper extends Mapper<Object, Text, Text, NullWritable> {
// 定义一个 Text
private Text status = new Text();
// 定义一个map
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 将value转换成字符串 并且将逗号转换成 |
status.set(value.toString().replaceAll(",", "|"));
// 输出
context.write(status, NullWritable.get());
}
}
}

2.第二题

编写 Spark 程序,实现以下功能:对于 gender 这一字 段统计电商消费人数男女数量,将结果写入 HDFS 中,格式 为:(性别,人数),如:(男,10),在控制台读取 HDFS 文件 输 出 男 女 各 多 少 人 , 将 结 果 截 图 粘 贴 至 客 户 端 桌 面 【Release\任务 E 提交结果.docx】中对应的任务序号下.

package org.January;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.checkerframework.checker.units.qual.C;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

public class task06 {

// 定义一个 taskMapper 继承 Mapper
public static class taskMapper extends Mapper<Object, Text, Text, IntWritable>{
// 定义一个 常量
private static final IntWritable one = new IntWritable(1);
// 定义一个 Text
private Text status = new Text();
// 定义一个 map 接收

@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 将value转换成字符串
String line = value.toString();
// 以逗号分割得出数组
String[] fields = line.split(",");
// 判断数组长度
if(fields.length >= 13){
// 获取 gender字段
String genders = fields[10];
// 设置 status 为 gender
status.set(genders);
// 输出
context.write(status, one);

}
}
}
// 定义一个 Reduce 继承 Reducer
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
// 定义一个 Treemap
private TreeMap<Integer, String> result = new TreeMap<>();
// 定义一个 reduce 接收

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 定义一个 sum 为 0
int sum = 0;
// 遍历value 累加 sum
for (IntWritable value : values){
sum += value.get();
}
// 设置 result 为 sum
result.put(sum, key.toString());
}
// 在定义一个 Treemap

@Override
protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 重新定义一个Treemap
for(Map.Entry<Integer, String> entry : result.descendingMap().entrySet()){
// 获取key计数值
int count = entry.getKey();
// 获取 value 字符串值
String fields = entry.getValue();
// 输出
context.write(new Text(fields), new IntWritable(count));
}
}
}

// 设置输入输出路径
public static void main(String[] args) throws Exception{
// 设置配置对象
Configuration conf = new Configuration();
// 设置Job 类
Job job = Job.getInstance(conf, "task06");
// 设置 运行类
job.setJarByClass(task06.class);
// 设置map类
job.setMapperClass(taskMapper.class);
// 设置 Reduce类
job.setReducerClass(Reduce.class);
// 设置 key 类
job.setOutputKeyClass(Text.class);
// 设置 value 值
job.setOutputValueClass(IntWritable.class);
// 设置 输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 设置输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

最后一步运行JAR环境包

hadoop jar jar包名 idea里面java类名 /csv文件 /随意起一个输出结果文件名

# 完整的语句
hadoop jar /root/file2_1.jar org.example.task01 /user_info.csv /file2_1_out