Skip to content

Commit

Permalink
Ignore corrupt files 2 (#59)
Browse files Browse the repository at this point in the history
* add ignore corrupt to library

* add ignore corrupt to rdd, update tests

* update readme

* fix iterators

* fix bugs, address comments

* fix exception and tests

* remove unused imports

* address comments
  • Loading branch information
sadikovi authored Dec 12, 2016
1 parent 9d405a6 commit c43e030
Show file tree
Hide file tree
Showing 14 changed files with 832 additions and 173 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ specified when selecting data, columns that are selected contain all statistics
all data is scanned/requested. The easiest way to trigger that is running `count()` on DataFrame.
Using statistics does not require any special conditions apart from enabling option.

### Dealing with corrupt files
The package supports the Spark option `spark.files.ignoreCorruptFiles`. When it is set to `true`,
corrupt files are either ignored (corrupt header, wrong format) or partially read (corrupt data
block in the middle of a file). By default, the option is set to `false`, similar to Spark.

## Example

### Scala API
Expand Down
24 changes: 19 additions & 5 deletions src/main/java/com/github/sadikovi/netflowlib/Buffers.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.github.sadikovi.netflowlib.record.RecordMaterializer;
import com.github.sadikovi.netflowlib.util.FilterIterator;
import com.github.sadikovi.netflowlib.util.ReadAheadInputStream;
import com.github.sadikovi.netflowlib.util.SafeIterator;

/**
* All buffers supported in NetFlow reader.
Expand Down Expand Up @@ -95,7 +96,8 @@ public ScanRecordBuffer(
int recordSize,
ByteOrder byteOrder,
boolean isCompressed,
int bufferLength) {
int bufferLength,
boolean ignoreCorrupt) {
if (isCompressed) {
inflater = new Inflater();
// InflaterInputStream is replaced with ReadAheadInputStream to allow to resolve EOF before
Expand All @@ -111,6 +113,7 @@ public ScanRecordBuffer(

this.recordMaterializer = recordMaterializer;
this.recordSize = recordSize;
this.ignoreCorrupt = ignoreCorrupt;
primary = new byte[recordSize];
secondary = new byte[recordSize];
buffer = Unpooled.wrappedBuffer(primary).order(byteOrder);
Expand All @@ -119,7 +122,7 @@ public ScanRecordBuffer(

@Override
public Iterator<Object[]> iterator() {
return new Iterator<Object[]>() {
Iterator<Object[]> iter = new Iterator<Object[]>() {
@Override
public boolean hasNext() {
// `stream.available()` returns either [0, 1] in case of compressed stream and
Expand Down Expand Up @@ -188,12 +191,19 @@ public void remove() {
throw new UnsupportedOperationException("Remove operation is not supported");
}
};

// when ignoring corrupt records, wrap it into iterator with safe termination on failures
if (ignoreCorrupt) {
return new SafeIterator<Object[]>(iter);
} else {
return iter;
}
}

@Override
public String toString() {
return "Record buffer: " + getClass().getCanonicalName() + "[compression: " + compression +
", record size: " + recordSize + "]";
", record size: " + recordSize + ", ignoreCorrupt: " + ignoreCorrupt + "]";
}

// Whether or not input stream is compressed
Expand All @@ -214,6 +224,8 @@ public String toString() {
private final int recordSize;
// Record materializer to process individual record
private final RecordMaterializer recordMaterializer;
// Ignore corrupt records and terminate iterator once encountered
private final boolean ignoreCorrupt;
}

/**
Expand All @@ -228,8 +240,10 @@ public FilterRecordBuffer(
int recordSize,
ByteOrder byteOrder,
boolean isCompressed,
int bufferLength) {
super(in, recordMaterializer, recordSize, byteOrder, isCompressed, bufferLength);
int bufferLength,
boolean ignoreCorrupt) {
super(in, recordMaterializer, recordSize, byteOrder, isCompressed, bufferLength,
ignoreCorrupt);
}

@Override
Expand Down
193 changes: 193 additions & 0 deletions src/main/java/com/github/sadikovi/netflowlib/CorruptNetFlowHeader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright 2016 sadikovi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sadikovi.netflowlib;

import java.nio.ByteOrder;

/**
 * Placeholder header used to signal that a NetFlow header is corrupt.
 *
 * Overrides {@link #isValid()} to return {@code false}. Every other setter and getter overridden
 * here rejects the call with an {@link UnsupportedOperationException}, so the only meaningful
 * question to ask this object is whether it is valid.
 */
public class CorruptNetFlowHeader extends NetFlowHeader {
  // Single message shared by every rejected accessor
  private static final String CORRUPT_MESSAGE = "Header is corrupt";

  /** Creates a corrupt header marker; no header state is supplied. */
  public CorruptNetFlowHeader() {
  }

  ////////////////////////////////////////////////////////////
  // Setters API (all unsupported)
  ////////////////////////////////////////////////////////////

  /** @throws UnsupportedOperationException always */
  @Override
  public void setFlowVersion(short version) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setStartCapture(long value) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setEndCapture(long value) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setHeaderFlags(long flags) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setRotation(long value) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setNumFlows(long value) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setNumDropped(long value) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setNumMisordered(long value) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setHostname(String hostname) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setComments(String comments) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setVendor(int value) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setAggVersion(short value) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setAggMethod(short value) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setExporterIP(long value) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setNumCorrupt(long value) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setSeqReset(long value) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setInterfaceName(long ip, int ifIndex, String name) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public void setInterfaceAlias(long ip, int ifIndexCount, int ifIndex, String alias) {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  ////////////////////////////////////////////////////////////
  // Getters API (all unsupported, except `isValid()`)
  ////////////////////////////////////////////////////////////

  /** @throws UnsupportedOperationException always */
  @Override
  public short getStreamVersion() {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public ByteOrder getByteOrder() {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public int getHeaderSize() {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public short getFlowVersion() {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public long getStartCapture() {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public long getEndCapture() {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public long getHeaderFlags() {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public boolean isCompressed() {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public String getHostname() {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public String getComments() {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public short getAggMethod() {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public short getAggVersion() {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /** @throws UnsupportedOperationException always */
  @Override
  public long getFields() {
    throw new UnsupportedOperationException(CORRUPT_MESSAGE);
  }

  /**
   * Reports this header as invalid.
   *
   * @return always {@code false}
   */
  @Override
  public boolean isValid() {
    return false;
  }
}
11 changes: 11 additions & 0 deletions src/main/java/com/github/sadikovi/netflowlib/NetFlowHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public NetFlowHeader(short streamVersion, ByteOrder byteOrder, int headerSize) {
this.headerSize = headerSize;
}

/**
 * No-argument constructor reserved for subclasses (hence `protected`). The body is empty, so
 * this constructor itself assigns none of the header fields.
 */
protected NetFlowHeader() { }

////////////////////////////////////////////////////////////
// Setters API
////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -257,6 +260,14 @@ public long getFields() {
return this.FIELDS;
}

/**
 * Reports whether this header is valid. This base implementation always returns `true`.
 * Subclasses should override this method when that is not the case, or make the result
 * conditional.
 */
public boolean isValid() {
return true;
}

// bit vector of fields
private long FIELDS = 0;
// flow stream format version either 1 or 3
Expand Down
Loading

0 comments on commit c43e030

Please sign in to comment.