第八套题:

一、Python部分

1.数据处理

打印data.csv数据

# 导入库
import pandas as pd

# 导入数据
big_data = pd.read_csv('./csv文档/data.csv')
# 打印数据
print(big_data)
# 打印字段
print(big_dta.info())

2.子任务二:数据处理

现已从相关网站及平台获取到原始数据集,为保障用户 隐私和行业敏感信息,已进行数据脱敏。数据脱敏是指对某 些敏感信息通过脱敏规则进行数据的变形,实现敏感隐私数 据的可靠保护。在涉及客户安全数据或者一些商业性敏感数 据的情况、不违反系统规则条件下,对真实数据进行改造并 提供测试使用,如身份证号、手机号等个人信息都需要进行 数据脱敏。

打开 ZZ052-8-M2-T1-SUBT1 文件夹,文件夹中包含 data.csv 文件。你的小组需要通过编写代码或脚本完成对 相关数据文件中数据的清洗和整理。请分析相关数据集, 根据题目规定要求实现数据处理,具体要求如下:

1.NAN值代表用户未浏览该页面,查看数据,将NAN 替换为0,然后存入data_c1.csv中;

# 导入库
import pandas as pd

# 导入数据
big_data = pd.read_csv('./csv文档/data.csv')

# 将NAN 替换为0,然后存入data_c1.csv中
big_data['confirmation_page'] = big_data['confirmation_page'].fillna(0)
big_data.to_csv('data_c1.csv')
print(big_data)

第二种

# 导入库
import pandas as pd

# 导入数据
big_data = pd.read_csv('./csv文档/data.csv')
# # 将NAN 替换为0,然后存入data_c1.csv中
big_data = big_data.fillna(0)
# big_data.to_csv('data_c1.csv')
print(big_data)

2.异常值处理:将年龄(age)数字大于等于100的 异常数据删除,然后存入data_c3.csv中; 将上述(1)-(3)任务的代码截图复制粘贴至客户端 10 10 桌面【M2-T1-SUBT2-提交结果 1.docx】中对应的任务序号下。

# 导入库
import pandas as pd

# 导入数据
big_data = pd.read_csv('./csv文档/data.csv')

# 异常值处理:将年龄(age)数字大于等于100的 异常数据删除,然后存入data_c3.csv中
big_data1 = big_data.drop(big_data[big_data['age'] >= 100].index)
big_data1.to_csv('data_c3.csv', index=False)
print(big_data1)

3.缺失值处理

①查看缺失值个数;

# 导入库
import pandas as pd

# 导入数据
big_data = pd.read_csv('./csv文档/data.csv')

# 查看缺失值的个数
print(big_data.isnull().sum())

②处理“source”列缺失值:当用户为新用户,source 缺失值填充为direct,老客户填充seo;

import pandas as pd

# 导入数据
data = pd.read_csv('./data/data.csv')

# 查看缺失值
print("source缺失值:", data["source"].isnull().sum())

# 处理“source”列缺失值:当用户为新用户,source 缺失值填充为direct,老客户填充seo;
data.loc[data["market"] <= 1, "source"] = "direct"
data.loc[data["market"] > 1, "source"] = "seo"

print(data["source"])
print("source缺失值:", data["source"].isnull().sum())

③处理“device”列缺失值:操作系统为mac,window, linux的设备空值填充为desktop,将操作系统为ioS, android的设备空值填充为mobile,如果为other和NAN值则 填充为众数;

import pandas as pd
import numpy as np

# 导入数据
data = pd.read_csv('./data/data.csv')

# 查看缺失值
print("device缺失值:", data["device"].isnull().sum())

# ~ 运算符来取反条件,即找到不在给定列表中的元素 isin() 方法返回一个布尔 Series ~ 运算符来取反条件,即找到不在给定列表中的元素
data.loc[(data["device"].isnull()) & (~data["operative_system"].isin(["mac", "window", "linux"])), "device"] = "desktop"
data.loc[(data["device"].isnull()) & (~data["operative_system"].isin(["iOS", "android"])), "device"] = "mobile"

# 将 "other" 和 NaN 值填充为众数 mode() 函数用于计算列中的众数 通过 [0] 索引,我们可以获取到众数的值
data.loc[data["device"].isin(["other", np.nan]), "device"] = data["device"].mode()[0]

