Exam Set 4:
I. Python
1. Data processing
1. Delete the rows in shopping.csv where 库存 (stock) is less than 10 or greater than 10000, and save the result to shop1.csv;
import pandas as pd

df = pd.read_csv("../data/shopping.csv")
print(df.info())
df_1 = df.drop(df[(df['库存'] < 10) | (df['库存'] > 10000)].index)
print(df_1['库存'])
df_1.to_csv("shop1.csv")
2. Delete the rows that involve terms such as "刷单" (fake orders) or "捡漏" (bargain hunting), and save the result to shop2.csv;
import pandas as pd

df = pd.read_csv("../data/shopping.csv")
print(df.info())
# The task asks for rows that merely *involve* these terms, so match with
# str.contains rather than an exact equality check
df_1 = df.drop(df[df['名称'].str.contains('刷单|捡漏', na=False)].index)
print(df_1['名称'])
df_1.to_csv("shop2.csv")
3. Delete the rows whose product involves the term "女装" (women's clothing), and save the result to shop3.csv;
import pandas as pd

df = pd.read_csv("../data/shopping.csv")
print(df.info())
# Match names that contain '女装' rather than names that equal it exactly
df_1 = df.drop(df[df['名称'].str.contains('女装', na=False)].index)
print(df_1)
# Save the result, as the task requires
df_1.to_csv("shop3.csv")
4. For phone prices in shopping.csv that are given as a range, replace them with the mean of the range, and save the result to shop4.csv.
import pandas as pd

df = pd.read_csv("../data/shopping.csv")
print(df.info())
# For a range such as "1000-2000", take the mean of the two endpoints
# (true division, not floor division); otherwise keep the price as a float
df['价格'] = df['价格'].apply(lambda x: sum(map(float, x.split('-'))) / 2 if '-' in x else float(x))
print(df['价格'])
df.to_csv("shop4.csv")
2. Data annotation
import pandas as pd
from snownlp import SnowNLP

data = pd.read_csv('./文本/model_comment.csv', encoding='utf-8')

# Classify each comment by its SnowNLP sentiment score:
# >= 0.6 positive, 0.4 to 0.6 neutral, otherwise negative
def analyze(comment):
    sentiment = SnowNLP(comment).sentiments
    if sentiment >= 0.6:
        return '正向'
    elif sentiment >= 0.4:
        return '中性'
    else:
        return '负向'

data['情感倾向'] = data['评论信息'].apply(analyze)

# Group the rows as positive, then neutral, then negative; concatenating in
# this order already leaves the data sorted by sentiment class
zheng = data[data['情感倾向'] == '正向']
zhong = data[data['情感倾向'] == '中性']
fu = data[data['情感倾向'] == '负向']
data = pd.concat([zheng, zhong, fu], ignore_index=True)

data.to_csv('model_sen.csv', index=False, encoding='utf-8')
print(data)
II. Data statistics
HDFS operations
1. First, copy the file you need to run to /root on the master node, and also put it on HDFS (the HDFS upload is sketched after the scp example below)
1. Copy with a local command
scp -r <directory where your file is stored> root@<IP address of your machine>:/root
# Full command
scp -r user_info.csv [email protected]:/root
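The scp command alone only copies the file to /root; the step also asks for it to be put on HDFS. A minimal sketch of that upload, assuming user_info.csv now sits in /root on Master and the job later reads it from the HDFS root as /user_info.csv, as in the hadoop jar command at the end of this section:

# Upload the local file to the HDFS root directory
hdfs dfs -put /root/user_info.csv /
# Confirm that the file is now on HDFS
hdfs dfs -ls /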
2. Create a directory
1. Create a new directory /input under the HDFS root
2. Take a screenshot of the command that lists the directory (the commands are sketched below)
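The sheet gives no commands for this step; a minimal sketch using the standard HDFS shell:

# Create the /input directory on HDFS
hdfs dfs -mkdir /input
# List the HDFS root to confirm that it was created
hdfs dfs -ls /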
3. Download the newly created HDFS directory /input to the directory /root/ in the local Master container
- -get downloads from HDFS to the local file system
- -put uploads from the local file system to HDFS
hdfs dfs -get /input /root/
Check under /root whether file2_1 was downloaded successfully (a quick check is sketched below)
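A quick way to verify the download, assuming the hdfs dfs -get command above was run on Master:

# The directory downloaded from HDFS should now appear under /root
ls -l /root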
Modify the mapred-site.xml file
1. cd into the Hadoop configuration directory
cd /usr/local/src/hadoop-3.1.3/etc/hadoop/
2. Edit mapred-site.xml with vi
Copy the following into mapred-site.xml
(/usr/local/src/hadoop-3.1.3 is the directory where you installed Hadoop; adjust it to your own path)
<property>
  <name>yarn.app.mapreduce.am.env</name>
  <value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
  <name>mapreduce.map.env</name>
  <value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
<property>
  <name>mapreduce.reduce.env</name>
  <value>HADOOP_MAPRED_HOME=/usr/local/src/hadoop-3.1.3</value>
</property>
3. Restart Hadoop (see the command sketch after this list)
4. Copy mapred-site.xml to the two slave machines
/usr/local/src/hadoop-3.1.3/etc/hadoop/ (this is wherever your own installation lives)
# slave1
scp mapred-site.xml slave1:/usr/local/src/hadoop-3.1.3/etc/hadoop/
# slave2
scp mapred-site.xml slave2:/usr/local/src/hadoop-3.1.3/etc/hadoop/
5. Finally, start Hadoop again
If a job fails to run, Hadoop may simply not have finished starting; wait a moment and try again.
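A minimal sketch of the restart in steps 3 and 5, assuming the Hadoop sbin directory is on the PATH (otherwise run the scripts from /usr/local/src/hadoop-3.1.3/sbin/):

# Stop HDFS and YARN, then start them again
stop-all.sh
start-all.sh
# Check that NameNode, DataNode, ResourceManager and NodeManager are running
jps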
1. Question 1
Write a MapReduce program that does the following: remove the records whose delimiters are garbled, i.e. records with more than 11 fields, and write the output to HDFS; print the first 10 records to the console in order.
package org.example;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class task08 {
    public static class taskmapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            if (fields.length <= 11) {
                context.write(new Text(line), new Text());
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        private int count = 0;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "task08");
        job.setJarByClass(task08.class);
        job.setMapperClass(taskmapper.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
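The job above only writes the cleaned records to HDFS; it does not print anything to the console itself. One simple way to cover the "print the first 10 records" part is to read the output back from HDFS, a sketch assuming a single reducer and the output directory /file2_1_out used in the hadoop jar example at the end of this section:

# Read the job output back from HDFS and show the first 10 records
hdfs dfs -cat /file2_1_out/part-r-00000 | head -10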
2. Question 2:
Write a MapReduce program that does the following: based on the user_impression field, count the buyers' impressions of the phone products sold by the merchants; sort the results in descending order of count in the format (user_impression, count), e.g. (性价比高, 10); save the result to HDFS and read the HDFS file from the console to print the count of each group.
package org.exercise1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class task04 {

    public static class taskMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text status = new Text();

        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            if (fields.length >= 11) {
                String[] impressions = fields[6].split(" ");
                for (String impression : impressions) {
                    status.set(impression);
                    context.write(status, one);
                }
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private static IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            result.set(count);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "task04");
        job.setJarByClass(taskMapper.class);
        job.setMapperClass(taskMapper.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
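The job above only aggregates the per-impression counts; it does not sort them in descending order. A simple way to produce that ordering and print it on the console, assuming the job's output directory is /task04_out (a hypothetical name; use whatever path you pass as the second argument) and a single reducer:

# Read the (impression, count) pairs from HDFS and sort them by count, descending
hdfs dfs -cat /task04_out/part-r-00000 | sort -k2 -nr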
Final step: run the JAR package
hadoop jar <jar file name> <Java class name from IDEA> /<csv file> /<any name for the output directory>
# Full command (the class name must match the one defined in the jar, here org.example.task08)
hadoop jar /root/file2_1.jar org.example.task08 /user_info.csv /file2_1_out
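One common pitfall when rerunning a job: MapReduce refuses to write to an output directory that already exists, so remove the old one first.

# The output directory must not exist before the job runs; delete an old one before rerunning
hdfs dfs -rm -r /file2_1_out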