P3

时间: 2020-09-16|24次围观|0 条评论

P3插图

  1 package p3.hadoop.mapred;  2   3 import java.io.IOException;  4 import java.io.InputStream;  5 import org.apache.hadoop.conf.Configuration;  6 import org.apache.hadoop.io.BytesWritable;  7 import p3.common.lib.BinaryUtils;  8 import p3.common.lib.Bytes;  9  10 public class PcapLineReader 11 { 12   private static final int DEFAULT_BUFFER_SIZE = 2048; 13   private int bufferSize; 14   private static final int PCAP_FILE_HEADER_LENGTH = 24; 15   private static final int PCAP_PACKET_HEADER_LENGTH = 16; 16   private static final int PCAP_PACKET_HEADER_CAPLEN_POS = 8; 17   private static final int PCAP_PACKET_HEADER_WIREDLEN_POS = 12; 18   private static final int PCAP_PACKET_HEADER_CAPLEN_LEN = 4; 19   private static final int PCAP_PACKET_HEADER_TIMESTAMP_LEN = 4; 20   private static final int PCAP_PACKET_MIN_LEN = 53; 21   private static final int PCAP_PACKET_MAX_LEN = 1519; 22   private static final int MAGIC_NUMBER = -725372255; 23   private static final int MIN_PKT_SIZE = 42; 24   private long min_captime; 25   private long max_captime; 26   private InputStream in; 27   private byte[] buffer; 28   byte[] pcap_header; 29   private int bufferLength; 30   int consumed; 31  32   public PcapLineReader(InputStream in, int bufferSize, long min_captime, long max_captime) 33   { 34     this.bufferSize = 2048; 35  36     this.bufferLength = 0; 37     this.consumed = 0; 38  39     this.in = in; 40     this.bufferSize = bufferSize; 41     this.buffer = new byte[this.bufferSize]; 42     this.min_captime = min_captime; 43     this.max_captime = max_captime; 44   } 45  46   public PcapLineReader(InputStream in, Configuration conf) 47     throws IOException 48   { 49     this(in, 2048,  50       conf.getLong("pcap.file.captime.min", 1309412600L),  51       conf.getLong("pcap.file.captime.max", conf.getLong("pcap.file.captime.max", 1309412600L) + 172800L)); 52   } 53  54   public void close() 55     throws IOException 56   { 57     this.in.close(); 58   } 59  60   int 
skipPartialRecord(int fraction) 61     throws IOException 62   { 63     int pos = 0; 64     byte[] captured = new byte[fraction]; 65     byte[] tmpTimestamp1 = new byte[4]; 66     byte[] tmpTimestamp2 = new byte[4]; 67     byte[] tmpCapturedLen1 = new byte[4]; 68     byte[] tmpWiredLen1 = new byte[4]; 69     byte[] tmpCapturedLen2 = new byte[4]; 70     byte[] tmpWiredLen2 = new byte[4]; 71     int caplen1 = 0; 72     int wiredlen1 = 0; 73     int caplen2 = 0; 74     int wiredlen2 = 0; 75     long timestamp2 = 0L; 76  77     int size = 0; 78     long endureTime = 100L; 79  80     if ((size = this.in.read(captured)) < 42) return 0; 81  82     do 83     { 84       if ((size - pos < 32) || (size - pos < 53)) { 85         pos = size; 86         break; 87       } 88  89       System.arraycopy(captured, pos, tmpTimestamp1, 0, 4); 90       long timestamp1 = Bytes.toLong(BinaryUtils.flipBO(tmpTimestamp1, 4)); 91  92       System.arraycopy(captured, pos + 8, tmpCapturedLen1, 0, 4); 93       caplen1 = Bytes.toInt(BinaryUtils.flipBO(tmpCapturedLen1, 4)); 94  95       System.arraycopy(captured, pos + 12, tmpWiredLen1, 0, 4); 96       wiredlen1 = Bytes.toInt(BinaryUtils.flipBO(tmpWiredLen1, 4)); 97  98       if ((caplen1 > 53) && (caplen1 < 1519) && (size - pos - 32 - caplen1 > 0)) 99       {100         System.arraycopy(captured, pos + 16 + caplen1 + 8, tmpCapturedLen2, 0, 4);101         caplen2 = Bytes.toInt(BinaryUtils.flipBO(tmpCapturedLen2, 4));102 103         System.arraycopy(captured, pos + 16 + caplen1 + 12, tmpWiredLen2, 0, 4);104         wiredlen2 = Bytes.toInt(BinaryUtils.flipBO(tmpWiredLen2, 4));105 106         System.arraycopy(captured, pos + 16 + caplen1, tmpTimestamp2, 0, 4);107         timestamp2 = Bytes.toLong(BinaryUtils.flipBO(tmpTimestamp2, 4));108 109         if ((timestamp1 >= this.min_captime) && (timestamp1 < this.max_captime) && (this.min_captime <= timestamp2) && (timestamp2 < this.max_captime) && 110           (wiredlen1 > 53) && (wiredlen1 < 1519) && 
(wiredlen2 > 53) && (wiredlen2 < 1519) && 111           (caplen1 > 0) && (caplen1 <= wiredlen1) && (caplen2 > 0) && (caplen2 <= wiredlen2) && 112           (timestamp2 >= timestamp1) && (timestamp2 - timestamp1 < endureTime)) {113           return pos;114         }115 116       }117 118       ++pos;119     }120     while (pos < size);121 122     return pos;123   }124 125   int readPacket(int packetLen)126     throws IOException127   {128     int bufferPosn = 16;129     byte[] tmp_buffer = new byte[packetLen];130 131     if ((this.bufferLength = this.in.read(tmp_buffer)) < packetLen) {132       System.arraycopy(tmp_buffer, 0, this.buffer, bufferPosn, this.bufferLength);133       bufferPosn += this.bufferLength;134 135       byte[] newpacket = new byte[packetLen - this.bufferLength];136 137       if ((this.bufferLength = this.in.read(newpacket)) < 0) return bufferPosn;138       System.arraycopy(newpacket, 0, this.buffer, bufferPosn, this.bufferLength);139     }140     else141     {142       System.arraycopy(tmp_buffer, 0, this.buffer, bufferPosn, this.bufferLength);143     }144     bufferPosn += this.bufferLength;145 146     return bufferPosn;147   }148 149   int readPacketHeader()150   {151     int headerLength = 0;152     int headerPosn = 0;153     this.pcap_header = new byte[16];154 155     byte[] tmp_header = new byte[16];156     BytesWritable capturedLen = new BytesWritable();157     try158     {159       if ((headerLength = this.in.read(this.pcap_header)) < 16)160       {161         if (headerLength == -1) return 0;162         headerPosn += headerLength;163 164         byte[] newheader = new byte[16 - headerLength];165 166         if ((headerLength = this.in.read(newheader)) < 0) {167           this.consumed = headerPosn;168           return -1;169         }170         System.arraycopy(newheader, 0, this.pcap_header, headerPosn, headerLength);171       }172       capturedLen.set(this.pcap_header, 8, 4);173       System.arraycopy(this.pcap_header, 0, 
this.buffer, 0, 16);174       headerPosn = 0;175     }176     catch (IOException e)177     {178       e.printStackTrace();179     }180     return Bytes.toInt(BinaryUtils.flipBO(capturedLen.getBytes(), 4));181   }182 183   public int readFileHeader()184   {185     try {186       byte[] magic = new byte[4];187       this.bufferLength = this.in.read(this.buffer, 0, 24);188       System.arraycopy(this.buffer, 0, magic, 0, magic.length);189 190       if (Bytes.toInt(magic) == -725372255) break label50;191       return 0;192     }193     catch (IOException e) {194       e.printStackTrace();195     }196     label50: return this.bufferLength;197   }198 199   public int readLine(BytesWritable bytes, int maxLineLength, int maxBytesToConsume)200     throws IOException201   {202     bytes.set(new BytesWritable());203     boolean hitEndOfFile = false;204     long bytesConsumed = 0L;205 206     int caplen = readPacketHeader();207 208     if (caplen == 0) {209       bytesConsumed = 0L;210     } else if (caplen == -1) {211       bytesConsumed += this.consumed;212     }213     else if ((caplen > 0) && (caplen < 1519)) {214       if ((this.bufferLength = readPacket(caplen)) < caplen + 16) {215         hitEndOfFile = true;216       }217       bytesConsumed += this.bufferLength;218 219       if (!(hitEndOfFile)) {220         bytes.set(this.buffer, 0, caplen + 16);221       }222     }223 224     return (int)Math.min(bytesConsumed, 2147483647L);225   }226 227   public int readLine(BytesWritable str, int maxLineLength)228     throws IOException229   {230     return readLine(str, maxLineLength, 2147483647);231   }232 233   public int readLine(BytesWritable str)234     throws IOException235   {236     return readLine(str, 2147483647, 2147483647);237   }238 }

