第四套题:

一、Python部分

1、数据处理

1.删除shopping.csv中库存小于 10 或库存大于 10000 的数据,并存入shop1.csv;

# 导入库
import pandas as pd

# 读取数据
df = pd.read_csv("../data/shopping.csv")
print(df.info())

# 删除shopping.csv中库存小于 10 或库存大于10000 的数据,并存入shop1.csv
df_1 = df.drop(df[(df['库存'] < 10) | (df['库存'] > 10000)].index)
print(df_1['库存'])
df_1.to_csv("shop1.csv")

2.将涉及“刷单”、“捡漏”等字段的数据删除, 并存入shop2.csv;

# 导入库
import pandas as pd

# 读取数据
df = pd.read_csv("../data/shopping.csv")
print(df.info())

# 将涉及“刷单”、“捡漏”等字段的数据删除, 并存入shop2.csv;
df_1 = df.drop(df[(df['名称'] == '刷单') | (df['名称'] == '捡漏')].index)
print(df_1['名称'])
df_1.to_csv("shop2.csv")

3.将商品中涉及“女装”字段的数据删除,并存入 shop3.csv;

# 导入库
import pandas as pd

# 读取数据
df = pd.read_csv("../data/shopping.csv")
print(df.info())


# 将商品中涉及“女装”字段的数据删除,并存入 shop3.csv;
df_1 = df.drop(df[df['名称'] == '女装'].index)
print(df_1)

4.将shopping.csv中手机价格为区间数据的,设置为 价格区间的平均数,存入shop4.csv。

# 导入库
import pandas as pd

# 读取数据
df = pd.read_csv("../data/shopping.csv")
print(df.info())

