第七套题:

一、Python部分

1、数据处理

(1)查看 train.csv 中数据总数、标准差、均值、最 小值、四分之一分位数、二分之一分位数、四分之三分位 数和最大值; 将上述代码截图复制粘贴至客户端桌面 【M2-T1-SUBT2-提交结果 1.docx】中对应的任务序号下。

import pandas as pd

data = pd.read_csv('./文本/train.csv', encoding='gbk')
# print('打印导入的数据:', data)

# 选择数值型的列
numeric_data = data.select_dtypes(include=['int64', 'float64'])

# 总数
data_1 = len(numeric_data)
print('总数:',data_1)
# 标准差
data_2 = numeric_data.std()
print('标准差:',data_2)
# 均值
data_3 = numeric_data.mean()
print('均值', data_3)
# 最小值
data_4 = numeric_data.min()
print('最小值', data_4)
# 计算四分之一位数
data_5 = numeric_data.quantile(0.25)
print('计算四分之一位数', data_5)
# 计算二分之一位数(中位数)
data_6 = numeric_data.quantile(0.5)
print('计算二分之一位数(中位数)', data_6)
# 计算四分之三分位数
data_7 = numeric_data.quantile(0.75)
print('计算二分之一位数(中位数)', data_7)
# 计算最大值
data_8 = numeric_data.max()
print('最大值', data_8)

(2)缺失值处理:

①对于 job 列数据,采用‘admin.’填充缺失值;

import pandas as pd

data = pd.read_csv('./文本/train.csv', encoding='gbk')

# 判断isnull是否有缺失值
# 检查缺失值
data_1 = data['job'].isnull()
print(data_1)

# 填充缺失值
data['job'].fillna('admin.', inplace=True)

print(data)

②对于 marital 列数据,如果年龄(age)小于 30 采 用’single’,如果大于 50 采用’divorced’代替,其他采用 ‘marital’代替;

import pandas as pd

df = pd.read_csv('./文本/train.csv', encoding='gbk')
# 检查缺失值
missing_values = df['marital'].isnull()

# 根据条件填充缺失值
df.loc[df['age'] < 30, 'marital'] = 'single'
df.loc[df['age'] > 50, 'marital'] = 'divorced'
df.loc[df['age'].isnull(), 'marital'] = 'marital'

print(df.head(5))
print(df[['age','marital']].head(10))

# df.to_csv('data_1.csv')


③将教育类型 basic.9y,basic.6y,basic.4y unknown 均变为 Basic;

import pandas as pd

df = pd.read_csv('./文本/train.csv', encoding='gbk')

# 检查教育类型的唯一值
unique_values = df['education'].unique()
print(unique_values)

# 使用replace()函数替换值。在pandas中,你可以使用replace()函数来替换列中的特定值。将需要替换的值作为参数传递给replace()函数,并指定替换后的新值。
# 替换教育类型的值
df['education'].replace(['basic.9y', 'basic.6y', 'basic.4y'], 'Basic', inplace=True)

# 打印替换后的数据
print(df.head(5))
print(df['education'].head(10))
df.to_csv('data_2.csv', index=False)

④对于 housing 列数据,如果信用卡是有违约,即 default 为 yes,则用 yes 代替,否则用 no 代替;

import pandas as pd

# 读取 train.csv 文件
data = pd.read_csv('./文本/train.csv')


# 根据 default 列的值替换 housing 列的数据
data['housing'] = data['housing'].where(data['default'] != 'yes', 'yes').where(data['default'] == 'yes', 'no')

# 保存修改后的数据到 train_c2.csv 文件
data.to_csv('train_c2.csv', index=False)
print(data)

⑤对于 loan 列数据,如果有房贷,即 housing 为 yes, 则用 yes 代替,否则用 no 代替。 所有缺失值处理完后,存入 train_c1.csv 中

import pandas as pd

# 读取 train.csv 文件
data = pd.read_csv('./文本/train.csv')


# 根据 housing 列的值替换 loan 列的数据
data['loan'] = data['loan'].where(data['housing'] != 'yes', 'yes').where(data['housing'] == 'yes', 'no')

# 处理缺失值
data.fillna({'loan': 'no'}, inplace=True)

# 保存处理后的数据到 train_c1.csv 文件
data.to_csv('train_c1.csv', index=False)
print(data)

将上述①-⑤任务的代码截图及结果截图复制粘贴至客 户端桌面【M2-T1-SUBT2-提交结果2.docx】中对应的任务序 号下。

