第十套题
一、python部分
1、数据处理
1 、删除 hotel.csv 中 商 圈 为 空 的 数 据 并 且 存 入 hotel2_c1_N.csv,N 为删除的数据条数;
import pandas as pd
df = pd.read_csv('./data/hotel.csv')
df_1 = df.dropna(subset='shopping') df_2 = len(df)-len(df_1) print(df_2) df_1.to_csv(f'hotel2_c1_{df_2}.csv')
|
2、删除 hotel.csv 中缺失值大于 3 个的数据列并且存 入 hotel2_c2_N.csv,N 为删除的数据列变量名,多列时用下 划线“_”间隔无顺序要求;
import pandas as pd
df = pd.read_csv('./data/hotel.csv')
df_1 = df.columns[df.isnull().sum() > 3] df_2 = '_'.join(df_1) df_3 = len(df) - len(df_2) print(df_3) df.drop(df_3) df.to_csv(F'hotel2_c2_{df_3}.csv')
|
3、将 hotel.csv 中评分为空的数据设置为 0 并且存入 hotel2_c3.csv;
import pandas as pd
df = pd.read_csv('./data/hotel.csv') print(df.info())
df_1 = df['score'].fillna(0) print(df_1) df_1.to_csv('hotel2_c3.csv')
|
4、将 hotel.csv 中评分为空的数据设置为总平均评分 并且存入 hotel2_c4_N.csv,N 为总平均评分保留一位小数。
import pandas as pd
df = pd.read_csv('./data/hotel.csv')
df_1 = round(df['score'].mean(), 1) print('平均评分为:',df_1)
df_2 = df['score'].fillna(df_1) df_2.to_csv(F'hotel2_c4_{df_1}.csv')
|
2、数据标注
使用 SnowNLP 对酒店评论数据 hotel_comment.csv 进行 标注,获取情感倾向评分(sentiments),具体的标注规则 如下: 对情感倾向分数大于等于 0.6 的评论数据标注为正向; 对情感倾向分数大于 0.4 小于 0.6 的评论数据标注为中 性; 对情感倾向分数小于等于 0.4 的评论数据标注为负向。 根据采集到的评论信息,给出三类标注好的数据。存入standard.csv中
from snownlp import SnowNLP import pandas as pd
df = pd.read_csv('./data/hotel_comment.csv', encoding='gbk')
def ananly(comment): qing = SnowNLP(comment).sentiments if qing >= 0.6 : return '正向' elif qing >= 0.4 or qing <= 0.6: return '中性' elif qing <= 0.4: return '负向'
df['情感倾向'] = df['评论信息'].apply(ananly) df['备注'] = ''
zheng = df[df['情感倾向'] == '正向'] zhong = df[df['情感倾向'] == '中性'] fu = df[df['情感倾向'] == '负向']
df = pd.concat([zheng,zhong,fu], ignore_index=True) df = df.sort_values(by='情感倾向')
print(df.sample(10)) df[['编号','酒店名称','评论信息','情感倾向','备注']].to_csv('standard.csv', index=False, encoding='gbk')
|
二、数据统计
hdfs操作
1.首先你要将user_info.csv拷贝到root下和hdfs上面
1.本地命令拷贝
scp -r 你存放文件的地方目录 root:你的机子ip地址:/root
# 完整的语句 scp -r user_info.csv [email protected]:/root
|
2、创建文件夹
1.在 HDFS 目录下新建目录/file2_1
2.查看目录命令截图
3、修改权限
1.修改权限,赋予目录/file2_1 最高 777 权限
hdfs dfs -chmod 777 /file2_1
|
2.查看 目录权限截图
4、下载 HDFS 新建目录/file2_1,到本地容器 Master 指定目录/root/下
- -get是下载
- -put是上传
hdfs dfs -get /file2-1 /root/
|
查看一下root是否成功下载下来file2_1
6.如果遇到:
mkdir: Cannot create directory /input. Name node is in safe mode.(意思是hadoop安全模式正在开启,要关掉才可以新建)
hadoop强制关掉安全模式:
hdfs dfsadmin -safemode forceExit
|
出现:Safe mode is OFF 就代表关掉成功了
修改mapred-site.xml 文件
1.cd到hadoop下
cd usr/local/src/hadoop-3.1.3/etc/hadoop/
|
2.vi编辑一下mapred-site.xml
将下面的复制到mapred-site.xml 里面
/usr/local/src/hadoop-3.1.3这是是你自己安装在哪个的文件目录
<property> <name>yarn.app.mapreduce.am.env</name> <value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value> </property> <property> <name>mapreduce.map.env</name> <value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value> </property> <property> <name>mapreduce.reduce.env</name> <value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value> </property>
|
3.重启一下hadoop
4.将mapred-site.xml 拷贝到两台机子
/usr/local/src/hadoop-3.1.3/etc/hadoop/ (这个是你自己的文件存放目录)
# slave1 scp mapred-site.xml slave1:/usr/local/src/hadoop-3.1.3/etc/hadoop/
# slave2 scp mapred-site.xml slave2:/usr/local/src/hadoop-3.1.3/etc/hadoop/
|
5.最后启动一下hadoop
要是运行出错可能Hadoop没有启动完,稍微等一下就好了
1.第一题
编 写 MapReduce 程 序 , 实 现 以 下 功 能 : 将 user_info.csv 数据的分隔符“,”转换为“|”,输出文件到 HDFS,然后在在控制台按顺序打印输出前 10 条数据,将结 果截图粘贴至客户端桌面【Release\任务 E 提交结果.docx】 中对应的任务序号下。
package org.January;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class task05 { public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "task05"); job.setJarByClass(task05.class); job.setMapperClass(taskMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); Path outpath = new Path(args[1]); FileSystem fileSystem = outpath.getFileSystem(conf); if(fileSystem.exists(outpath)){ fileSystem.delete(outpath, true); } FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }
public static class taskMapper extends Mapper<Object, Text, Text, NullWritable> { private Text status = new Text(); @Override protected void map(Object key, Text value, Mapper<Object, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { status.set(value.toString().replaceAll(",", "|")); context.write(status, NullWritable.get()); } } }
|
2.第二题
编写 Spark 程序,实现以下功能:对于 gender 这一字 段统计电商消费人数男女数量,将结果写入 HDFS 中,格式 为:(性别,人数),如:(男,10),在控制台读取 HDFS 文件 输 出 男 女 各 多 少 人 , 将 结 果 截 图 粘 贴 至 客 户 端 桌 面 【Release\任务 E 提交结果.docx】中对应的任务序号下.
package org.January;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.checkerframework.checker.units.qual.C;
import java.io.IOException; import java.util.Map; import java.util.TreeMap;
public class task06 {
public static class taskMapper extends Mapper<Object, Text, Text, IntWritable>{ private static final IntWritable one = new IntWritable(1); private Text status = new Text();
@Override protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split(","); if(fields.length >= 13){ String genders = fields[10]; status.set(genders); context.write(status, one);
} } } public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{ private TreeMap<Integer, String> result = new TreeMap<>();
@Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values){ sum += value.get(); } result.put(sum, key.toString()); }
@Override protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { for(Map.Entry<Integer, String> entry : result.descendingMap().entrySet()){ int count = entry.getKey(); String fields = entry.getValue(); context.write(new Text(fields), new IntWritable(count)); } } }
public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "task06"); job.setJarByClass(task06.class); job.setMapperClass(taskMapper.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
|
最后一步运行JAR环境包
hadoop jar jar包名 idea里面java类名 /csv文件 /随意起一个输出结果文件名
# 完整的语句 hadoop jar /root/file2_1.jar org.example.task01 /user_info.csv /file2_1_out
|