# 将shopping.csv中手机价格为区间数据的,设置为价格区间的平均数,存入shop4.csv。
df['平均数'] = df['价格'].apply(lambda x : sum(map(float, x.split('-'))) //2 if '-' in x else float(x))
print(df['平均数'])

2、数据标注

import pandas as pd
from snownlp import SnowNLP
# 导入数据
data = pd.read_csv('./文本/model_comment.csv', encoding='utf-8')

# 调用sonwnlp方法进行分析,得出情感值
def anzlyze(comment):
sentiment = SnowNLP(comment).sentiments # sentiments情感分数
if sentiment >= 0.6:
return '正向'
elif sentiment >= 0.4 or sentiment <= 0.6:
return '中性'
elif sentiment <= 0.4:
return '负向'

# 调用上面定义好的 anzlyze 方法进行对手机商城评论信息挨个分析
data['情感倾向'] = data['评论信息'].apply(anzlyze)

# 取出正向 中性 负向的评论数据
zheng = data[data['情感倾向'] == '正向']
zhong = data[data['情感倾向'] == '中性']
fu = data[data['情感倾向'] == '负向']

# 合并正向 中性 负向的数据 ignore_index:忽略索引列
data = pd.concat([zheng, zhong, fu], ignore_index=True)

# 将数据进行排序一下 sort_values:排序
data.sort_values(by='情感倾向')

# 保存存入model_sen.csv
data.to_csv('model_sen.csv', index=False,encoding='utf-8')
# 打印
print(data)

二、数据统计

hdfs操作

1.首先你要将你需要运行的文件拷贝到root下和hdfs上面

1.本地命令拷贝

scp -r 你存放文件的地方目录 root:你的机子ip地址:/root

# 完整的语句
scp -r user_info.csv [email protected]:/root

2、创建文件夹

1.在 HDFS 目录下新建目录/input

hdfs dfs -mkdir /input

2.查看目录命令截图

hdfs dfs -ls /

3、下载 HDFS 新建目录/input,到本地容器 Master 指定目录/root/下

  1. -get是下载
  2. -put是上传
hdfs dfs -get  /input /root/

查看一下root是否成功下载下来file2_1

ls

修改mapred-site.xml 文件

1.cd到hadoop下

cd usr/local/src/hadoop-3.1.3/etc/hadoop/

2.vi编辑一下mapred-site.xml

​ 将下面的复制到mapred-site.xml 里面

​ /usr/local/src/hadoop-3.1.3这是是你自己安装在哪个的文件目录

<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>

3.重启一下hadoop

stop-all.sh

4.将mapred-site.xml 拷贝到两台机子

​ /usr/local/src/hadoop-3.1.3/etc/hadoop/ (这个是你自己的文件存放目录

# slave1
scp mapred-site.xml slave1:/usr/local/src/hadoop-3.1.3/etc/hadoop/

# slave2
scp mapred-site.xml slave2:/usr/local/src/hadoop-3.1.3/etc/hadoop/

5.最后启动一下hadoop

start-all.sh

要是运行出错可能Hadoop没有启动完,稍微等一下就好了

1.第一题

编写 MapReduce 程序,实现以下功能:清除数据中分 隔符混乱的,多于11个字段的数据,输出文件到HDFS;在 控制台按顺序打印输出前 10条数据

package org.example;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class task08 {

// 定义一个taskMapper 继承 Mapper
public static class taskmapper extends Mapper<LongWritable, Text, Text, Text> {

// 定义一个map接收数据
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 将value转换成字符串
String line = value.toString();
// 以制表符分割
String[] fields = line.split("\t");

// 判断小于11个字段输出
if (fields.length <= 11) {
context.write(new Text(line), new Text());
}
}
}
// 定义一个Reduce 继承 Reduce
public static class Reduce extends Reducer<Text, Text, Text, Text> {
// 定义一个count 初始化为0
private int count = 0;
// 定义一个Reduce接收数据
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 输出清洗后的数据到HDFS
context.write(key, new Text());
}
}
// 设置输出输入路径
public static void main(String[] args) throws Exception {
// 配置对象
Configuration conf = new Configuration();
// 设置Job类
Job job = Job.getInstance(conf, "task08");
// 设置运行类
job.setJarByClass(task08.class);
// 设置map类
job.setMapperClass(taskmapper.class);
// 设置Reduce类
job.setReducerClass(Reduce.class);
// 设置 key 类
job.setOutputKeyClass(Text.class);
// 设置 value 类
job.setOutputValueClass(Text.class);
// 设置输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 设置输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}


}

2.第二题:

编写MapReduce程序,实现以下功能:根据 user_impression这一字段,统计买家对商家销售的手机商 品的印象,结果按照印象数降序排序,格式为: (user_impression,次数),如:(性价比高,10),结果保存 至HDFS,在控制台读取HDFS文件输出各组人数

package org.exercise1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class task04 {
// 定义一个 taskMapper 继承 Mapper
public static class taskMapper extends Mapper<Object, Text,Text, IntWritable>{
// 定义一个 常量
private static final IntWritable one = new IntWritable(1);
// 定义一个Text
private Text status = new Text();
// 重写 map
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 将 value 转换成 字符串
String line = value.toString();
// 制表符分割
String[] fields = line.split("\t");
// 判断数组长度
if(fields.length >= 11){
// 获取user_impression并且空格分割
String[] impressions = fields[6].split(" ");
for (String impression : impressions){
// 设置 status 为 impression
status.set(impression);
context.write(status,one);
}
}
}
}

// 定义一个 Reducer 处理 Mapper
public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
// 定义一个
private static IntWritable result = new IntWritable();
// 重写reduce
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 定义一个 count 为 0
int count = 0;
// 遍历 value 累加count
for (IntWritable value : values){
count += value.get();
}
// 设置 result 为 count
result.set(count);
// 输出
context.write(key,result);
}
}

// 设置配置对象
public static void main(String[] args) throws Exception{
// 配置对象
Configuration conf = new Configuration();
// 设置 Job 类
Job job = Job.getInstance(conf,"task04");
// 设置 运行类
job.setJarByClass(taskMapper.class);
// 设置 map类
job.setMapperClass(taskMapper.class);
// 设置 Reducer 类
job.setReducerClass(Reduce.class);
// 设置 key类
job.setOutputKeyClass(Text.class);
// 设置 value类
job.setOutputValueClass(IntWritable.class);
// 设置输入累加
FileInputFormat.addInputPath(job, new Path(args[0]));
// 设置输出累加
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

最后一步运行JAR环境包

hadoop jar jar包名 idea里面java类名 /csv文件 /随意起一个输出结果文件名

# 完整的语句
hadoop jar /root/file2_1.jar org.example.task01 /user_info.csv /file2_1_out