package net.ripe.hadoop.pcap;import java.io.DataInputStream;import java.io.IOException;import com.google.common.hash.Hashing;import net.ripe.hadoop.pcap.packet.HashPayloadPacket;import net.ripe.hadoop.pcap.packet.Packet;public class HashPayloadPcapReader extends PcapReader {public HashPayloadPcapReader(DataInputStream is) throws IOException {super(is);}@Overrideprotected Packet createPacket() {return new HashPayloadPacket();}@Overrideprotected boolean isReassemble() {return true;}@Overrideprotected boolean isPush() {return false;}@Overrideprotected void processPacketPayload(Packet packet, byte[] payload) {if (payload.length > 0) {packet.put(HashPayloadPacket.PAYLOAD_SHA1_HASH, Hashing.sha1().hashBytes(payload).toString());packet.put(HashPayloadPacket.PAYLOAD_SHA256_HASH, Hashing.sha256().hashBytes(payload).toString());packet.put(HashPayloadPacket.PAYLOAD_SHA512_HASH, Hashing.sha512().hashBytes(payload).toString());packet.put(HashPayloadPacket.PAYLOAD_MD5_HASH, Hashing.md5().hashBytes(payload).toString());}}}
HttpPcapReader
package net.ripe.hadoop.pcap;import java.io.ByteArrayInputStream;import java.io.DataInputStream;import java.io.IOException;import java.util.LinkedList;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.http.Header;import org.apache.http.HttpClientConnection;import org.apache.http.HttpException;import org.apache.http.HttpRequest;import org.apache.http.HttpRequestFactory;import org.apache.http.HttpResponse;import org.apache.http.HttpResponseFactory;import org.apache.http.impl.DefaultHttpRequestFactory;import org.apache.http.impl.DefaultHttpResponseFactory;import org.apache.http.impl.conn.DefaultClientConnection;import org.apache.http.impl.io.AbstractSessionInputBuffer;import org.apache.http.impl.io.AbstractSessionOutputBuffer;import org.apache.http.impl.io.DefaultHttpRequestParser;import org.apache.http.impl.io.DefaultHttpResponseParser;import org.apache.http.io.HttpMessageParser;import org.apache.http.io.SessionInputBuffer;import org.apache.http.io.SessionOutputBuffer;import org.apache.http.params.BasicHttpParams;import org.apache.http.params.HttpParams;import com.google.common.base.Joiner;import net.ripe.hadoop.pcap.packet.HttpPacket;import net.ripe.hadoop.pcap.packet.Packet;public class HttpPcapReader extends PcapReader{public static final Log LOG = LogFactory.getLog(HttpPcapReader.class);public static final int HTTP_PORT = 80;public static final String HEADER_PREFIX = "header_";private HttpParams params = new BasicHttpParams();private HttpRequestFactory reqFactory = new DefaultHttpRequestFactory();private HttpResponseFactory respFactory = new DefaultHttpResponseFactory();public HttpPcapReader(DataInputStream is) throws IOException {super(is);}@Overrideprotected Packet createPacket() {return new HttpPacket();}@Overrideprotected boolean isReassemble() {return true;}@Overrideprotected boolean isPush() {return false;}@Overrideprotected void processPacketPayload(Packet packet, final byte[] payload) {HttpPacket httpPacket = (HttpPacket)packet;Integer srcPort = (Integer)packet.get(Packet.SRC_PORT);Integer dstPort = (Integer)packet.get(Packet.DST_PORT);if ((HTTP_PORT == srcPort || HTTP_PORT == dstPort) && packet.containsKey(Packet.REASSEMBLED_FRAGMENTS) && PROTOCOL_TCP.equals(packet.get(Packet.PROTOCOL))) { final SessionInputBuffer inBuf = new AbstractSessionInputBuffer() { {init(new ByteArrayInputStream(payload), 1024, params);}@Overridepublic boolean isDataAvailable(int timeout) throws IOException {return true;} }; final SessionOutputBuffer outBuf = new AbstractSessionOutputBuffer() {}; if (HTTP_PORT == srcPort) { HttpMessageParser<HttpResponse> parser = new DefaultHttpResponseParser(inBuf, null, respFactory, params); HttpClientConnection conn = new DefaultClientConnection() { { init(inBuf, outBuf, params); }@Overrideprotected void assertNotOpen() {}@Overrideprotected void assertOpen() {} }; try { HttpResponse response = parser.parse(); conn.receiveResponseEntity(response); propagateHeaders(httpPacket, response.getAllHeaders());} catch (IOException e) {LOG.error("IOException when decoding HTTP response", e);} catch (HttpException e) {LOG.error("HttpException when decoding HTTP response", e);} } else if (HTTP_PORT == dstPort) { HttpMessageParser<HttpRequest> parser = new DefaultHttpRequestParser(inBuf, null, reqFactory, params); try { HttpRequest request = parser.parse(); propagateHeaders(httpPacket, request.getAllHeaders());} catch (IOException e) {LOG.error("IOException when decoding HTTP request", e);} catch (HttpException e) {LOG.error("HttpException when decoding HTTP request", e);} }}}private void propagateHeaders(HttpPacket packet, Header[] headers) {LinkedList<String> headerKeys = new LinkedList<String>();for (Header header : headers) {String headerKey = HEADER_PREFIX + header.getName().toLowerCase();packet.put(headerKey, header.getValue());}packet.put(HttpPacket.HTTP_HEADERS, Joiner.on(',').join(headerKeys));}}
DnsPcapReader
package net.ripe.hadoop.pcap;import java.io.DataInputStream;import java.io.IOException;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import net.ripe.hadoop.pcap.packet.DnsPacket;import net.ripe.hadoop.pcap.packet.Packet;import org.xbill.DNS.Header;import org.xbill.DNS.Message;import org.xbill.DNS.Opcode;import org.xbill.DNS.Rcode;import org.xbill.DNS.Record;import org.xbill.DNS.Section;import org.xbill.DNS.Flags;public class DnsPcapReader extends PcapReader {public static final int DNS_PORT = 53;public DnsPcapReader(DataInputStream is) throws IOException {super(is);}@Overrideprotected Packet createPacket() {return new DnsPacket();}@Overrideprotected boolean isReassemble() {return true;}@Overrideprotected boolean isPush() {return false;}@Overrideprotected void processPacketPayload(Packet packet, byte[] payload) {DnsPacket dnsPacket = (DnsPacket)packet;if (DNS_PORT == (Integer)packet.get(Packet.SRC_PORT) || DNS_PORT == (Integer)packet.get(Packet.DST_PORT)) {if (PROTOCOL_TCP.equals(packet.get(Packet.PROTOCOL)) && payload.length > 2) // TODO Support DNS responses with multiple messages (as used for XFRs)payload = Arrays.copyOfRange(payload, 2, payload.length); // First two bytes denote the size of the DNS message, ignore themtry {Message msg = new Message(payload);Header header = msg.getHeader();dnsPacket.put(DnsPacket.QUERYID, header.getID());dnsPacket.put(DnsPacket.FLAGS, header.printFlags());dnsPacket.put(DnsPacket.QR, header.getFlag(Flags.QR));dnsPacket.put(DnsPacket.OPCODE, Opcode.string(header.getOpcode()));dnsPacket.put(DnsPacket.RCODE, Rcode.string(header.getRcode()));dnsPacket.put(DnsPacket.QUESTION, convertRecordToString(msg.getQuestion()));dnsPacket.put(DnsPacket.QNAME, convertRecordOwnerToString(msg.getQuestion()));dnsPacket.put(DnsPacket.QTYPE, convertRecordTypeToInt(msg.getQuestion()));dnsPacket.put(DnsPacket.ANSWER, convertRecordsToStrings(msg.getSectionArray(Section.ANSWER)));dnsPacket.put(DnsPacket.AUTHORITY, convertRecordsToStrings(msg.getSectionArray(Section.AUTHORITY)));dnsPacket.put(DnsPacket.ADDITIONAL, convertRecordsToStrings(msg.getSectionArray(Section.ADDITIONAL)));} catch (Exception e) {// If we cannot decode a DNS packet we ignore it}}}private String convertRecordToString(Record record) {if (record == null)return null;String recordString = record.toString();recordString = normalizeRecordString(recordString);return recordString;}private String convertRecordOwnerToString(Record record) {if (record == null)return null;String ownerString = record.getName().toString();ownerString = ownerString.toLowerCase();return ownerString;}private int convertRecordTypeToInt(Record record) {if (record == null)return -1;return record.getType();}private List<String> convertRecordsToStrings(Record[] records) {if (records == null)return null;ArrayList<String> retVal = new ArrayList<String>(records.length);for (Record record : records)retVal.add(convertRecordToString(record));return retVal;}protected String normalizeRecordString(String recordString) {if (recordString == null)return null;// Reduce everything that is more than one whitespace to a single whitespacerecordString = recordString.replaceAll("\\s{2,}", " ");// Replace tabs with a single whitespacerecordString = recordString.replaceAll("\\t{1,}", " ");return recordString;}}
还没有人抢沙发呢~