Skip to content

Commit

Permalink
minor changes, added version check
Browse files Browse the repository at this point in the history
  • Loading branch information
sadikovi committed Jan 20, 2016
1 parent 383d30d commit e32485e
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 14 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ project/plugins/project/
# Scala-IDE specific
.scala_dependencies
.worksheet

metastore_db/

temp/
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ name := "spark-netflow"

organization := "com.github.sadikovi"

scalaVersion := "2.10.4"
scalaVersion := "2.10.5"

crossScalaVersions := Seq("2.10.5", "2.11.7")

spName := "sadikovi/spark-netflow"

sparkVersion := "1.5.1"
sparkVersion := "1.4.1"

val testSparkVersion = settingKey[String]("The version of Spark to test against.")

Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/github/sadikovi/netflow/RecordBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class RecordBuffer implements Iterable<Object[]> {
// length of the buffer, usually 32768
public static final int BUFFER_LENGTH = 32768;

// alternative, larger buffer length (option 1): 3276800 bytes = 100 x BUFFER_LENGTH
public static final int BUFFER_LENGTH_1 = 3276800;

/**
* Create RecordBuffer with default buffer length.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ private[netflow] class NetflowRelation(
private val parameters: Map[String, String])
(@transient val sqlContext: SQLContext) extends HadoopFsRelation with Logging {

// Netflow version
private val version = parameters.getOrElse("version",
// Resolve Netflow version
private val possibleVersion = parameters.getOrElse("version",
sys.error("'version' must be specified for Netflow data"))
SchemaResolver.validateVersion(version)
private val version = SchemaResolver.validateVersion(possibleVersion).getOrElse(
sys.error(s"Invalid version specified: ${possibleVersion}"))

override def dataSchema: StructType = SchemaResolver.getSchemaForVersion(version)

Expand All @@ -73,7 +74,7 @@ private[netflow] class NetflowRelation(
// return union of NetflowFileRDD which are designed to read only one file and store data in
// one partition
new UnionRDD[Row](sqlContext.sparkContext, inputFiles.map { status =>
val metadata = Seq(NetflowMetadata(status.getPath.toString, fields))
val metadata = Seq(NetflowMetadata(version, status.getPath.toString, fields))
new NetflowFileRDD(sqlContext.sparkContext, metadata, 1)
})
}
Expand Down
17 changes: 11 additions & 6 deletions src/main/scala/com/github/sadikovi/spark/netflow/interfaces.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.github.sadikovi.spark.netflow

import scala.util.Try

import org.apache.spark.Logging
import org.apache.spark.sql.types.{StructType, StructField, DataType, LongType, IntegerType, ShortType}

Expand All @@ -27,23 +29,26 @@ import com.github.sadikovi.netflow.version.NetflowV5
/** Schema resolver for Netflow versions. Also provides mapping for a particular column name. */
private[netflow] object SchemaResolver extends Logging {
// Netflow version 5
final val V5 = "5"
final val V5: Short = 5

/** Get specific mapper for Netflow version */
def getMapperForVersion(version: String): Mapper = version match {
/**
 * Return the column mapper for the given Netflow version.
 * Throws UnsupportedOperationException for versions without a registered mapper.
 */
def getMapperForVersion(version: Short): Mapper = {
  if (version == V5) {
    MapperV5
  } else {
    throw new UnsupportedOperationException(s"Netflow version ${version} is not supported")
  }
}

/** Resolve schema for a specific version */
def getSchemaForVersion(version: String): StructType = {
/**
 * Resolve the full Spark SQL schema for the given Netflow version by
 * delegating to that version's mapper.
 */
def getSchemaForVersion(version: Short): StructType =
  getMapperForVersion(version).getFullSchema()

/** validate version of Netflow. If version is not supported throws exception */
def validateVersion(version: String): Unit = {
val mapper = getMapperForVersion(version)
/**
 * Statically validate a Netflow version string by parsing it into a Short.
 * Returns None when the string does not parse; whether the parsed version is
 * actually supported is checked later, when the mapper is created.
 */
def validateVersion(possibleVersion: String): Option[Short] = {
  try {
    Some(possibleVersion.toShort)
  } catch {
    // toShort throws NumberFormatException for non-numeric or out-of-range input
    case _: NumberFormatException => None
  }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import com.github.sadikovi.netflow.{NetflowReader, NetflowHeader, RecordBuffer}
import com.github.sadikovi.netflow.version.NetflowV5

/** Netflow metadata includes path to the file and columns to fetch */
private[spark] case class NetflowMetadata(path: String, fields: Array[Long])
/**
 * Netflow metadata for a single file: the expected Netflow version (checked
 * against the file header when reading), the path to the file, and the
 * identifiers of the columns to fetch (passed to the reader as `fields`).
 */
private[spark] case class NetflowMetadata(version: Short, path: String, fields: Array[Long])

/** NetflowFilePartition to hold sequence of file paths */
private[spark] class NetflowFilePartition[T<:NetflowMetadata: ClassTag] (
Expand Down Expand Up @@ -94,9 +94,15 @@ private[spark] class NetflowFileRDD[T<:SQLRow: ClassTag] (
// build Netflow reader and check whether it can be read
val nr = new NetflowReader(stm)
val hr = nr.readHeader()
// actual version of the file
val actualVersion = hr.getFlowVersion()
// compression flag is second bit in header flags
val isCompressed = (hr.getHeaderFlags() & 0x2) > 0

// Currently the version cannot be inferred from the file before parsing, so a
// pre-set version is required and must match the version in the file header.
require(actualVersion == elem.version,
s"Expected version ${elem.version}, got ${actualVersion}")

logInfo(s"""
> Netflow: {
> File: ${elem.path}
Expand All @@ -108,7 +114,7 @@ private[spark] class NetflowFileRDD[T<:SQLRow: ClassTag] (
> }
""".stripMargin('>'))

val recordBuffer = nr.readData(hr, elem.fields, RecordBuffer.BUFFER_LENGTH)
val recordBuffer = nr.readData(hr, elem.fields, RecordBuffer.BUFFER_LENGTH_1)
buffer = buffer ++ recordBuffer.iterator().asScala
}

Expand Down

0 comments on commit e32485e

Please sign in to comment.