From c43e03086d36b44ed44222e52f9ce96ad599ffc2 Mon Sep 17 00:00:00 2001 From: Ivan Date: Mon, 12 Dec 2016 21:51:09 +1300 Subject: [PATCH] Ignore corrupt files 2 (#59) * add ignore corrupt to library * add ignore corrupt to rdd, update tests * update readme * fix iterators * fix bugs, address comments * fix exception and tests * remove unused imports * address comments --- README.md | 5 + .../github/sadikovi/netflowlib/Buffers.java | 24 +- .../netflowlib/CorruptNetFlowHeader.java | 193 ++++++++++++++++ .../sadikovi/netflowlib/NetFlowHeader.java | 11 + .../sadikovi/netflowlib/NetFlowReader.java | 139 +++++++---- .../netflowlib/util/FilterIterator.java | 2 +- .../netflowlib/util/SafeIterator.java | 79 +++++++ .../sadikovi/spark/rdd/NetFlowFileRDD.scala | 218 +++++++++--------- .../netflowlib/NetFlowHeaderSuite.java | 59 +++++ .../netflowlib/NetFlowReaderSuite.java | 18 +- .../github/sadikovi/netflowlib/UtilSuite.java | 186 +++++++++++++++ .../sadikovi/spark/netflow/NetFlowSuite.scala | 59 +++-- .../github/sadikovi/testutil/SparkBase.scala | 7 +- .../github/sadikovi/testutil/SparkLocal.scala | 5 +- 14 files changed, 832 insertions(+), 173 deletions(-) create mode 100644 src/main/java/com/github/sadikovi/netflowlib/CorruptNetFlowHeader.java create mode 100644 src/main/java/com/github/sadikovi/netflowlib/util/SafeIterator.java create mode 100644 src/test/java/com/github/sadikovi/netflowlib/NetFlowHeaderSuite.java create mode 100644 src/test/java/com/github/sadikovi/netflowlib/UtilSuite.java diff --git a/README.md b/README.md index 6970ede..5f965e8 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,11 @@ specified when selecting data, columns that are selected contain all statistics all data is scanned/requested. The easiest way to trigger that is running `count()` on DataFrame. Using statistics does not require any special conditions apart from enabling option. +### Dealing with corrupt files +Package supports Spark option `spark.files.ignoreCorruptFiles`. When set to `true`, corrupt files +are ignored (corrupt header, wrong format) or partially read (corrupt data block in a middle of a +file). By default option is set to `false`, similar to Spark. + ## Example ### Scala API diff --git a/src/main/java/com/github/sadikovi/netflowlib/Buffers.java b/src/main/java/com/github/sadikovi/netflowlib/Buffers.java index 6f2ee13..c924bcc 100644 --- a/src/main/java/com/github/sadikovi/netflowlib/Buffers.java +++ b/src/main/java/com/github/sadikovi/netflowlib/Buffers.java @@ -32,6 +32,7 @@ import com.github.sadikovi.netflowlib.record.RecordMaterializer; import com.github.sadikovi.netflowlib.util.FilterIterator; import com.github.sadikovi.netflowlib.util.ReadAheadInputStream; +import com.github.sadikovi.netflowlib.util.SafeIterator; /** * All buffers supported in NetFlow reader. 
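As a usage illustration of the "Dealing with corrupt files" README section added above, here is a minimal Scala sketch (not part of the patch itself): the data source name and `version` option follow the package's existing examples, the master and application name are arbitrary local-testing values, and the input path is a placeholder.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Enable skipping of corrupt files; the option defaults to "false", matching Spark.
val conf = new SparkConf()
  .setMaster("local[2]")                  // arbitrary local master for the sketch
  .setAppName("netflow-ignore-corrupt")   // arbitrary application name
  .set("spark.files.ignoreCorruptFiles", "true")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Files with a corrupt header or wrong format are skipped entirely; a corrupt data
// block in the middle of a file causes the remainder of that file to be ignored.
val df = sqlContext.read
  .format("com.github.sadikovi.spark.netflow")
  .option("version", "5")
  .load("file:/path/to/netflow/files")    // placeholder path
df.count()
```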
@@ -95,7 +96,8 @@ public ScanRecordBuffer( int recordSize, ByteOrder byteOrder, boolean isCompressed, - int bufferLength) { + int bufferLength, + boolean ignoreCorrupt) { if (isCompressed) { inflater = new Inflater(); // InflaterInputStream is replaced with ReadAheadInputStream to allow to resolve EOF before @@ -111,6 +113,7 @@ public ScanRecordBuffer( this.recordMaterializer = recordMaterializer; this.recordSize = recordSize; + this.ignoreCorrupt = ignoreCorrupt; primary = new byte[recordSize]; secondary = new byte[recordSize]; buffer = Unpooled.wrappedBuffer(primary).order(byteOrder); @@ -119,7 +122,7 @@ public ScanRecordBuffer( @Override public Iterator iterator() { - return new Iterator() { + Iterator iter = new Iterator() { @Override public boolean hasNext() { // `stream.available()` returns either [0, 1] in case of compressed stream and @@ -188,12 +191,19 @@ public void remove() { throw new UnsupportedOperationException("Remove operation is not supported"); } }; + + // when ignoring corrupt records, wrap it into iterator with safe termination on failures + if (ignoreCorrupt) { + return new SafeIterator(iter); + } else { + return iter; + } } @Override public String toString() { return "Record buffer: " + getClass().getCanonicalName() + "[compression: " + compression + - ", record size: " + recordSize + "]"; + ", record size: " + recordSize + ", ignoreCorrupt: " + ignoreCorrupt + "]"; } // Whether or not input stream is compressed @@ -214,6 +224,8 @@ public String toString() { private final int recordSize; // Record materializer to process individual record private final RecordMaterializer recordMaterializer; + // Ignore corrupt records and terminate iterator once encountered + private final boolean ignoreCorrupt; } /** @@ -228,8 +240,10 @@ public FilterRecordBuffer( int recordSize, ByteOrder byteOrder, boolean isCompressed, - int bufferLength) { - super(in, recordMaterializer, recordSize, byteOrder, isCompressed, bufferLength); + int bufferLength, + boolean ignoreCorrupt) { + super(in, recordMaterializer, recordSize, byteOrder, isCompressed, bufferLength, + ignoreCorrupt); } @Override diff --git a/src/main/java/com/github/sadikovi/netflowlib/CorruptNetFlowHeader.java b/src/main/java/com/github/sadikovi/netflowlib/CorruptNetFlowHeader.java new file mode 100644 index 0000000..460cc8c --- /dev/null +++ b/src/main/java/com/github/sadikovi/netflowlib/CorruptNetFlowHeader.java @@ -0,0 +1,193 @@ +/* + * Copyright 2016 sadikovi + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sadikovi.netflowlib; + +import java.nio.ByteOrder; + +/** + * Class to indicate that header is corrupt. Overwrites parent method `isValid()` to hint on + * incorrectness of header. 
+ */ +public class CorruptNetFlowHeader extends NetFlowHeader { + public CorruptNetFlowHeader() { } + + //////////////////////////////////////////////////////////// + // Setters API + //////////////////////////////////////////////////////////// + @Override + public void setFlowVersion(short version) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setStartCapture(long value) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setEndCapture(long value) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setHeaderFlags(long flags) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setRotation(long value) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setNumFlows(long value) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setNumDropped(long value) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setNumMisordered(long value) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setHostname(String hostname) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setComments(String comments) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setVendor(int value) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setAggVersion(short value) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setAggMethod(short value) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setExporterIP(long value) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setNumCorrupt(long value) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setSeqReset(long value) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setInterfaceName(long ip, int ifIndex, String name) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public void setInterfaceAlias(long ip, int ifIndexCount, int ifIndex, String alias) { + throw new UnsupportedOperationException("Header is corrupt"); + } + + //////////////////////////////////////////////////////////// + // Getters API + //////////////////////////////////////////////////////////// + @Override + public short getStreamVersion() { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public ByteOrder getByteOrder() { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public int getHeaderSize() { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public short getFlowVersion() { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public long getStartCapture() { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public long getEndCapture() { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public long getHeaderFlags() { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public boolean isCompressed() { 
+ throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public String getHostname() { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public String getComments() { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public short getAggMethod() { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public short getAggVersion() { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public long getFields() { + throw new UnsupportedOperationException("Header is corrupt"); + } + + @Override + public boolean isValid() { + return false; + } +} diff --git a/src/main/java/com/github/sadikovi/netflowlib/NetFlowHeader.java b/src/main/java/com/github/sadikovi/netflowlib/NetFlowHeader.java index 0135d42..e5f679f 100644 --- a/src/main/java/com/github/sadikovi/netflowlib/NetFlowHeader.java +++ b/src/main/java/com/github/sadikovi/netflowlib/NetFlowHeader.java @@ -75,6 +75,9 @@ public NetFlowHeader(short streamVersion, ByteOrder byteOrder, int headerSize) { this.headerSize = headerSize; } + /** For subclasses to overwrite */ + protected NetFlowHeader() { } + //////////////////////////////////////////////////////////// // Setters API //////////////////////////////////////////////////////////// @@ -257,6 +260,14 @@ public long getFields() { return this.FIELDS; } + /** + * By default standard NetFlow header is always valid. Subclasses should overwrite this method, + * if that is not the case, or method should be conditional. + */ + public boolean isValid() { + return true; + } + // bit vector of fields private long FIELDS = 0; // flow stream format version either 1 or 3 diff --git a/src/main/java/com/github/sadikovi/netflowlib/NetFlowReader.java b/src/main/java/com/github/sadikovi/netflowlib/NetFlowReader.java index 6a41b04..f8b4b5b 100644 --- a/src/main/java/com/github/sadikovi/netflowlib/NetFlowReader.java +++ b/src/main/java/com/github/sadikovi/netflowlib/NetFlowReader.java @@ -70,21 +70,37 @@ public final class NetFlowReader { * com.github.sadikovi.netflowlib.Buffers for more information on buffer size constants. * @param inputStream input stream, can be Hadoop FSDataInputStream * @param buffer buffer size in bytes + * @param ignoreCorruptFile if true, ignores corrupt file, either when reading header or data + * @return reader + */ + public static NetFlowReader prepareReader( + DataInputStream inputStream, + int buffer, + boolean ignoreCorruptFile) throws IOException { + return new NetFlowReader(inputStream, buffer, ignoreCorruptFile); + } + + /** + * Initialize reader with input stream and buffer size. By default, fails if + * file is corrupt, e.g. file is not NetFlow file, or has corrupt data block. + * @param inputStream input stream, can be Hadoop FSDataInputStream + * @param buffer buffer size in bytes * @return reader */ public static NetFlowReader prepareReader( DataInputStream inputStream, int buffer) throws IOException { - return new NetFlowReader(inputStream, buffer); + return prepareReader(inputStream, buffer, false); } /** - * Initialize reader with input stream and default buffer size ~1Mb. + * Initialize reader with input stream and default buffer size ~1Mb. By default, fails if + * file is corrupt, e.g. file is not NetFlow file, or has corrupt data block. 
* @param inputStream input stream, can be Hadoop FSDataInputStream * @return reader */ public static NetFlowReader prepareReader(DataInputStream inputStream) throws IOException { - return prepareReader(inputStream, RecordBuffer.BUFFER_LENGTH_2); + return prepareReader(inputStream, RecordBuffer.BUFFER_LENGTH_2, false); } /** @@ -92,48 +108,67 @@ public static NetFlowReader prepareReader(DataInputStream inputStream) throws IO * strategy based on columns, predicate and statistics. Metadata, header are parsed as part of * initialization. */ - private NetFlowReader(DataInputStream inputStream, int buffer) throws IOException { + private NetFlowReader( + DataInputStream inputStream, + int buffer, + boolean ignoreCorruptFile) throws IOException { in = inputStream; bufferLength = buffer; + ignoreCorrupt = ignoreCorruptFile; + byte[] metadata = null; + ByteBuf buf = null; + + try { + metadata = new byte[METADATA_LENGTH]; + in.read(metadata, 0, METADATA_LENGTH); + + // Parse metadata, byte order does not really matter, so we go for big endian. Metadata contains + // magic numbers to verify consistency of the NetFlow file, byte order encoded as either 1 or 2, + // and stream version which affects header parsing (currently only 1 and 3 are supported). + buf = Unpooled.wrappedBuffer(metadata).order(ByteOrder.BIG_ENDIAN); + short magic1 = buf.getUnsignedByte(0); + short magic2 = buf.getUnsignedByte(1); + short order = buf.getUnsignedByte(2); + short stream = buf.getUnsignedByte(3); + + // Verify consistency of NetFlow file, also this ensures that we are at the beginning of the + // input stream + if (magic1 != HEADER_MAGIC1 || magic2 != HEADER_MAGIC2) { + throw new IOException("Corrupt NetFlow file. Wrong magic number"); + } - byte[] metadata = new byte[METADATA_LENGTH]; - in.read(metadata, 0, METADATA_LENGTH); - - // Parse metadata, byte order does not really matter, so we go for big endian. Metadata contains - // magic numbers to verify consistency of the NetFlow file, byte order encoded as either 1 or 2, - // and stream version which affects header parsing (currently only 1 and 3 are supported). - ByteBuf buf = Unpooled.wrappedBuffer(metadata).order(ByteOrder.BIG_ENDIAN); - short magic1 = buf.getUnsignedByte(0); - short magic2 = buf.getUnsignedByte(1); - short order = buf.getUnsignedByte(2); - short stream = buf.getUnsignedByte(3); - - // Verify consistency of NetFlow file, also this ensures that we are at the beginning of the - // input stream - if (magic1 != HEADER_MAGIC1 || magic2 != HEADER_MAGIC2) { - throw new UnsupportedOperationException("Corrupt NetFlow file. Wrong magic number"); - } + // Resolve byte order, last case corresponds to incorrect reading from buffer + if (order == HEADER_BIG_ENDIAN) { + byteOrder = ByteOrder.BIG_ENDIAN; + } else if (order == HEADER_LITTLE_ENDIAN) { + byteOrder = ByteOrder.LITTLE_ENDIAN; + } else { + throw new IOException("Could not recognize byte order " + order); + } - // Resolve byte order, last case corresponds to incorrect reading from buffer - if (order == HEADER_BIG_ENDIAN) { - byteOrder = ByteOrder.BIG_ENDIAN; - } else if (order == HEADER_LITTLE_ENDIAN) { - byteOrder = ByteOrder.LITTLE_ENDIAN; - } else { - throw new UnsupportedOperationException("Could not recognize byte order " + order); + streamVersion = stream; + + // Check stream version + ensureStreamVersion(); + + // Read header + header = getHeader(); + } catch (IOException err) { + if (ignoreCorrupt) { + // we subsume exception and log warning. 
Set header to null + log.warn("Failed to initialize reader, ignoreCorruptFile=" + ignoreCorrupt + + ", error=" + err); + header = new CorruptNetFlowHeader(); + } else { + throw err; + } + } finally { + metadata = null; + if (buf != null) { + buf.release(); + buf = null; + } } - - streamVersion = stream; - - // Check stream version - ensureStreamVersion(); - - // Read header - header = getHeader(); - - metadata = null; - buf.release(); - buf = null; } /** Ensure that stream version is either version 1 or version 3 */ @@ -442,13 +477,14 @@ private RecordBuffer prepareRecordBuffer(ScanStrategy strategy, NetFlow flowInte log.info("Skip scan based on strategy " + strategy); return new EmptyRecordBuffer(); } else if (strategy.fullScan()) { - log.info("Full scan based on strategy " + strategy); + log.info("Full scan based on strategy " + strategy + ", ignoreCorrupt=" + ignoreCorrupt); + // wrap into closeable iterator return new ScanRecordBuffer(in, strategy.getRecordMaterializer(), recordSize, byteOrder, - isCompressed, bufferLength); + isCompressed, bufferLength, ignoreCorrupt); } else { - log.info("Filter scan based on strategy " + strategy); + log.info("Filter scan based on strategy " + strategy + ", ignoreCorrupt=" + ignoreCorrupt); return new FilterRecordBuffer(in, strategy.getRecordMaterializer(), recordSize, byteOrder, - isCompressed, bufferLength); + isCompressed, bufferLength, ignoreCorrupt); } } @@ -463,14 +499,25 @@ public int getBufferLength() { return this.bufferLength; } + /** + * Whether or not reader is valid, currently is based on validity of header, assuming that file + * is of correct format, but might still have corrupt data blocks. See buffers implementation for + * usage of `ignoreCorrupt`. + */ + public boolean isValid() { + return header.isValid(); + } + // Stream of the NetFlow file private final DataInputStream in; // Byte order of the file - private final ByteOrder byteOrder; + private ByteOrder byteOrder; // Stream version of the file - private final short streamVersion; + private short streamVersion; // Buffer size for record buffer private final int bufferLength; // NetFlow header private NetFlowHeader header = null; + // Whether or not to ignore corrupt file stream + private final boolean ignoreCorrupt; } diff --git a/src/main/java/com/github/sadikovi/netflowlib/util/FilterIterator.java b/src/main/java/com/github/sadikovi/netflowlib/util/FilterIterator.java index f6d8dfb..63cbb78 100644 --- a/src/main/java/com/github/sadikovi/netflowlib/util/FilterIterator.java +++ b/src/main/java/com/github/sadikovi/netflowlib/util/FilterIterator.java @@ -25,7 +25,7 @@ * internal methods to provide original stream of values. When iterator is called, standard methods * will be called with filter already. */ -public final class FilterIterator implements Iterator { +public class FilterIterator implements Iterator { public FilterIterator(Iterator iterator) { this.iterator = iterator; } diff --git a/src/main/java/com/github/sadikovi/netflowlib/util/SafeIterator.java b/src/main/java/com/github/sadikovi/netflowlib/util/SafeIterator.java new file mode 100644 index 0000000..a77b0fe --- /dev/null +++ b/src/main/java/com/github/sadikovi/netflowlib/util/SafeIterator.java @@ -0,0 +1,79 @@ +/* + * Copyright 2016 sadikovi + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sadikovi.netflowlib.util; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Iterator implementation that terminates delegate iterator when exception occurs during value + * extraction. + */ +public class SafeIterator implements Iterator { + private boolean gotNext = false; + private E nextValue = null; + protected boolean finished = false; + private Iterator delegate; + + public SafeIterator(Iterator delegate) { + this.delegate = delegate; + } + + /** + * If no next element is available, `finished` is set to `true` and may return any value + * (it will be ignored). This convention is required because `null` may be a valid value. + * @return E instance, or set 'finished' to `true` when done + */ + private E getNext() { + try { + if (delegate.hasNext()) { + return delegate.next(); + } else { + finished = true; + return null; + } + } catch (Exception err) { + finished = true; + return null; + } + } + + @Override + public boolean hasNext() { + if (!finished) { + if (!gotNext) { + nextValue = getNext(); + gotNext = true; + } + } + return !finished; + } + + @Override + public E next() { + if (!hasNext()) { + throw new NoSuchElementException("End of stream"); + } + gotNext = false; + return nextValue; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Operation 'remove' is not supported"); + } +} diff --git a/src/main/scala/com/github/sadikovi/spark/rdd/NetFlowFileRDD.scala b/src/main/scala/com/github/sadikovi/spark/rdd/NetFlowFileRDD.scala index f3ebc98..eb3543e 100644 --- a/src/main/scala/com/github/sadikovi/spark/rdd/NetFlowFileRDD.scala +++ b/src/main/scala/com/github/sadikovi/spark/rdd/NetFlowFileRDD.scala @@ -67,6 +67,9 @@ private[spark] class NetFlowFileRDD[T <: SQLRow : ClassTag] ( val resolvedColumns: Array[MappedColumn], val resolvedFilter: Option[FilterPredicate], val statisticsIndex: Map[Int, MappedColumn]) extends FileRDD[SQLRow](sc, Nil) { + // when true, ignore corrupt files, either with wrong header or corrupt data block + private val ignoreCorruptFiles = sc.getConf.getBoolean("spark.files.ignoreCorruptFiles", false) + override def getPartitions: Array[Partition] = { val slices = partitionMode.tryToPartition(data) slices.indices.map(i => new NetFlowFilePartition[NetFlowFileStatus](id, i, slices(i))).toArray @@ -106,131 +109,138 @@ private[spark] class NetFlowFileRDD[T <: SQLRow : ClassTag] ( // Prepare file stream var stm: FSDataInputStream = fs.open(path) - val reader = NetFlowReader.prepareReader(stm, elem.bufferSize) - val header = reader.getHeader() - // Actual version of the file - val actualVersion = header.getFlowVersion() - // Compression flag - val isCompressed = header.isCompressed() - - logDebug(s""" - > NetFlow: { - > File: ${elem.path}, - > File length: ${fileLength} bytes, - > Flow version: ${actualVersion}, - > Compression: ${isCompressed}, - > Buffer size: ${elem.bufferSize} bytes, - > Start capture: ${header.getStartCapture()}, - > End capture: ${header.getEndCapture()}, - > Hostname: ${header.getHostname()}, - > Comments: ${header.getComments()} - > } - 
""".stripMargin('>')) - - // Currently we cannot resolve version and proceed with parsing, we require pre-set version. - require(actualVersion == elem.version, - s"Expected version ${elem.version}, got ${actualVersion} for file ${elem.path}. " + - "Scan of the files with different (compatible) versions, e.g. 5, 6, and 7 is not " + - "supported currently") - - // Build record buffer based on resolved filter, if filter is not defined use default scan - // with trivial predicate - val recordBuffer = if (updatedFilter.nonEmpty) { - reader.prepareRecordBuffer(internalColumns, updatedFilter.get) + val reader = NetFlowReader.prepareReader(stm, elem.bufferSize, ignoreCorruptFiles) + // this flag is only checked when ignoreCorruptFiles = true, otherwise initialization will + // throw exception, if file is corrupt + if (!reader.isValid()) { + logWarning(s"Failed to read file $path, ignoreCorruptFiles=$ignoreCorruptFiles") + buffer = buffer ++ Iterator.empty } else { - reader.prepareRecordBuffer(internalColumns) - } + val header = reader.getHeader() + // Actual version of the file + val actualVersion = header.getFlowVersion() + // Compression flag + val isCompressed = header.isCompressed() - val rawIterator = new CloseableIterator[Array[Object]] { - private var delegate = recordBuffer.iterator().asScala + logDebug(s""" + > NetFlow: { + > File: ${elem.path}, + > File length: ${fileLength} bytes, + > Flow version: ${actualVersion}, + > Compression: ${isCompressed}, + > Buffer size: ${elem.bufferSize} bytes, + > Start capture: ${header.getStartCapture()}, + > End capture: ${header.getEndCapture()}, + > Hostname: ${header.getHostname()}, + > Comments: ${header.getComments()} + > } + """.stripMargin('>')) - override def getNext(): Array[Object] = { - // If delegate has traversed over all elements mark it as finished - // to allow to close stream - if (delegate.hasNext) { - delegate.next - } else { - finished = true - null - } + // Currently we cannot resolve version and proceed with parsing, we require pre-set version. + require(actualVersion == elem.version, + s"Expected version ${elem.version}, got ${actualVersion} for file ${elem.path}. " + + "Scan of the files with different (compatible) versions, e.g. 5, 6, and 7 is not " + + "supported currently") + + // Build record buffer based on resolved filter, if filter is not defined use default scan + // with trivial predicate + val recordBuffer = if (updatedFilter.nonEmpty) { + reader.prepareRecordBuffer(internalColumns, updatedFilter.get) + } else { + reader.prepareRecordBuffer(internalColumns) } - override def close(): Unit = { - // Close stream if possible of fail silently, - // at this point exception does not really matter - try { - if (stm != null) { - stm.close() - stm = null + val rawIterator = new CloseableIterator[Array[Object]] { + private var delegate = recordBuffer.iterator().asScala + + override def getNext(): Array[Object] = { + // If delegate has traversed over all elements mark it as finished + // to allow to close stream + if (delegate.hasNext) { + delegate.next + } else { + finished = true + null } - } catch { - case err: Exception => // do nothing } - } - } - Option(TaskContext.get).foreach(_.addTaskCompletionListener(_ => rawIterator.closeIfNeeded)) - // Try collecting statistics before any other mode, because attributes collect raw data. 
If - // file exists, it is assumed that statistics are already written - val writableIterator = if (updatedFilter.isEmpty && elem.statisticsPathStatus.nonEmpty && - statisticsIndex.nonEmpty) { - val statStatus = elem.statisticsPathStatus.get - if (!statStatus.exists) { - logDebug(s"Prepare statistics for a path ${statStatus.path}") - val attributes = AttributeMap.create() - new Iterator[Array[Object]] { - override def hasNext: Boolean = { - // If raw iterator does not have any elements we assume that it is EOF and write - // statistics into a file - // There is a feature in Spark when iterator is invoked once to get `hasNext`, and - // then continue to extract records. For empty files, Spark will try to write - // statistics twice, because of double invocation of `hasNext`, we overwrite old file - if (!rawIterator.hasNext) { - logInfo(s"Ready to write statistics for path: ${statStatus.path}") - attributes.write(statStatus.path, conf, overwrite = true) + override def close(): Unit = { + // Close stream if possible of fail silently, + // at this point exception does not really matter + try { + if (stm != null) { + stm.close() + stm = null } - rawIterator.hasNext + } catch { + case err: Exception => // do nothing } + } + } + Option(TaskContext.get).foreach(_.addTaskCompletionListener(_ => rawIterator.closeIfNeeded)) - override def next(): Array[Object] = { - val arr = rawIterator.next - for (i <- 0 until numColumns) { - if (statisticsIndex.contains(i)) { - val key = statisticsIndex(i).internalColumn.getColumnName - attributes.updateStatistics(key, arr(i)) + // Try collecting statistics before any other mode, because attributes collect raw data. If + // file exists, it is assumed that statistics are already written + val writableIterator = if (updatedFilter.isEmpty && elem.statisticsPathStatus.nonEmpty && + statisticsIndex.nonEmpty) { + val statStatus = elem.statisticsPathStatus.get + if (!statStatus.exists) { + logDebug(s"Prepare statistics for a path ${statStatus.path}") + val attributes = AttributeMap.create() + new Iterator[Array[Object]] { + override def hasNext: Boolean = { + // If raw iterator does not have any elements we assume that it is EOF and write + // statistics into a file + // There is a feature in Spark when iterator is invoked once to get `hasNext`, and + // then continue to extract records. For empty files, Spark will try to write + // statistics twice, because of double invocation of `hasNext`, overwrite old file + if (!rawIterator.hasNext) { + logInfo(s"Ready to write statistics for path: ${statStatus.path}") + attributes.write(statStatus.path, conf, overwrite = true) } + rawIterator.hasNext + } + + override def next(): Array[Object] = { + val arr = rawIterator.next + for (i <- 0 until numColumns) { + if (statisticsIndex.contains(i)) { + val key = statisticsIndex(i).internalColumn.getColumnName + attributes.updateStatistics(key, arr(i)) + } + } + arr } - arr } + } else { + logDebug(s"Statistics file ${statStatus.path} already exists, skip writing") + rawIterator } } else { - logDebug(s"Statistics file ${statStatus.path} already exists, skip writing") + logDebug("Statistics are disabled, skip writing") rawIterator } - } else { - logDebug("Statistics are disabled, skip writing") - rawIterator - } - // Conversion iterator, applies defined modification for convertable fields - val withConversionsIterator = if (applyConversion) { - // For each field we check if possible conversion is available. If it is we apply direct - // conversion, otherwise return unchanged value. 
Note that this should be in sync with - // `applyConversion` and updated schema from `ResolvedInterface`. - writableIterator.map(arr => { - for (i <- 0 until numColumns) { - resolvedColumns(i).convertFunction match { - case Some(func) => arr(i) = func.direct(arr(i)) - case None => // do nothing + // Conversion iterator, applies defined modification for convertable fields + val withConversionsIterator = if (applyConversion) { + // For each field we check if possible conversion is available. If it is we apply direct + // conversion, otherwise return unchanged value. Note that this should be in sync with + // `applyConversion` and updated schema from `ResolvedInterface`. + writableIterator.map(arr => { + for (i <- 0 until numColumns) { + resolvedColumns(i).convertFunction match { + case Some(func) => arr(i) = func.direct(arr(i)) + case None => // do nothing + } } - } - arr - }) - } else { - writableIterator - } + arr + }) + } else { + writableIterator + } - buffer = buffer ++ withConversionsIterator + buffer = buffer ++ withConversionsIterator + } } new InterruptibleIterator(context, new Iterator[SQLRow] { diff --git a/src/test/java/com/github/sadikovi/netflowlib/NetFlowHeaderSuite.java b/src/test/java/com/github/sadikovi/netflowlib/NetFlowHeaderSuite.java new file mode 100644 index 0000000..0fa1d7a --- /dev/null +++ b/src/test/java/com/github/sadikovi/netflowlib/NetFlowHeaderSuite.java @@ -0,0 +1,59 @@ +/* + * Copyright 2016 sadikovi + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.github.sadikovi.netflowlib; + +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.junit.Test; +import org.junit.Ignore; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertEquals; + +import com.github.sadikovi.netflowlib.CorruptNetFlowHeader; +import com.github.sadikovi.netflowlib.NetFlowHeader; + +public class NetFlowHeaderSuite { + @Test + public void testInitNetFlowHeader() { + NetFlowHeader header = new NetFlowHeader((short) 5, ByteOrder.BIG_ENDIAN); + assertEquals(header.getStreamVersion(), 5); + assertEquals(header.getByteOrder(), ByteOrder.BIG_ENDIAN); + assertEquals(header.isValid(), true); + } + + @Test + public void testInitCorruptNetFlowHeader() { + NetFlowHeader header = new CorruptNetFlowHeader(); + assertEquals(header.isValid(), false); + } + + @Test(expected = UnsupportedOperationException.class) + public void testFailOnCorruptHeaderMethod1() { + NetFlowHeader header = new CorruptNetFlowHeader(); + header.getStreamVersion(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testFailOnCorruptHeaderMethod2() { + NetFlowHeader header = new CorruptNetFlowHeader(); + header.getByteOrder(); + } +} diff --git a/src/test/java/com/github/sadikovi/netflowlib/NetFlowReaderSuite.java b/src/test/java/com/github/sadikovi/netflowlib/NetFlowReaderSuite.java index 11cd091..322aeed 100644 --- a/src/test/java/com/github/sadikovi/netflowlib/NetFlowReaderSuite.java +++ b/src/test/java/com/github/sadikovi/netflowlib/NetFlowReaderSuite.java @@ -68,11 +68,21 @@ public void testReadingCorrupt() throws IOException { try { NetFlowReader nr = NetFlowReader.prepareReader(stm, 30000); - } catch (UnsupportedOperationException uoe) { - assertThat(uoe.getMessage(), containsString("Corrupt NetFlow file. Wrong magic number")); + } catch (IOException ioe) { + assertThat(ioe.getMessage(), containsString("Corrupt NetFlow file. Wrong magic number")); } } + // test reader on corrupt input, should not fail + @Test + public void testReadingCorruptWithIgnoreCorrupt() throws IOException { + String file = getClass().getResource("/corrupt/ftv5.2016-01-13.compress.9.sample-01").getPath(); + FSDataInputStream stm = getTestStream(file); + + NetFlowReader nr = NetFlowReader.prepareReader(stm, 30000, true); + assertEquals(nr.isValid(), false); + } + @Test public void testReadingCompressed() throws IOException { String file = getClass(). 
@@ -82,6 +92,7 @@ public void testReadingCompressed() throws IOException { NetFlowReader nr = NetFlowReader.prepareReader(stm, 30000); NetFlowHeader header = nr.getHeader(); + assertEquals(nr.isValid(), true); assertEquals(header.getFlowVersion(), (short) 7); assertEquals(header.isCompressed(), true); assertEquals(header.getStartCapture(), 0L); @@ -103,6 +114,7 @@ public void testReadingUncompressed() throws IOException { NetFlowReader nr = NetFlowReader.prepareReader(stm, 30000); NetFlowHeader header = nr.getHeader(); + assertEquals(nr.isValid(), true); assertEquals(header.getFlowVersion(), (short) 5); assertEquals(header.isCompressed(), false); assertEquals(header.getStartCapture(), 0L); @@ -124,6 +136,7 @@ public void testReadingUncompressedEmpty() throws IOException { NetFlowReader nr = NetFlowReader.prepareReader(stm, 30000); NetFlowHeader header = nr.getHeader(); + assertEquals(nr.isValid(), true); assertEquals(header.getFlowVersion(), (short) 5); assertEquals(header.isCompressed(), false); assertEquals(header.getStartCapture(), 0L); @@ -145,6 +158,7 @@ public void testReadingCompressedEmpty() throws IOException { NetFlowReader nr = NetFlowReader.prepareReader(stm, 30000); NetFlowHeader header = nr.getHeader(); + assertEquals(nr.isValid(), true); assertEquals(header.getFlowVersion(), (short) 5); assertEquals(header.isCompressed(), true); assertEquals(header.getStartCapture(), 0L); diff --git a/src/test/java/com/github/sadikovi/netflowlib/UtilSuite.java b/src/test/java/com/github/sadikovi/netflowlib/UtilSuite.java new file mode 100644 index 0000000..dfd1ba4 --- /dev/null +++ b/src/test/java/com/github/sadikovi/netflowlib/UtilSuite.java @@ -0,0 +1,186 @@ +/* + * Copyright 2016 sadikovi + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.github.sadikovi.netflowlib; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.junit.Test; +import org.junit.Ignore; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertEquals; + +import com.github.sadikovi.netflowlib.util.FilterIterator; +import com.github.sadikovi.netflowlib.util.SafeIterator; + +public class UtilSuite { + @Test(expected = NoSuchElementException.class) + public void testFilterIteratorFailIfEmpty() { + List values = new ArrayList(); + FilterIterator iter = new FilterIterator(values.iterator()); + iter.next(); + } + + @Test + public void testFilterIteratorReturnAll() { + List values = new ArrayList(); + values.add("a"); + values.add("a"); + values.add("a"); + FilterIterator iter = new FilterIterator(values.iterator()); + + int count = 0; + while (iter.hasNext()) { + assertSame(iter.next(), "a"); + count++; + } + + assertEquals(count, values.size()); + } + + @Test + public void testFilterIteratorReturnSome() { + List values = new ArrayList(); + values.add("a"); + values.add(null); + values.add("a"); + FilterIterator iter = new FilterIterator(values.iterator()); + + int count = 0; + while (iter.hasNext()) { + assertSame(iter.next(), "a"); + count++; + } + + assertEquals(count, 2); + } + + @Test + public void testFilterIteratorReturnNone() { + List values = new ArrayList(); + values.add(null); + values.add(null); + values.add(null); + FilterIterator iter = new FilterIterator(values.iterator()); + + int count = 0; + while (iter.hasNext()) { + count++; + } + + assertEquals(count, 0); + } + + @Test(expected = NoSuchElementException.class) + public void testSafeIteratorFailIfEmpty() { + List values = new ArrayList(); + SafeIterator iter = new SafeIterator(values.iterator()); + iter.next(); + } + + @Test + public void testSafeIteratorReturnAll() { + List values = new ArrayList(); + values.add("a"); + values.add("a"); + values.add("a"); + SafeIterator iter = new SafeIterator(values.iterator()); + + int count = 0; + while (iter.hasNext()) { + assertSame(iter.next(), "a"); + count++; + } + + assertEquals(count, values.size()); + } + + @Test + public void testSafeIteratorTerminateOnErrorInHasNext() { + final List values = new ArrayList(); + values.add("a"); + values.add(null); + values.add("a"); + Iterator delegate = new Iterator() { + private Iterator parent = values.iterator(); + private String current; + + @Override + public boolean hasNext() { + current = parent.next(); + if (current == null) { + throw new IllegalStateException("Test"); + } + return parent.hasNext(); + } + + @Override + public String next() { + return current; + } + + @Override + public void remove() { } + }; + SafeIterator iter = new SafeIterator(delegate); + + int count = 0; + while (iter.hasNext()) { + assertSame(iter.next(), "a"); + count++; + } + + // Expect one record only, since second record fails with state exception + assertEquals(count, 1); + } + + @Test + public void testSafeIteratorTerminateOnErrorInNext() { + final List values = new ArrayList(); + values.add("a"); + values.add(null); + values.add("a"); + Iterator delegate = new Iterator() { + private Iterator parent = values.iterator(); + + @Override + public boolean hasNext() { + return parent.hasNext(); + } + + @Override + public String next() { + return parent.next().toString(); + } + + @Override + public void remove() { } + }; + SafeIterator iter = new SafeIterator(delegate); + + int count = 0; + while 
(iter.hasNext()) { + assertSame(iter.next(), "a"); + count++; + } + + // Expect one record only, since second record fails with null pointer exception + assertEquals(count, 1); + } +} diff --git a/src/test/scala/com/github/sadikovi/spark/netflow/NetFlowSuite.scala b/src/test/scala/com/github/sadikovi/spark/netflow/NetFlowSuite.scala index 8c426da..ba30b5e 100644 --- a/src/test/scala/com/github/sadikovi/spark/netflow/NetFlowSuite.scala +++ b/src/test/scala/com/github/sadikovi/spark/netflow/NetFlowSuite.scala @@ -26,8 +26,6 @@ import org.apache.spark.sql.{SQLContext, DataFrame, Row} import org.apache.spark.sql.functions.col import org.apache.spark.sql.sources._ -import org.scalatest.ConfigMap - import com.github.sadikovi.netflowlib.Buffers.RecordBuffer import com.github.sadikovi.netflowlib.version.NetFlowV5 import com.github.sadikovi.spark.netflow.sources._ @@ -36,16 +34,9 @@ import com.github.sadikovi.spark.util.Utils import com.github.sadikovi.testutil.{UnitTestSpec, SparkLocal} import com.github.sadikovi.testutil.implicits._ -class NetFlowSuite extends UnitTestSpec with SparkLocal { - override def beforeAll(configMap: ConfigMap) { - startSparkContext() - } - - override def afterAll(configMap: ConfigMap) { - stopSparkContext() - } - - private def readNetFlow( +/** Common functionality to read NetFlow files */ +abstract class SparkNetFlowTestSuite extends UnitTestSpec with SparkLocal { + protected def readNetFlow( sqlContext: SQLContext, version: Short, path: String, @@ -76,6 +67,16 @@ class NetFlowSuite extends UnitTestSpec with SparkLocal { val path12 = getClass().getResource("/anomaly/ftv5.2016-03-15.compress9.bigend.records1").getPath // version 5 file that starts with large byte 255 val path13 = getClass().getResource("/anomaly/ftv5.2016-04-09.compress9.large-byte-start").getPath +} + +class NetFlowSuite extends SparkNetFlowTestSuite { + override def beforeAll() { + startSparkContext() + } + + override def afterAll() { + stopSparkContext() + } test("read uncompressed v5 format") { val sqlContext = new SQLContext(sc) @@ -156,7 +157,7 @@ class NetFlowSuite extends UnitTestSpec with SparkLocal { load(s"file:${path3}").count() } val msg = err.getMessage() - assert(msg.contains("java.lang.UnsupportedOperationException: " + + assert(msg.contains("java.io.IOException: " + "Corrupt NetFlow file. Wrong magic number")) } @@ -637,3 +638,35 @@ class NetFlowSuite extends UnitTestSpec with SparkLocal { } } } + +/** Suite to test `ignoreCorruptFiles` option */ +class NetFlowIgnoreCorruptSuite extends SparkNetFlowTestSuite { + override def beforeAll() { + startSparkContext(Map("spark.files.ignoreCorruptFiles" -> "true")) + } + + override def afterAll() { + stopSparkContext() + } + + test("return empty iterator, when file is not a NetFlow file (header failure)") { + val sqlContext = new SQLContext(sc) + val df = sqlContext.read.format("com.github.sadikovi.spark.netflow").option("version", "5"). + load(s"file:${path3}") + df.count() should be (0) + } + + test("return partial data, when NetFlow file is corrupt") { + val sqlContext = new SQLContext(sc) + val df = sqlContext.read.format("com.github.sadikovi.spark.netflow").option("version", "5"). + load(s"file:${path4}") + df.count() should be (553) + } + + test("return full data, when NetFlow file is correct") { + val sqlContext = new SQLContext(sc) + val df = sqlContext.read.format("com.github.sadikovi.spark.netflow").option("version", "5"). 
+ load(s"file:${path2}") + df.count() should be (1000) + } +} diff --git a/src/test/scala/com/github/sadikovi/testutil/SparkBase.scala b/src/test/scala/com/github/sadikovi/testutil/SparkBase.scala index 6d2b7a3..e78e0b7 100644 --- a/src/test/scala/com/github/sadikovi/testutil/SparkBase.scala +++ b/src/test/scala/com/github/sadikovi/testutil/SparkBase.scala @@ -23,8 +23,13 @@ import org.apache.spark.{SparkContext, SparkConf} private[testutil] trait SparkBase { @transient private[testutil] var _sc: SparkContext = null - /** Start (or init) Spark context. */ + /** Initialize Spark context with default parameters */ def startSparkContext() { + startSparkContext(Map.empty) + } + + /** Start (or init) Spark context. */ + def startSparkContext(sparkOptions: Map[String, String]) { // stop previous Spark context stopSparkContext() _sc = new SparkContext() diff --git a/src/test/scala/com/github/sadikovi/testutil/SparkLocal.scala b/src/test/scala/com/github/sadikovi/testutil/SparkLocal.scala index b54f71b..7604008 100644 --- a/src/test/scala/com/github/sadikovi/testutil/SparkLocal.scala +++ b/src/test/scala/com/github/sadikovi/testutil/SparkLocal.scala @@ -31,9 +31,12 @@ trait SparkLocal extends SparkBase { set("spark.executor.memory", "2g") } - override def startSparkContext() { + override def startSparkContext(sparkOptions: Map[String, String]) { setLoggingLevel(Level.ERROR) val conf = localConf() + sparkOptions.foreach { case (key, value) => + conf.set(key, value) + } _sc = new SparkContext(conf) } }