View Code

 

// Decompiled by Jad v1.5.8e2. Copyright 2001 Pavel Kouznetsov.// Jad home page: http://kpdus.tripod.com/jad.html// Decompiler options: packimports(3) fieldsfirst ansi space // Source File Name:   PcapVlenRecordReader.javapackage p3.hadoop.mapred;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.CompressionCodecFactory;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapred.RecordReader;// Referenced classes of package p3.hadoop.mapred://PcapLineReaderpublic class PcapVlenRecordReaderimplements RecordReader{private CompressionCodecFactory compressionCodecs;private long start;private long pos;private long end;private PcapLineReader in;int maxLineLength;private boolean fileheader_skip;public PcapVlenRecordReader(Configuration job, FileSplit split)throws IOException{compressionCodecs = null;fileheader_skip = true;maxLineLength = job.getInt("mapred.linerecordreader.maxlength", 0x7fffffff);fileheader_skip = job.getBoolean("pcap.file.header.skip", true);start = split.getStart();end = start + split.getLength();Path file = split.getPath();compressionCodecs = new CompressionCodecFactory(job);CompressionCodec codec = compressionCodecs.getCodec(file);FileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(split.getPath());boolean skipFileHeader = false;boolean skipPartialRecord = false;int fraction = 4000;if (codec != null){in = new PcapLineReader(codec.createInputStream(fileIn), job);end = 0x7fffffffffffffffL;skipFileHeader = true;} else{if (start == 0L){skipFileHeader = true;} else{skipPartialRecord = true;fileIn.seek(start);}in = new PcapLineReader(fileIn, job);}if (skipFileHeader)start += in.readFileHeader();if (skipPartialRecord){int skip;for (skip = in.skipPartialRecord(fraction); skip == fraction; skip = 
in.skipPartialRecord(fraction))start += skip;start += skip;fileIn.seek(start);in = new PcapLineReader(fileIn, job);}pos = start;}public LongWritable createKey(){return new LongWritable();}public BytesWritable createValue(){return new BytesWritable();}public synchronized boolean next(LongWritable key, BytesWritable value)throws IOException{while (pos < end) {key.set(pos);int newSize = in.readLine(value, maxLineLength, Math.max((int)Math.min(0x7fffffffL, end - pos), maxLineLength));if (newSize == 0){pos = end;return false;}pos += newSize;if (newSize < maxLineLength)return true;}return false;}public float getProgress(){if (start == end)return 0.0F;elsereturn Math.min(1.0F, (float)(pos - start) / (float)(end - start));}public synchronized long getPos()throws IOException{return pos;}public synchronized void close()throws IOException{if (in != null)in.close();}public volatile boolean next(Object obj, Object obj1)throws IOException{return next((LongWritable)obj, (BytesWritable)obj1);}public volatile Object createValue(){return createValue();}public volatile Object createKey(){return createKey();}}

  

文章转载于:https://www.cnblogs.com/kxdblog/p/4233101.html

原著是一个有趣的人,若有侵权,请通知删除

本博客所有文章如无特别注明均为原创。
复制或转载请以超链接形式注明转自起风了,原文地址《P3》
   

还没有人抢沙发呢~