
package p3.hadoop.mapred;

import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import p3.common.lib.BinaryUtils;
import p3.common.lib.Bytes;

/**
 * Reads variable-length pcap records (16-byte record header plus captured
 * payload) from an InputStream, for use by the Hadoop record reader below.
 */
public class PcapLineReader
{
    private static final int DEFAULT_BUFFER_SIZE = 2048;
    private static final int PCAP_FILE_HEADER_LENGTH = 24;
    private static final int PCAP_PACKET_HEADER_LENGTH = 16;
    private static final int PCAP_PACKET_HEADER_CAPLEN_POS = 8;
    private static final int PCAP_PACKET_HEADER_WIREDLEN_POS = 12;
    private static final int PCAP_PACKET_HEADER_CAPLEN_LEN = 4;
    private static final int PCAP_PACKET_HEADER_TIMESTAMP_LEN = 4;
    private static final int PCAP_PACKET_MIN_LEN = 53;
    private static final int PCAP_PACKET_MAX_LEN = 1519;
    private static final int MAGIC_NUMBER = -725372255;   // 0xD4C3B2A1, little-endian pcap magic
    private static final int MIN_PKT_SIZE = 42;

    private int bufferSize;
    private long min_captime;
    private long max_captime;
    private InputStream in;
    private byte[] buffer;
    byte[] pcap_header;
    private int bufferLength;
    int consumed;

    public PcapLineReader(InputStream in, int bufferSize, long min_captime, long max_captime)
    {
        this.bufferLength = 0;
        this.consumed = 0;
        this.in = in;
        this.bufferSize = bufferSize;
        this.buffer = new byte[this.bufferSize];
        this.min_captime = min_captime;
        this.max_captime = max_captime;
    }

    public PcapLineReader(InputStream in, Configuration conf)
        throws IOException
    {
        // Capture-time window used by skipPartialRecord(); the defaults span 48 hours.
        this(in, DEFAULT_BUFFER_SIZE,
            conf.getLong("pcap.file.captime.min", 1309412600L),
            conf.getLong("pcap.file.captime.max",
                conf.getLong("pcap.file.captime.max", 1309412600L) + 172800L));
    }

    public void close() throws IOException
    {
        this.in.close();
    }

    /**
     * Scans up to <code>fraction</code> bytes for a plausible record boundary by
     * checking that two consecutive pcap record headers carry sane timestamps and
     * lengths. Returns the offset of the boundary, or the number of bytes scanned
     * when no boundary was found.
     */
    int skipPartialRecord(int fraction) throws IOException
    {
        int pos = 0;
        byte[] captured = new byte[fraction];
        byte[] tmpTimestamp1 = new byte[4];
        byte[] tmpTimestamp2 = new byte[4];
        byte[] tmpCapturedLen1 = new byte[4];
        byte[] tmpWiredLen1 = new byte[4];
        byte[] tmpCapturedLen2 = new byte[4];
        byte[] tmpWiredLen2 = new byte[4];
        int caplen1 = 0;
        int wiredlen1 = 0;
        int caplen2 = 0;
        int wiredlen2 = 0;
        long timestamp2 = 0L;

        int size = 0;
        long endureTime = 100L;   // maximum allowed gap (seconds) between consecutive packets

        if ((size = this.in.read(captured)) < MIN_PKT_SIZE) return 0;

        do
        {
            if ((size - pos < 32) || (size - pos < PCAP_PACKET_MIN_LEN)) {
                pos = size;
                break;
            }

            System.arraycopy(captured, pos, tmpTimestamp1, 0, 4);
            long timestamp1 = Bytes.toLong(BinaryUtils.flipBO(tmpTimestamp1, 4));

            System.arraycopy(captured, pos + PCAP_PACKET_HEADER_CAPLEN_POS, tmpCapturedLen1, 0, 4);
            caplen1 = Bytes.toInt(BinaryUtils.flipBO(tmpCapturedLen1, 4));

            System.arraycopy(captured, pos + PCAP_PACKET_HEADER_WIREDLEN_POS, tmpWiredLen1, 0, 4);
            wiredlen1 = Bytes.toInt(BinaryUtils.flipBO(tmpWiredLen1, 4));

            if ((caplen1 > PCAP_PACKET_MIN_LEN) && (caplen1 < PCAP_PACKET_MAX_LEN)
                    && (size - pos - 32 - caplen1 > 0))
            {
                System.arraycopy(captured, pos + PCAP_PACKET_HEADER_LENGTH + caplen1 + 8, tmpCapturedLen2, 0, 4);
                caplen2 = Bytes.toInt(BinaryUtils.flipBO(tmpCapturedLen2, 4));

                System.arraycopy(captured, pos + PCAP_PACKET_HEADER_LENGTH + caplen1 + 12, tmpWiredLen2, 0, 4);
                wiredlen2 = Bytes.toInt(BinaryUtils.flipBO(tmpWiredLen2, 4));

                System.arraycopy(captured, pos + PCAP_PACKET_HEADER_LENGTH + caplen1, tmpTimestamp2, 0, 4);
                timestamp2 = Bytes.toLong(BinaryUtils.flipBO(tmpTimestamp2, 4));

                if ((timestamp1 >= this.min_captime) && (timestamp1 < this.max_captime)
                        && (this.min_captime <= timestamp2) && (timestamp2 < this.max_captime)
                        && (wiredlen1 > PCAP_PACKET_MIN_LEN) && (wiredlen1 < PCAP_PACKET_MAX_LEN)
                        && (wiredlen2 > PCAP_PACKET_MIN_LEN) && (wiredlen2 < PCAP_PACKET_MAX_LEN)
                        && (caplen1 > 0) && (caplen1 <= wiredlen1)
                        && (caplen2 > 0) && (caplen2 <= wiredlen2)
                        && (timestamp2 >= timestamp1) && (timestamp2 - timestamp1 < endureTime)) {
                    return pos;
                }
            }

            ++pos;
        }
        while (pos < size);

        return pos;
    }

    /**
     * Reads <code>packetLen</code> captured bytes into the internal buffer right
     * after the 16-byte record header, retrying once on a short read.
     * Returns the number of valid bytes now in the buffer.
     */
    int readPacket(int packetLen) throws IOException
    {
        int bufferPosn = PCAP_PACKET_HEADER_LENGTH;
        byte[] tmp_buffer = new byte[packetLen];

        if ((this.bufferLength = this.in.read(tmp_buffer)) < packetLen) {
            System.arraycopy(tmp_buffer, 0, this.buffer, bufferPosn, this.bufferLength);
            bufferPosn += this.bufferLength;

            byte[] newpacket = new byte[packetLen - this.bufferLength];

            if ((this.bufferLength = this.in.read(newpacket)) < 0) return bufferPosn;
            System.arraycopy(newpacket, 0, this.buffer, bufferPosn, this.bufferLength);
        }
        else
        {
            System.arraycopy(tmp_buffer, 0, this.buffer, bufferPosn, this.bufferLength);
        }
        bufferPosn += this.bufferLength;

        return bufferPosn;
    }

    /**
     * Reads the 16-byte pcap record header into the start of the buffer and
     * returns its captured-length field; returns 0 at end of stream and -1 when
     * only a truncated header could be read.
     */
    int readPacketHeader()
    {
        int headerLength = 0;
        int headerPosn = 0;
        this.pcap_header = new byte[PCAP_PACKET_HEADER_LENGTH];
        BytesWritable capturedLen = new BytesWritable();
        try
        {
            if ((headerLength = this.in.read(this.pcap_header)) < PCAP_PACKET_HEADER_LENGTH)
            {
                if (headerLength == -1) return 0;
                headerPosn += headerLength;

                byte[] newheader = new byte[PCAP_PACKET_HEADER_LENGTH - headerLength];

                if ((headerLength = this.in.read(newheader)) < 0) {
                    this.consumed = headerPosn;
                    return -1;
                }
                System.arraycopy(newheader, 0, this.pcap_header, headerPosn, headerLength);
            }
            capturedLen.set(this.pcap_header, PCAP_PACKET_HEADER_CAPLEN_POS, PCAP_PACKET_HEADER_CAPLEN_LEN);
            System.arraycopy(this.pcap_header, 0, this.buffer, 0, PCAP_PACKET_HEADER_LENGTH);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        return Bytes.toInt(BinaryUtils.flipBO(capturedLen.getBytes(), 4));
    }

    /**
     * Reads the 24-byte pcap global header and checks the magic number.
     * Returns the number of header bytes read, or 0 if the stream does not look
     * like a pcap file.
     */
    public int readFileHeader()
    {
        try {
            byte[] magic = new byte[4];
            this.bufferLength = this.in.read(this.buffer, 0, PCAP_FILE_HEADER_LENGTH);
            System.arraycopy(this.buffer, 0, magic, 0, magic.length);

            if (Bytes.toInt(magic) != MAGIC_NUMBER) return 0;
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        return this.bufferLength;
    }

    /**
     * Reads one record (header plus payload) into <code>bytes</code> and returns
     * the number of bytes consumed, or 0 at end of stream. The maxLineLength and
     * maxBytesToConsume arguments are accepted for interface compatibility but
     * are not used here.
     */
    public int readLine(BytesWritable bytes, int maxLineLength, int maxBytesToConsume)
        throws IOException
    {
        bytes.set(new BytesWritable());
        boolean hitEndOfFile = false;
        long bytesConsumed = 0L;

        int caplen = readPacketHeader();

        if (caplen == 0) {
            bytesConsumed = 0L;
        } else if (caplen == -1) {
            bytesConsumed += this.consumed;
        }
        else if ((caplen > 0) && (caplen < PCAP_PACKET_MAX_LEN)) {
            if ((this.bufferLength = readPacket(caplen)) < caplen + PCAP_PACKET_HEADER_LENGTH) {
                hitEndOfFile = true;
            }
            bytesConsumed += this.bufferLength;

            if (!hitEndOfFile) {
                bytes.set(this.buffer, 0, caplen + PCAP_PACKET_HEADER_LENGTH);
            }
        }

        return (int) Math.min(bytesConsumed, Integer.MAX_VALUE);
    }

    public int readLine(BytesWritable str, int maxLineLength) throws IOException
    {
        return readLine(str, maxLineLength, Integer.MAX_VALUE);
    }

    public int readLine(BytesWritable str) throws IOException
    {
        return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
    }
}
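To make the control flow above easier to follow, here is a small standalone driver that exercises PcapLineReader directly against a local trace file. It is a minimal sketch, not part of the original p3 sources: the class name PcapLineReaderDemo and the hard-coded configuration values are assumptions.

package p3.hadoop.mapred;

import java.io.FileInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;

// Illustrative only: not part of the decompiled p3 sources.
public class PcapLineReaderDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // These keys are read by PcapLineReader; they only affect the
        // split-boundary scan in skipPartialRecord(), not sequential reading.
        conf.setLong("pcap.file.captime.min", 0L);
        conf.setLong("pcap.file.captime.max", Long.MAX_VALUE);

        FileInputStream fis = new FileInputStream(args[0]);
        PcapLineReader reader = new PcapLineReader(fis, conf);
        try {
            if (reader.readFileHeader() == 0) {
                System.err.println("not a pcap file (bad magic number)");
                return;
            }
            BytesWritable record = new BytesWritable();
            int consumed;
            while ((consumed = reader.readLine(record)) > 0) {
                // Each record is the 16-byte pcap packet header followed by the
                // captured payload.
                System.out.println("consumed " + consumed
                    + " bytes, record length " + record.getLength());
            }
        } finally {
            reader.close();   // also closes the underlying stream
        }
    }
}

In a MapReduce job this reader is not used directly; it is driven by the following RecordReader, recovered with the Jad decompiler from PcapVlenRecordReader.class.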
// Decompiled by Jad v1.5.8e2. Copyright 2001 Pavel Kouznetsov.
// Jad home page: http://kpdus.tripod.com/jad.html
// Decompiler options: packimports(3) fieldsfirst ansi space
// Source File Name: PcapVlenRecordReader.java

package p3.hadoop.mapred;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;

// Referenced classes of package p3.hadoop.mapred: PcapLineReader

public class PcapVlenRecordReader
    implements RecordReader<LongWritable, BytesWritable>
{
    private CompressionCodecFactory compressionCodecs;
    private long start;
    private long pos;
    private long end;
    private PcapLineReader in;
    int maxLineLength;
    private boolean fileheader_skip;

    public PcapVlenRecordReader(Configuration job, FileSplit split)
        throws IOException
    {
        compressionCodecs = null;
        fileheader_skip = true;
        maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        fileheader_skip = job.getBoolean("pcap.file.header.skip", true);
        start = split.getStart();
        end = start + split.getLength();
        Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        CompressionCodec codec = compressionCodecs.getCodec(file);
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        boolean skipFileHeader = false;
        boolean skipPartialRecord = false;
        int fraction = 4000;

        if (codec != null)
        {
            // Compressed input cannot be split: read the whole decompressed stream.
            in = new PcapLineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
            skipFileHeader = true;
        } else
        {
            if (start == 0L)
            {
                skipFileHeader = true;
            } else
            {
                skipPartialRecord = true;
                fileIn.seek(start);
            }
            in = new PcapLineReader(fileIn, job);
        }

        if (skipFileHeader)
            start += in.readFileHeader();

        if (skipPartialRecord)
        {
            // Advance past the packet that straddles the split boundary, then
            // reopen the reader at the first whole record.
            int skip;
            for (skip = in.skipPartialRecord(fraction); skip == fraction; skip = in.skipPartialRecord(fraction))
                start += skip;
            start += skip;
            fileIn.seek(start);
            in = new PcapLineReader(fileIn, job);
        }
        pos = start;
    }

    public LongWritable createKey()
    {
        return new LongWritable();
    }

    public BytesWritable createValue()
    {
        return new BytesWritable();
    }

    public synchronized boolean next(LongWritable key, BytesWritable value)
        throws IOException
    {
        while (pos < end) {
            key.set(pos);
            int newSize = in.readLine(value, maxLineLength,
                Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            if (newSize == 0)
            {
                pos = end;
                return false;
            }
            pos += newSize;
            if (newSize < maxLineLength)
                return true;
        }
        return false;
    }

    public float getProgress()
    {
        if (start == end)
            return 0.0F;
        else
            return Math.min(1.0F, (float) (pos - start) / (float) (end - start));
    }

    public synchronized long getPos() throws IOException
    {
        return pos;
    }

    public synchronized void close() throws IOException
    {
        if (in != null)
            in.close();
    }
}
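Neither decompiled class includes the InputFormat that plugs this record reader into a job, so the wrapper below is a hypothetical sketch: the class name PcapVlenInputFormat is assumed, and only configuration keys that actually appear in the code above are referenced.

package p3.hadoop.mapred;

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical glue code, not part of the decompiled sources.
public class PcapVlenInputFormat extends FileInputFormat<LongWritable, BytesWritable> {

    public RecordReader<LongWritable, BytesWritable> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        // PcapVlenRecordReader realigns itself to a packet boundary inside the
        // split (skipPartialRecord), so uncompressed pcap files stay splittable.
        return new PcapVlenRecordReader(job, (FileSplit) split);
    }
}

A driver would then wire it up roughly like this; the keys are the ones read by the two classes above, and the timestamp values are only examples:

JobConf conf = new JobConf();
conf.setInputFormat(PcapVlenInputFormat.class);
conf.setBoolean("pcap.file.header.skip", true);               // skip the 24-byte global header
conf.setLong("pcap.file.captime.min", 1309412600L);           // capture window start (epoch seconds)
conf.setLong("pcap.file.captime.max", 1309412600L + 172800L); // capture window end (48 h later)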