print("处理后的device",data["device"].isnull().sum())

④处理“operative_system”缺省值:当用户设备时 mobile时,操作系统填充为iOS,当用户设备为desktop,操 作系统填充为windows。

import pandas as pd


# 导入数据
data = pd.read_csv('./data/data.csv')

print("operative_system缺失值有:",data["operative_system"].isnull().sum())

# 处理缺失值
data.loc[(data["operative_system"].isnull()) & (~data["device"].isin(["mobile"])), "operative_system"] = "iOS"
data.loc[(data["operative_system"].isnull()) & (~data["device"].isin(["desktop"])), "operative_system"] = "windows"

print("处理后的operative_system:",data["operative_system"].isnull().sum())

⑤处理sex缺失值:如果是linux系统,填充为Male,其 他均填充为Female。 所有缺失值处理完后,存入data_c4.csv中。

import pandas as pd


# 导入数据
data = pd.read_csv('./data/data.csv')

print("sex缺失值有:",data["sex"].isnull().sum())

# 缺失值处理
data.loc[(data["sex"].isnull()) & (~data["operative_system"].isin(['linux'])), "sex"] = "Male"
data.loc[(data["sex"].isnull()) & (~data["operative_system"].isin([])), "sex"] = "Female"

print("处理后的sex:",data["sex"].isnull().sum())

2.数据标注

对 data_c4.csv 数据进行标注,判断客服是否下单,具 体的标注规则如下:

(1)如果“confirmation_page”列数据为 1,则数据 标注为‘yes’;

(2)如果“confirmation_page”列数据为 0,则数据] 标注为‘no’; 标注好的数据存储为列‘subscribe’并和 data_c4.csv 数据合并存入 result.csv。 将代码截图复制粘贴至客户端桌面【M2-T2-SUBT1-提交 结果 1.docx】中对应的任务序号下。

# 导入库
import pandas as pd
from snownlp import SnowNLP

# 导入数据
big_data = pd.read_csv('./csv文档/data.csv')
def config(comment):
if comment == 1:
return 'yes'
else:
return 'no'
big_data['confirmation_page'] = big_data['new_user'].apply(config)

# 拿出数据
yes = df[df['confirmation_page'] == 'yes']
no = df[df['confirmation_page'] == 'no']

# 合并拿出来的数据
df = pd.concat([yes,no], ignore_index=True)


big_data.to_csv('result.csv')
print(big_data)

二、数据统计

hdfs操作

1.子任务一:HDFS 文件上传下载 本任务需要使用 Hadoop,HDFS 命令,已安装 Hadoop 及 需要配置前置环境,具体要求如下:

(1)在 Master 中的/root/目录下新建一个文件 夹:result,将创建文件夹命令与结果截图粘贴至客户端桌 面【M2-T3-SUBT1-提交结果 1.docx】中对应的任务序号下;

mkdir /root/result

(2)使用 HDFS 命令,将 Master 下:/root 目录下新建 的文件夹:result 上传到 HDFS 指定目录下:/根目录下;并 且使用 HDFS 命令查看目录;将 HDFS 上传,查看命令截图粘 贴至客户端桌面【M2-T3-SUBT1-提交结果 2.docx】中对应的 任务序号下;

hdfs dfs -put /root/result /

查看:

hdfs dfs -ls /

(3)使用 HDFS 命令,将 HDFS 目录下的/result 文件夹 下载到 Master 指定目录下:根目录下;将下载文件夹命令与 结果截图粘贴至客户端桌面【M2-T3-SUBT1-提交结果 3.docx】 中对应的任务序号下。

hdfs dfs -get /result /

查看

ls /root/

修改mapred-site.xml 文件

1.cd到hadoop下

cd usr/local/src/hadoop-3.1.3/etc/hadoop/

2.vi编辑一下mapred-site.xml

​ 将下面的复制到mapred-site.xml 里面

​ /usr/local/src/hadoop-3.1.3这是是你自己安装在哪个的文件目录

<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>

3.重启一下hadoop

stop-all.sh

4.将mapred-site.xml 拷贝到两台机子