(3)查看 train.csv 中的数字特征,对数字特征进行 描述性统计,并采用四分位数法进行数据清洗以减少噪声 数据的影响,然后存入 train_c2.csv 中;

import pandas as pd

# 读取 train.csv 文件
data = pd.read_csv('./文本/train.csv')

# 查看数字特征的描述性统计
numeric_features = data.select_dtypes(include=['int64', 'float64'])
numeric_stats = numeric_features.describe()

# 使用四分位数法进行数据清洗
Q1 = numeric_features.quantile(0.25)
Q3 = numeric_features.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
cleaned_data = numeric_features[(numeric_features >= lower_bound) & (numeric_features <= upper_bound)]

# 将清洗后的数据保存到 train_c2.csv 文件中
cleaned_data.to_csv('train_c2.csv', index=False)

print(len(data) - len(cleaned_data))

(4)对 train.csv 中的非数字特征进行 LabelEncoder 编码并存入 train_c3.csv 中。 将(3)-(4)小题的代码截图复制粘贴至客户端桌面 【M2-T1-SUBT2-提交结果3.docx】中对应的任务序号下。

2、数据标注

对上述train_c3.csv数据进行标注,判断客户是否会购 买银行的产品,具体的标注规则如下:

(1)如果“subscribe”列数据为1,则数据标注为‘yes’;

(2)如果“subscribe”列数据为0,则数据标注为‘no’; 标注好的数据存储为列‘subscribe’并和train.csv数 据合并存入result.csv。 将代码截图复制粘贴至客户端桌面【M2-T2-SUBT1-提交 结果1.docx】中对应的任务序号下。

map和replace用法 介绍
map map方法适用于Series对象,可以通过提供一个字典或函数来将Series中的值映射为其他值。例如,你可以使用map方法将Series中的1映射为’yes’,将0映射为’no’
replace replace方法适用于Series或DataFrame对象,可以通过提供一个字典或值的替换规则来替换数据。例如,你可以使用replace方法将Series或DataFrame中的1替换为’yes’,将0替换为’no’
import pandas as pd

# 读取 train.csv 文件
train_data = pd.read_csv('./文本/train.csv')
print(train_data.info())

# 根据 subscribe 列的值标注数据
train_data['subscribe'] = train_data['subscribe'].map({1: 'yes', 0: 'no'})

# 合并数据
result_data = pd.concat([train_data['subscribe'], train_data], axis=1)

# 保存结果到 result.csv 文件
result_data.to_csv('result.csv', index=False)

二:数据统计

hdfs操作步骤

1.子任务一:HDFS 文件上传下载 本任务需要使用 Hadoop,HDFS 命令,已安装 Hadoop 及 需要配置前置环境,具体要求如下:

(1)在 Master 中的/root/目录下新建一个文件 夹:result,将创建文件夹命令与结果截图粘贴至客户端桌 面【M2-T3-SUBT1-提交结果 1.docx】中对应的任务序号下;

mkdir /root/result

(2)使用 HDFS 命令,将 Master 下:/root 目录下新建 的文件夹:result 上传到 HDFS 指定目录下:/根目录下;并 且使用 HDFS 命令查看目录;将 HDFS 上传,查看命令截图粘 贴至客户端桌面【M2-T3-SUBT1-提交结果 2.docx】中对应的 任务序号下;

hdfs dfs -put /root/result /

(3)使用 HDFS 命令,将 HDFS 目录下的/result 文件夹 下载到 Master 指定目录下:根目录下(他这里没有提供,所以我就将他放到root下);将下载文件夹命令与 结果截图粘贴至客户端桌面【M2-T3-SUBT1-提交结果 3.docx】 中对应的任务序号下。

hdfs dfs -get /result /root/

修改mapred-site.xml 文件

1.cd到hadoop下

cd usr/local/src/hadoop-3.1.3/etc/hadoop/

2.vi编辑一下mapred-site.xml

​ 将下面的复制到mapred-site.xml 里面

​ /usr/local/src/hadoop-3.1.3这是是你自己安装在哪个的文件目录

<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>

3.重启一下hadoop

stop-all.sh

4.将mapred-site.xml 拷贝到两台机子

