Remove netty ByteBuf dependency, refactor IO (#72)
* add wrapped byte buf

* add endian support for byte buffer

* remove netty bytebuf

* make benchmark single thread

* update benchmark

* update readme
sadikovi authored Jun 10, 2017
1 parent 228965d commit 1af9de3
Showing 13 changed files with 528 additions and 188 deletions.
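
The new `WrappedByteBuf` class itself is not among the hunks shown below, so here is only a minimal sketch of what such a netty-free wrapper might look like, assuming it is backed by `java.nio.ByteBuffer` and exposes the unsigned getters this diff calls (`getByte`, `getUnsignedByte`, `getUnsignedShort`, `getUnsignedInt`). Treat it as an illustration, not the committed implementation.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch of a netty-free buffer wrapper; the real WrappedByteBuf
// added by this commit is not visible in the hunks below.
public final class WrappedByteBufSketch {
  private final ByteBuffer buffer;

  private WrappedByteBufSketch(byte[] data, ByteOrder order) {
    // Endian support: multi-byte reads honour the order passed by the caller.
    this.buffer = ByteBuffer.wrap(data).order(order);
  }

  /** Factory mirroring the WrappedByteBuf.init(bytes, order) calls in this diff. */
  public static WrappedByteBufSketch init(byte[] data, ByteOrder order) {
    return new WrappedByteBufSketch(data, order);
  }

  public byte getByte(int index) {
    return buffer.get(index);
  }

  /** Unsigned byte widened into a short, as the netty accessor did. */
  public short getUnsignedByte(int index) {
    return (short) (buffer.get(index) & 0xff);
  }

  /** Unsigned short widened into an int. */
  public int getUnsignedShort(int index) {
    return buffer.getShort(index) & 0xffff;
  }

  /** Unsigned int widened into a long. */
  public long getUnsignedInt(int index) {
    return buffer.getInt(index) & 0xffffffffL;
  }
}
```

Keeping the same getter names as netty's `ByteBuf` is what lets the call sites in `Buffers.java` and `NetFlowReader.java` stay almost unchanged in the hunks below.
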
47 changes: 17 additions & 30 deletions README.md
@@ -117,37 +117,28 @@ Latest benchmarks:
- Iterations: 5
- Files: file:/Users/sadikovi/developer/spark-netflow/temp/ftn/0[1,2,3]/ft*
- Version: 5
Running benchmark: NetFlow full scan
Running case: Scan, stringify = F
Running case: Scan, stringify = T
Java HotSpot(TM) 64-Bit Server VM 1.7.0_80-b15 on Mac OS X 10.12.4
Intel(R) Core(TM) i5-4258U CPU @ 2.40GHz
NetFlow full scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
Scan, stringify = F 364 / 462 2748.0 36389.9 1.0X
Scan, stringify = T 923 / 935 1082.9 92347.8 0.4X
Running benchmark: NetFlow predicate scan
Running case: Predicate pushdown = F, high
Running case: Predicate pushdown = T, high
Running case: Predicate pushdown = F, low
Running case: Predicate pushdown = T, low
NetFlow full scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Scan, stringify = F 567 / 633 0.0 56726.7 1.0X
Scan, stringify = T 968 / 1049 0.0 96824.6 0.6X
Java HotSpot(TM) 64-Bit Server VM 1.7.0_80-b15 on Mac OS X 10.12.4
Intel(R) Core(TM) i5-4258U CPU @ 2.40GHz
NetFlow predicate scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
Predicate pushdown = F, high 1009 / 1094 991.2 100892.6 1.0X
Predicate pushdown = T, high 1056 / 2029 947.2 105570.7 1.0X
Predicate pushdown = F, low 766 / 833 1304.9 76633.3 1.3X
Predicate pushdown = T, low 175 / 181 5709.3 17515.4 5.8X
Running benchmark: NetFlow aggregated report
Running case: Aggregated report
NetFlow predicate scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Predicate pushdown = F, high 1148 / 1200 0.0 114845.4 1.0X
Predicate pushdown = T, high 1208 / 1257 0.0 120818.0 1.0X
Predicate pushdown = F, low 706 / 732 0.0 70559.3 1.6X
Predicate pushdown = T, low 226 / 243 0.0 22575.0 5.1X
Java HotSpot(TM) 64-Bit Server VM 1.7.0_80-b15 on Mac OS X 10.12.4
Intel(R) Core(TM) i5-4258U CPU @ 2.40GHz
NetFlow aggregated report: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
Aggregated report 1362 / 2242 734.3 136183.3 1.0X
NetFlow aggregated report: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Aggregated report 2171 / 2270 0.0 217089.9 1.0X
```

## Using `netflowlib` library separately
@@ -166,10 +157,6 @@ to file header and iterator of rows, allows to pass additional predicate and statistics
- `com.github.sadikovi.netflowlib.NetFlowHeader` header information can be accessed using this
class from `NetFlowReader.getHeader()`, see class for more information on flags available

Note that library has only one external dependency on `io.netty.buffer.ByteBuf` buffers, which
could be replaced with standard Java buffer functionality, but since it was built for being used as
part of a spark-package, this dependency comes with Spark.

Here is the general usage pattern:
```scala

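
The Scala snippet that follows in the README is collapsed in this diff view. As a rough illustration only, the same flow can be driven from Java; apart from `NetFlowReader.getHeader()`, which the README mentions above, the method names used here (`prepareReader`, `getFlowVersion`) are assumptions about the library's entry points rather than confirmed API.

```java
import java.io.DataInputStream;
import java.io.FileInputStream;

import com.github.sadikovi.netflowlib.NetFlowReader;
import com.github.sadikovi.netflowlib.NetFlowHeader;

public class NetFlowReadSketch {
  public static void main(String[] args) throws Exception {
    // Open a raw NetFlow file; the reader parses metadata and header from the stream.
    DataInputStream in = new DataInputStream(new FileInputStream("/path/to/netflow/file"));
    try {
      // 'prepareReader' is assumed to be the factory entry point; getHeader() is
      // the accessor mentioned in the README section above.
      NetFlowReader reader = NetFlowReader.prepareReader(in);
      NetFlowHeader header = reader.getHeader();
      // getFlowVersion() is assumed from the setFlowVersion(...) call visible further below.
      System.out.println("flow version: " + header.getFlowVersion());
    } finally {
      in.close();
    }
  }
}
```
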
3 changes: 2 additions & 1 deletion build.sbt
@@ -39,7 +39,8 @@ libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client")
)

// check deprecation without manual restart
// check deprecation and unchecked without manual restart
javacOptions in ThisBuild ++= Seq("-Xlint:unchecked")
scalacOptions in ThisBuild ++= Seq("-unchecked", "-deprecation", "-feature")

// Display full-length stacktraces from ScalaTest:
20 changes: 4 additions & 16 deletions src/main/java/com/github/sadikovi/netflowlib/Buffers.java
@@ -26,13 +26,11 @@
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import com.github.sadikovi.netflowlib.record.RecordMaterializer;
import com.github.sadikovi.netflowlib.util.FilterIterator;
import com.github.sadikovi.netflowlib.util.ReadAheadInputStream;
import com.github.sadikovi.netflowlib.util.SafeIterator;
import com.github.sadikovi.netflowlib.util.WrappedByteBuf;

/**
* All buffers supported in NetFlow reader.
@@ -115,7 +113,7 @@ public ScanRecordBuffer(
this.recordSize = recordSize;
this.ignoreCorrupt = ignoreCorrupt;
recordBytes = new byte[recordSize];
buffer = Unpooled.wrappedBuffer(recordBytes).order(byteOrder);
buffer = WrappedByteBuf.init(recordBytes, byteOrder);
numBytesRead = 0;
}

@@ -141,12 +139,6 @@ public boolean hasNext() {
} catch (IOException io) {
stream = null;
}

// Release buffer after EOF
if (buffer != null && buffer.refCnt() > 0) {
buffer.release(buffer.refCnt());
}

buffer = null;
}
}
@@ -190,11 +182,7 @@ public void remove() {
};

// when ignoring corrupt records, wrap it into iterator with safe termination on failures
if (ignoreCorrupt) {
return new SafeIterator<Object[]>(iter);
} else {
return iter;
}
return ignoreCorrupt ? new SafeIterator<Object[]>(iter) : iter;
}

@Override
@@ -212,7 +200,7 @@ public String toString() {
// Array of bytes for a record, updated partially when compression buffer needs to be refilled
private final byte[] recordBytes;
// Buffer for the record
private ByteBuf buffer;
private WrappedByteBuf buffer;
// Number of bytes currently have been read
private int numBytesRead;
// Size of record, depends on NetFlow format
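
`SafeIterator`, used above to keep a scan alive when `ignoreCorrupt` is set, is imported from `netflowlib.util`, but its body is not part of the visible hunks. The sketch below is only a guess at its behaviour (treat a failure from the wrapped iterator as end of data), not the class from this commit.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical sketch: delegate to an underlying iterator, but treat any
// failure in the delegate as normal termination instead of failing the scan.
public class SafeIteratorSketch<E> implements Iterator<E> {
  private final Iterator<E> delegate;
  private boolean terminated = false;

  public SafeIteratorSketch(Iterator<E> delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean hasNext() {
    if (terminated) return false;
    try {
      return delegate.hasNext();
    } catch (RuntimeException err) {
      terminated = true;
      return false;
    }
  }

  @Override
  public E next() {
    if (terminated || !hasNext()) throw new NoSuchElementException();
    try {
      return delegate.next();
    } catch (RuntimeException err) {
      terminated = true;
      throw new NoSuchElementException("Delegate failed; treating as end of data");
    }
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }
}
```
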
30 changes: 8 additions & 22 deletions src/main/java/com/github/sadikovi/netflowlib/NetFlowReader.java
@@ -21,29 +21,23 @@
import java.nio.ByteOrder;
import java.util.HashMap;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.sadikovi.netflowlib.ScanPlanner;
import com.github.sadikovi.netflowlib.Strategies.ScanStrategy;

import com.github.sadikovi.netflowlib.Buffers.RecordBuffer;
import com.github.sadikovi.netflowlib.Buffers.EmptyRecordBuffer;
import com.github.sadikovi.netflowlib.Buffers.FilterRecordBuffer;
import com.github.sadikovi.netflowlib.Buffers.ScanRecordBuffer;

import com.github.sadikovi.netflowlib.predicate.Columns.Column;
import com.github.sadikovi.netflowlib.predicate.Operators.FilterPredicate;

import com.github.sadikovi.netflowlib.statistics.Statistics;
import com.github.sadikovi.netflowlib.statistics.StatisticsTypes.LongStatistics;

import com.github.sadikovi.netflowlib.version.NetFlow;
import com.github.sadikovi.netflowlib.version.NetFlowV5;
import com.github.sadikovi.netflowlib.version.NetFlowV7;
import com.github.sadikovi.netflowlib.util.WrappedByteBuf;

/**
* [[NetFlowReader]] is a main entry to process input stream of NetFlow file either from local
@@ -116,7 +110,7 @@ private NetFlowReader(
bufferLength = buffer;
ignoreCorrupt = ignoreCorruptFile;
byte[] metadata = null;
ByteBuf buf = null;
WrappedByteBuf buf = null;

try {
metadata = new byte[METADATA_LENGTH];
@@ -125,7 +119,7 @@ private NetFlowReader(
// Parse metadata, byte order does not really matter, so we go for big endian. Metadata contains
// magic numbers to verify consistency of the NetFlow file, byte order encoded as either 1 or 2,
// and stream version which affects header parsing (currently only 1 and 3 are supported).
buf = Unpooled.wrappedBuffer(metadata).order(ByteOrder.BIG_ENDIAN);
buf = WrappedByteBuf.init(metadata, ByteOrder.BIG_ENDIAN);
short magic1 = buf.getUnsignedByte(0);
short magic2 = buf.getUnsignedByte(1);
short order = buf.getUnsignedByte(2);
@@ -164,10 +158,7 @@ private NetFlowReader(
}
} finally {
metadata = null;
if (buf != null) {
buf.release();
buf = null;
}
buf = null;
}
}

@@ -183,7 +174,7 @@ private NetFlowHeader prepareHeader() throws IOException {
NetFlowHeader internalHeader;
int numBytesRead = 0;
int lenRead = 0;
ByteBuf buf;
WrappedByteBuf buf;
byte[] headerArray;

// Read header depending on stream version (different from flow version)
@@ -200,7 +191,7 @@ private NetFlowHeader prepareHeader() throws IOException {
throw new UnsupportedOperationException("Short read while loading header offset");
}

buf = Unpooled.wrappedBuffer(headerArray).order(byteOrder);
buf = WrappedByteBuf.init(headerArray, byteOrder);
int headerSize = (int)buf.getUnsignedInt(0);
if (headerSize <= 0) {
throw new UnsupportedOperationException("Failed to load header of size " + headerSize);
@@ -218,7 +209,7 @@ private NetFlowHeader prepareHeader() throws IOException {
throw new UnsupportedOperationException("Short read while loading header data");
}
// build buffer
buf = Unpooled.wrappedBuffer(headerArray).order(byteOrder);
buf = WrappedByteBuf.init(headerArray, byteOrder);

// resolve stream version (either 1 or 3)
if (streamVersion == 1) {
@@ -286,7 +277,7 @@ private NetFlowHeader prepareHeader() throws IOException {
break;
// FT_TLV_EX_VER
case 0x2:
internalHeader.setFlowVersion((short)buf.getUnsignedShort(tlv_v));
internalHeader.setFlowVersion((short) buf.getUnsignedShort(tlv_v));
break;
// FT_TLV_AGG_VER
case 0x3:
@@ -396,11 +387,6 @@ private NetFlowHeader prepareHeader() throws IOException {
break;
}
}

if (buf != null && buf.refCnt() > 0) {
buf.release(buf.refCnt());
}

buf = null;
pr = null;
}
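
Pulling the metadata hunks above together: the first few bytes of a file are read through a big-endian `WrappedByteBuf` regardless of the file's own byte order, and only the detected order is then used to parse the header. The sketch below condenses that step; the 4-byte metadata length, the stream-version offset, and the `resolveByteOrder` helper are assumptions for illustration, since the diff does not show those parts.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteOrder;

import com.github.sadikovi.netflowlib.util.WrappedByteBuf;

public class MetadataSketch {
  // Condensed, illustrative version of the metadata check shown above.
  static ByteOrder readMetadata(DataInputStream in) throws IOException {
    byte[] metadata = new byte[4];   // assumed METADATA_LENGTH
    in.readFully(metadata);

    // Byte order of the metadata itself does not matter, so big endian is used.
    WrappedByteBuf buf = WrappedByteBuf.init(metadata, ByteOrder.BIG_ENDIAN);
    short magic1 = buf.getUnsignedByte(0);        // first magic number
    short magic2 = buf.getUnsignedByte(1);        // second magic number
    short order = buf.getUnsignedByte(2);         // encoded as either 1 or 2
    short streamVersion = buf.getUnsignedByte(3); // assumed offset; only 1 and 3 supported

    // magic1/magic2 would be validated against expected constants (not shown in the diff).
    if (streamVersion != 1 && streamVersion != 3) {
      throw new UnsupportedOperationException("Unsupported stream version " + streamVersion);
    }
    return resolveByteOrder(order);
  }

  // Hypothetical helper: the diff does not show which of 1/2 maps to which order.
  static ByteOrder resolveByteOrder(short order) {
    return order == 1 ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN;
  }
}
```
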
(Another changed file; its name was not captured in this view.)
@@ -16,7 +16,6 @@

package com.github.sadikovi.netflowlib;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;

(Another changed file; its name was not captured in this view.)
@@ -18,7 +18,9 @@

import java.io.Serializable;

import com.github.sadikovi.netflowlib.predicate.Inspectors.ValueInspector;
import com.github.sadikovi.netflowlib.statistics.Statistics;
import com.github.sadikovi.netflowlib.util.WrappedByteBuf;

/**
* [[Column]] is a base class for all typed columns for all NetFlow versions. They contain basic
@@ -56,6 +58,12 @@ public int getColumnOffset() {
return columnOffset;
}

/** Read field from byte buffer and return value that conforms to column class */
public abstract Object readField(WrappedByteBuf buffer);

/** Update value inspector with value from buffer */
public abstract void updateValueInspector(WrappedByteBuf buffer, ValueInspector vi);

@Override
public String toString() {
return getClass().getSimpleName() + "(name=" + columnName + ", offset=" + columnOffset + ")";
@@ -102,6 +110,16 @@ public ByteColumn(String name, int offset) {
this(name, offset, (byte) 0, Byte.MAX_VALUE);
}

@Override
public Object readField(WrappedByteBuf buffer) {
return buffer.getByte(getColumnOffset());
}

@Override
public void updateValueInspector(WrappedByteBuf buffer, ValueInspector vi) {
vi.update(buffer.getByte(getColumnOffset()));
}

@Override
public Object getMin() {
return minValue;
@@ -130,6 +148,16 @@ public ShortColumn(String name, int offset) {
this(name, offset, (short) 0, Short.MAX_VALUE);
}

@Override
public Object readField(WrappedByteBuf buffer) {
return buffer.getUnsignedByte(getColumnOffset());
}

@Override
public void updateValueInspector(WrappedByteBuf buffer, ValueInspector vi) {
vi.update(buffer.getUnsignedByte(getColumnOffset()));
}

@Override
public Object getMin() {
return minValue;
@@ -158,6 +186,16 @@ public IntColumn(String name, int offset) {
this(name, offset, (int) 0, Integer.MAX_VALUE);
}

@Override
public Object readField(WrappedByteBuf buffer) {
return buffer.getUnsignedShort(getColumnOffset());
}

@Override
public void updateValueInspector(WrappedByteBuf buffer, ValueInspector vi) {
vi.update(buffer.getUnsignedShort(getColumnOffset()));
}

@Override
public Object getMin() {
return minValue;
@@ -186,6 +224,16 @@ public LongColumn(String name, int offset) {
this(name, offset, (long) 0, Long.MAX_VALUE);
}

@Override
public Object readField(WrappedByteBuf buffer) {
return buffer.getUnsignedInt(getColumnOffset());
}

@Override
public void updateValueInspector(WrappedByteBuf buffer, ValueInspector vi) {
vi.update(buffer.getUnsignedInt(getColumnOffset()));
}

@Override
public Object getMin() {
return minValue;
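
The new methods above move buffer access into the columns themselves: `ByteColumn` reads the raw byte, while the wider column types read their unsigned on-disk fields into the next wider signed Java type (`ShortColumn` via `getUnsignedByte`, `IntColumn` via `getUnsignedShort`, `LongColumn` via `getUnsignedInt`). A caller can then materialize a record without knowing any column's width; the loop below is an illustrative sketch of that idea, not the `RecordMaterializer` from this commit.

```java
import com.github.sadikovi.netflowlib.predicate.Columns.Column;
import com.github.sadikovi.netflowlib.util.WrappedByteBuf;

// Illustrative only: build one output row by letting each projected column
// read its own field from the wrapped record bytes at that column's offset.
public class MaterializeSketch {
  public static Object[] materialize(Column[] projection, WrappedByteBuf buffer) {
    Object[] row = new Object[projection.length];
    for (int i = 0; i < projection.length; i++) {
      row[i] = projection[i].readField(buffer);
    }
    return row;
  }
}
```
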
(Diffs for the remaining changed files are not shown.)