​ /usr/local/src/hadoop-3.1.3/etc/hadoop/ (这个是你自己的文件存放目录

# slave1
scp mapred-site.xml slave1:/usr/local/src/hadoop-3.1.3/etc/hadoop/

# slave2
scp mapred-site.xml slave2:/usr/local/src/hadoop-3.1.3/etc/hadoop/

5.最后启动一下hadoop

start-all.sh

要是运行出错可能Hadoop没有启动完,稍微等一下就好了

1.第一题

编 写 MapReduce 程 序 , 实 现 以 下 功 能 : 将 user_info.csv 数据的分隔符“,”转换为“|”,输出文件到 HDFS,然后在控制台按顺序打印输出前 10 条数据,将结果 截图粘贴至客户端桌面【M2-T3-SUBT2-提交结果 1.docx】中 对应的任务序号下。

package org.January;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class task10 {
// 定义一个 taskMapper 继承 Mapper
public static class taskMapper extends Mapper<Object, Text, Text, NullWritable>{
// 定义一个 Text
private Text status = new Text();
// 定义一个 map 用来接收
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 将value转换成字符串 并且逗号转换成 |
status.set(value.toString().replaceAll(",", "|"));
// 输出
context.write(status, NullWritable.get());
}
}

// 设置输入输出路径
public static void main(String[] args) throws Exception{
// 设置配置对象
Configuration conf = new Configuration();
// 设置 Job 类
Job job = Job.getInstance(conf, "task10");
// 设置 运行类
job.setJarByClass(task10.class);
// 设置 Map类
job.setMapperClass(taskMapper.class);
// 设置 key类
job.setOutputKeyClass(Text.class);
// 设置 Value类
job.setOutputValueClass(NullWritable.class);
// 设置文件路径
Path outpath = new Path(args[1]);
// 获取hdfs路径
FileSystem fileSystem = outpath.getFileSystem(conf);
// 判断文件是否存在 如果存在就删除
if(fileSystem.exists(outpath)){
fileSystem.delete(outpath, true);
}
// 设置输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 设置输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);

}
}

2.第二题

编写 MapReduce 程序,实现以下功能:三级分类 category3_id 范围为[1,10],1 表示最低级别,10 表示最高 级别。本任务遍历 sku_info.csv 中数据,统计字段“三级 分类”级别为“10”最高级别的商品数量,将结果截图粘贴 至客户端桌面【M2-T3-SUBT3-提交结果 1.docx】中对应的任 务序号下。

package org.January;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.TreeMap;

public class task11 {

// 定义一个 taskMapper 继承 Mapper
public static class taskMapper extends Mapper<Object, Text, Text, IntWritable>{
// 定义一个常量
private static final IntWritable one = new IntWritable(1);
// 定义一个 Text
private Text status = new Text();
// 定义一个 map 用来接收
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 将value转换成字符串逗号分割得出数组
String[] line = value.toString().split(",");
// 判断数组长度
if (line.length >= 10){
// 获取 category3_id 字段
int category = Integer.parseInt(line[7]);
if(category == 10){
context.write(status, one);
}
}
}
}

// 定义一个Reduce 继承 Reducer
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
// 定义一个 IntWritable 存储count
private IntWritable result = new IntWritable();
// 定义一个 reduce 用来接收
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 定义一个 count
int count = 0;
// 遍历 value 累加 count
for(IntWritable value : values){
count += value.get();
}
result.set(count);
context.write(key, result);
}
}

// 设置 输入输出累加
public static void main(String [] args) throws Exception{
// 设置 配置对象
Configuration conf = new Configuration();
// 设置Job类
Job job = Job.getInstance(conf, "task11");
// 设置运行类
job.setJarByClass(task11.class);
// 设置 Map类
job.setMapperClass(taskMapper.class);
// 设置 Reduce 类
job.setReducerClass(Reduce.class);
// 设置 key 类
job.setOutputKeyClass(Text.class);
// 设置 value 类
job.setOutputValueClass(IntWritable.class);
// 设置输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 设置 输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

最后一步运行JAR环境包

hadoop jar jar包名 idea里面java类名 /csv文件 /随意起一个输出结果文件名

# 完整的语句
hadoop jar /root/file2_1.jar org.example.task01 /user_info.csv /file2_1_out