​ /usr/local/src/hadoop-3.1.3/etc/hadoop/ (这个是你自己的文件存放目录

# slave1
scp mapred-site.xml slave1:/usr/local/src/hadoop-3.1.3/etc/hadoop/

# slave2
scp mapred-site.xml slave2:/usr/local/src/hadoop-3.1.3/etc/hadoop/

5.最后启动一下hadoop

start-all.sh

要是运行出错可能Hadoop没有启动完,稍微等一下就好了

1.第一题

编写 MapReduce 程序,实现以下功能:清除日志中字 段长度比 11 小的日志记录,输出文件到 HDFS;在控制台按 顺序打印输出前 20 条数据,将结果截图粘贴至客户端桌面 【M2-T3-SUBT2-提交结果 1.docx】中对应的任务序号下。

package org.January;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class task08 {
// 定义一个taskMapper
public static class taskMapper extends Mapper<Object, Text, Text, Text>{
// 定义一个Text
private Text status = new Text();
// 定义一个map

@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context) throws IOException, InterruptedException {
// 将value转换成字符串
String line = value.toString();
// 以逗号分割得出数组
String[] fields = line.split(",");
// 判断数组长度
if(fields.length >= 11){
// 输出
context.write(new Text(line), status);
}
}
}
// 定义一个 Reduce 继承 Reducer
public static class Reduce extends Reducer<Text, Text, Text, Text>{
// 定义一个 reduce 接收

@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
for (Text value : values){
context.write(key, value);
}
}
}

// 设置输入输出路径
public static void main(String[] args) throws Exception{
// 设置 配置对象
Configuration conf = new Configuration();
// 设置 Job类
Job job = Job.getInstance(conf, "task08");
// 设置运行类
job.setJarByClass(task08.class);
// 设置map类
job.setMapperClass(taskMapper.class);
// 设置 Reduce类
job.setReducerClass(Reduce.class);
// 设置 key 类
job.setOutputKeyClass(Text.class);
// 设置 value 类
job.setOutputValueClass(Text.class);
// 设置输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 设置输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}


2.第二题

编写 MapReduce 程序,实现以下功能:对于 gender 这 一字段统计电商消费人数男女数量,在控制台输出男女各多 少人,将结果截图粘贴至客户端桌面【M2-T3-SUBT3-提交结 果 1.docx】中对应的任务序号下。

package org.January;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

public class task09 {
// 定义一个taskMapper 继承Mapper
public static class taskMapper extends Mapper<Object, Text,Text, IntWritable>{
// 定义一个常量
private static final IntWritable one = new IntWritable(1);
// 定义一个 Text
private Text status = new Text();
// 定义一个map接收

@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 将value转换成字符串
String line = value.toString();
// 以逗号分割得出数组
String[] fields = line.split(",");
// 判断数组长度
if(fields.length <= 13){
// 获取gender字段
String genders = fields[10];
// 设置status为genders
status.set(genders);
// 输出键对值
context.write(status, one);
}
}
}

// 定义一个Reduce 继承 Reducer
public static class Reduce extends Reducer<Text, IntWritable,Text, IntWritable>{
// 定义一个Treemap
private TreeMap<Integer, String> result = new TreeMap<>();
// 定义一个 reduce 继承

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 定义一个sum为0
int sum = 0;
// 遍历value 累加 sum
for (IntWritable value : values){
sum += value.get();
}
// 设置 result 为sum
result.put(sum, key.toString());
}
// 定义一个cleanup

@Override
protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 在定义一个 Treemap
for (Map.Entry<Integer, String> entry : result.descendingMap().entrySet()){
// 获取key计数值
int count = entry.getKey();
// 获取value字符串值
String fields = entry.getValue();
// 输出
context.write(new Text(fields), new IntWritable(count));
}
}
}

// 设置输入输出路径
public static void main(String[] args) throws Exception{
// 设置配置对象
Configuration conf = new Configuration();
// 设置 Job类
Job job = Job.getInstance(conf,"task09");
// 设置 运行类
job.setJarByClass(task09.class);
// 设置 Map 类
job.setMapperClass(taskMapper.class);
// 设置 Reduce 类
job.setReducerClass(Reduce.class);
// 设置 key类
job.setOutputKeyClass(Text.class);
// 设置 value 类
job.setOutputValueClass(IntWritable.class);
// 设置 输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 设置 输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

最后一步运行JAR环境包

hadoop jar jar包名 idea里面java类名 /csv文件 /随意起一个输出结果文件名

# 完整的语句
hadoop jar /root/file2_1.jar org.example.task01 /user_info.csv /file2_1_out