用自定义 MapReduce(MR)程序实现如下逻辑:
product_no lac_id moment start_time user_id county_id staytime city_id
13429100031 22554 8 2013-03-11 08:55:19.151754088 571 571 282 571
13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 103 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100140 26642 9 2013-03-11 09:02:19.151754088 571 571 18 571
13429100082 22691 8 2013-03-11 08:57:32.151754088 571 571 287 571
13429100189 22558 8 2013-03-11 08:56:24.139539816 571 571 48 571
13429100349 22503 8 2013-03-11 08:54:30.152622440 571 571 211 571
字段解释:
product_no:用户手机号;
lac_id:用户所在基站;
start_time:用户在此基站的开始时间;
staytime:用户在此基站的逗留时间。
需求描述:
根据 lac_id 和 start_time 可以知道用户当时的位置,根据 staytime 可以知道用户在各个基站的逗留时长。根据轨迹合并连续相同基站的 staytime,最终得到每一个用户按时间排序的、在每一个基站的驻留时长。
期望输出举例:
13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 390 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571
分析上面的结果:
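第一列(product_no)升序,第四列(start_time)时间降序。因此,首先需要将这两列抽取出来作为组合键,然后自定义排序。注意:下面的实现只完成了排序这一步,并没有合并同一用户连续相同基站的 staytime(例如期望输出中基站 22691 的 103 + 287 = 390)。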
实现如下:
package FindFriend;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * MapReduce secondary sort over base-station (lac) records.
 *
 * <p>Each input line is keyed by (product_no, start_time). Keys sort by product_no ascending and,
 * within one product_no, by start_time descending. The reducer writes the original lines back
 * out unchanged in that order.
 *
 * <p>NOTE: this job only sorts. It does NOT merge the staytime of consecutive records at the same
 * lac_id, which the stated requirement also asks for.
 */
public class StringComp2 {
    /** Default HDFS input path; overridden by {@code args[0]} when given. */
    final static String INPUT_PATH = "hdfs://master:8020/liguodong/test2";
    /** Default HDFS output path (deleted before each run); overridden by {@code args[1]} when given. */
    final static String OUT_PATH = "hdfs://master:8020/liguodong/test2out";

    /**
     * Configures and runs the sort job, deleting any previous output directory first.
     *
     * @param args optional {@code [inputPath [outputPath]]}; missing entries fall back to
     *     {@link #INPUT_PATH} / {@link #OUT_PATH}
     */
    public static void main(String[] args) throws IOException,
            URISyntaxException, ClassNotFoundException, InterruptedException {
        String inputPath = args.length > 0 ? args[0] : INPUT_PATH;
        String outPath = args.length > 1 ? args[1] : OUT_PATH;

        Configuration conf = new Configuration();
        // Resolve the file system of the output path, since that is the path being deleted.
        final FileSystem fs = FileSystem.get(new URI(outPath), conf);
        if (fs.exists(new Path(outPath))) {
            fs.delete(new Path(outPath), true);
        }

        Job job = Job.getInstance(conf, "date sort");
        job.setJarByClass(StringComp2.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MyReducer.class);
        // Must match MyReducer's declared output types <Text, NullWritable>.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Extracts product_no and start_time from each line and emits them as the composite key,
     * with the whole line as the value.
     */
    static class MyMapper extends Mapper<LongWritable, Text, NewK2, Text> {
        /**
         * Group 1: product_no, an 11-digit number. Group 2: the first start_time after it, in the
         * form "yyyy-MM-dd HH:mm:ss.nnnnnnnnn". Compiled once instead of once per record.
         */
        private static final Pattern RECORD = Pattern.compile(
                "\\b(\\d{11})\\b.*?(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{9})");

        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            Matcher matcher = RECORD.matcher(v1.toString());
            if (!matcher.find()) {
                // Header, blank or malformed line: skip it instead of failing the task
                // with IllegalStateException from matcher.group().
                return;
            }
            context.write(new NewK2(matcher.group(1), matcher.group(2)), v1);
        }
    }

    /** Writes every original line in key order; values are emitted with no trailing value. */
    static class MyReducer extends Reducer<NewK2, Text, Text, NullWritable> {
        @Override
        protected void reduce(NewK2 k2, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            for (Text v2 : v2s) {
                context.write(v2, NullWritable.get());
            }
        }
    }

    /**
     * Composite key (product_no, start_time): {@code first} sorts ascending, {@code second}
     * sorts descending.
     */
    static class NewK2 implements WritableComparable<NewK2> {
        /** product_no. */
        private String first;
        /**
         * start_time. The mapper only accepts a fixed-width timestamp here, so lexicographic order
         * is chronological order.
         */
        private String second;

        /** No-arg constructor needed so the framework can create a key before calling readFields. */
        public NewK2() {}

        public NewK2(String first, String second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readUTF();
            this.second = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(first);
            out.writeUTF(second);
        }

        /**
         * Called when map output keys are sorted. Orders by the first column ascending; when the
         * first columns are equal, orders by the second column descending.
         */
        @Override
        public int compareTo(NewK2 o) {
            final int byFirst = compTo(this.first, o.first);
            if (byFirst != 0) {
                return byFirst;
            }
            // Arguments swapped (rather than negating the result) to get descending order.
            return compTo(o.second, this.second);
        }

        /**
         * Lexicographic comparison of two strings, identical to {@link String#compareTo}.
         *
         * <p>The former hand-rolled loop only iterated over {@code one.length()}. It threw
         * ArrayIndexOutOfBoundsException when {@code another} was shorter, and reported a strict
         * prefix as equal. {@code String.compareTo} already gives the intended order, and the
         * descending part is handled in {@link #compareTo}.
         *
         * @return negative, zero or positive as {@code one} sorts before, equal to, or after
         *     {@code another}
         */
        public int compTo(String one, String another) {
            return one.compareTo(another);
        }

        /**
         * Hashes only product_no. The default HashPartitioner uses this value, so every record
         * of one user goes to the same reducer and stays together in sorted order. This is still
         * consistent with {@link #equals}, because equal keys have equal {@code first}.
         */
        @Override
        public int hashCode() {
            return this.first.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof NewK2)) {
                return false;
            }
            NewK2 other = (NewK2) obj;
            // Compare content, not references (the original used ==).
            return this.first.equals(other.first) && this.second.equals(other.second);
        }
    }
}
运行结果:
原文链接:https://blog.csdn.net/scgaliguodong123_/article/details/46010947
本站声明:网站内容来源于网络,如有侵权,请联系我们,我们将及时处理。
还没有人抢沙发呢~