diff --git a/build.sbt b/build.sbt
index 341acab20..71e42f6cc 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,7 +1,7 @@
 name := "codepropertygraph"
 
 // parsed by project/Versions.scala, updated by updateDependencies.sh
-val flatgraphVersion = "0.1.2"
+val flatgraphVersion = "0.1.3"
 
 inThisBuild(
   List(
@@ -83,4 +83,4 @@ credentials +=
     "maven.pkg.github.com",
     "Privado-Inc",
     sys.env.getOrElse("GITHUB_TOKEN", "N/A")
-  )
\ No newline at end of file
+  )
diff --git a/codepropertygraph/src/main/scala/io/shiftleft/passes/CpgPass.scala b/codepropertygraph/src/main/scala/io/shiftleft/passes/CpgPass.scala
index 0c038975f..75705c5b3 100644
--- a/codepropertygraph/src/main/scala/io/shiftleft/passes/CpgPass.scala
+++ b/codepropertygraph/src/main/scala/io/shiftleft/passes/CpgPass.scala
@@ -51,23 +51,9 @@ abstract class CpgPass(cpg: Cpg, outName: String = "") extends ForkJoinParallelC
  * methods. This may be better than using the constructor or GC, because e.g. SCPG chains of passes construct
  * passes eagerly, and releases them only when the entire chain has run.
  * */
-abstract class ForkJoinParallelCpgPass[T <: AnyRef](cpg: Cpg, @nowarn outName: String = "") extends CpgPassBase {
-  type DiffGraphBuilder = io.shiftleft.codepropertygraph.generated.DiffGraphBuilder
-  // generate Array of parts that can be processed in parallel
-  def generateParts(): Array[? <: AnyRef]
-  // setup large data structures, acquire external resources
-  def init(): Unit = {}
-  // release large data structures and external resources
-  def finish(): Unit = {}
-  // main function: add desired changes to builder
-  def runOnPart(builder: DiffGraphBuilder, part: T): Unit
-  // Override this to disable parallelism of passes. Useful for debugging.
-  def isParallel: Boolean = true
-
 abstract class ForkJoinParallelCpgPassWithTimeout[T <: AnyRef](
   cpg: Cpg,
   @nowarn outName: String = "",
-  keyPool: Option[KeyPool] = None,
   timeout: Long = -1
 ) extends NewStyleCpgPassBaseWithTimeout[T](timeout) {
 
@@ -111,6 +97,11 @@ abstract class ForkJoinParallelCpgPassWithTimeout[T <: AnyRef](
     }
   }
 
+  @deprecated("Please use createAndApply")
+  override def createApplySerializeAndStore(serializedCpg: SerializedCpg, prefix: String = ""): Unit = {
+    createAndApply()
+  }
+
 }
 
 abstract class ForkJoinParallelCpgPass[T <: AnyRef](cpg: Cpg, @nowarn outName: String = "") extends CpgPassBase {
@@ -213,7 +204,7 @@ abstract class ForkJoinParallelCpgPass[T <: AnyRef](cpg: Cpg, @nowarn outName: S
 }
 
 abstract class NewStyleCpgPassBaseWithTimeout[T <: AnyRef](timeout: Long) extends CpgPassBase {
-  type DiffGraphBuilder = overflowdb.BatchedUpdate.DiffGraphBuilder
+  type DiffGraphBuilder = io.shiftleft.codepropertygraph.generated.DiffGraphBuilder
 
   // generate Array of parts that can be processed in parallel
   def generateParts(): Array[? <: AnyRef]
@@ -232,7 +223,7 @@ abstract class NewStyleCpgPassBaseWithTimeout[T <: AnyRef](timeout: Long) extend
 
   override def createAndApply(): Unit = createApplySerializeAndStore(null)
 
-  override def runWithBuilder(externalBuilder: BatchedUpdate.DiffGraphBuilder): Int = {
+  override def runWithBuilder(externalBuilder: DiffGraphBuilder): Int = {
     try {
       init()
       val parts = generateParts()
diff --git a/codepropertygraph/src/main/scala/io/shiftleft/passes/ParallelCpgPass.scala b/codepropertygraph/src/main/scala/io/shiftleft/passes/ParallelCpgPass.scala
deleted file mode 100644
index 22850945f..000000000
--- a/codepropertygraph/src/main/scala/io/shiftleft/passes/ParallelCpgPass.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-package io.shiftleft.passes
-import io.shiftleft.SerializedCpg
-import io.shiftleft.codepropertygraph.generated.Cpg
-import io.shiftleft.utils.{ExecutionContextProvider, StatsLogger}
-import org.slf4j.MDC
-
-import java.util.concurrent.LinkedBlockingQueue
-import scala.annotation.nowarn
-import scala.collection.mutable
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, Future}
-
-/* ConcurrentWriterCpgPass is a possible replacement for ParallelCpgPass and NewStylePass.
- *
- * Instead of returning an Iterator, generateParts() returns an Array. This means that the entire collection
- * of parts must live on the heap at the same time; on the other hand, there are no possible issues with iterator invalidation,
- * e.g. when running over all METHOD nodes and deleting some of them.
- *
- * Changes are applied sequentially, in the same order as the output of `runOnParts`, as opposed to `ParallelCpgPass`,
- * where the ordering of change application is non-deterministic. For this reason, ConcurrentWriterCpgPass only accepts a single KeyPool.
- *
- * However, as opposed to NewStylePass, changes are not buffered and applied in one go; instead, they are applied as the respective
- * `runOnPart` finishes, concurrently with other `runOnPart` invocations.
- *
- * Compared to NewStylePass, this avoids excessive peak memory consumption. On the other hand, `runOnPart` sees the CPG
- * in an intermediate state: No promises are made about which previous changes are already applied; and changes are
- * applied concurrently, such that all reads from the graph are potential race conditions. Furthermore, this variant has
- * higher constant overhead per part than NewStylePass, i.e. is better suited to passes that create few large diffs.
- *
- *
- * Initialization and cleanup of external resources or large datastructures can be done in the `init()` and `finish()`
- * methods. This may be better than using the constructor or GC, because e.g. SCPG chains of passes construct
- * passes eagerly, and releases them only when the entire chain has run.
- * */
-object ConcurrentWriterCpgPass {
-  private val writerQueueCapacity = 4
-  private val producerQueueCapacity = 2 + 4 * Runtime.getRuntime().availableProcessors()
-}
-abstract class ConcurrentWriterCpgPass[T <: AnyRef](
-  cpg: Cpg,
-  @nowarn outName: String = "",
-  keyPool: Option[KeyPool] = None
-) extends NewStyleCpgPassBase[T] {
-
-  @volatile var nDiffT = -1
-
-  /** WARNING: runOnPart is executed in parallel to committing of graph modifications. The upshot is that it is unsafe
-    * to read ANY data from cpg, on pain of bad race conditions
-    *
-    * Only use ConcurrentWriterCpgPass if you are _very_ sure that you avoid races.
-    *
-    * E.g. adding a CFG edge to node X races with reading an AST edge of node X.
-    */
-  override def createApplySerializeAndStore(serializedCpg: SerializedCpg, prefix: String = ""): Unit = {
-    import ConcurrentWriterCpgPass.producerQueueCapacity
-    baseLogger.info(s"Start of enhancement: $name")
-    StatsLogger.initiateNewStage(getClass.getSimpleName, Some(name), getClass.getSuperclass.getSimpleName)
-    val nanosStart = System.nanoTime()
-    var nParts = 0
-    var nDiff = 0
-    nDiffT = -1
-    init()
-    val parts = generateParts()
-    nParts = parts.size
-    val partIter = parts.iterator
-    val completionQueue = mutable.ArrayDeque[Future[overflowdb.BatchedUpdate.DiffGraph]]()
-    val writer = new Writer(MDC.getCopyOfContextMap())
-    val writerThread = new Thread(writer)
-    writerThread.setName("Writer")
-    writerThread.start()
-    implicit val ec: ExecutionContext = ExecutionContextProvider.getExecutionContext
-    try {
-      try {
-        // The idea is that we have a ringbuffer completionQueue that contains the workunits that are currently in-flight.
-        // We add futures to the end of the ringbuffer, and take futures from the front.
-        // then we await the future from the front, and add it to the writer-queue.
-        // the end result is that we get deterministic output (esp. deterministic order of changes), while having up to one
-        // writer-thread and up to producerQueueCapacity many threads in-flight.
-        // as opposed to ParallelCpgPass, there is no race between diffgraph-generators to enqueue into the writer -- everything
-        // is nice and ordered. Downside is that a very slow part may gum up the works (i.e. the completionQueue fills up and threads go idle)
-        var done = false
-        while (!done && writer.raisedException == null) {
-          if (writer.raisedException != null)
-            throw writer.raisedException // will be wrapped with good stacktrace in the finally block
-          if (completionQueue.size < producerQueueCapacity && partIter.hasNext) {
-            val next = partIter.next()
-            // todo: Verify that we get FIFO scheduling; otherwise, do something about it.
-            // if this e.g. used LIFO with 4 cores and 18 size of ringbuffer, then 3 cores may idle while we block on the front item.
-            completionQueue.append(Future.apply {
-              val builder = Cpg.newDiffGraphBuilder
-              runOnPart(builder, next.asInstanceOf[T])
-              builder.build()
-            })
-          } else if (completionQueue.nonEmpty) {
-            val future = completionQueue.removeHead()
-            val res = Await.result(future, Duration.Inf)
-            nDiff += res.size
-            writer.queue.put(Some(res))
-          } else {
-            done = true
-          }
-        }
-      } finally {
-        try {
-          // if the writer died on us, then the queue might be full and we could deadlock
-          if (writer.raisedException == null) writer.queue.put(None)
-          writerThread.join()
-          // we need to reraise exceptions
-          if (writer.raisedException != null)
-            throw new RuntimeException("Failure in diffgraph application", writer.raisedException)
-
-        } finally { finish() }
-      }
-    } finally {
-      // the nested finally is somewhat ugly -- but we promised to clean up with finish(), we want to include finish()
-      // in the reported timings, and we must have our final log message if finish() throws
-
-      val nanosStop = System.nanoTime()
-
-      baseLogger.info(
-        f"Enhancement $name completed in ${(nanosStop - nanosStart) * 1e-6}%.0f ms. ${nDiff}%d + ${nDiffT - nDiff}%d changes committed from ${nParts}%d parts."
-      )
-      StatsLogger.endLastStage()
-    }
-  }
-
-  private class Writer(mdc: java.util.Map[String, String]) extends Runnable {
-
-    val queue =
-      new LinkedBlockingQueue[Option[overflowdb.BatchedUpdate.DiffGraph]](ConcurrentWriterCpgPass.writerQueueCapacity)
-
-    @volatile var raisedException: Exception = null
-
-    override def run(): Unit = {
-      try {
-        nDiffT = 0
-        // logback chokes on null context maps
-        if (mdc != null) MDC.setContextMap(mdc)
-        var terminate = false
-        var index: Int = 0
-        while (!terminate) {
-          queue.take() match {
-            case None =>
-              baseLogger.debug("Shutting down WriterThread")
-              terminate = true
-            case Some(diffGraph) =>
-              nDiffT += overflowdb.BatchedUpdate
-                .applyDiff(cpg.graph, diffGraph, keyPool.getOrElse(null), null)
-                .transitiveModifications()
-              index += 1
-          }
-        }
-      } catch {
-        case exception: InterruptedException => baseLogger.warn("Interrupted WriterThread", exception)
-        case exc: Exception =>
-          raisedException = exc
-          queue.clear()
-          throw exc
-      }
-    }
-  }
-
-}
diff --git a/codepropertygraph/src/test/scala/io/shiftleft/passes/ParallelCpgPassNewTests.scala b/codepropertygraph/src/test/scala/io/shiftleft/passes/ParallelCpgPassNewTests.scala
index 3f7a9829f..7cc459ea0 100644
--- a/codepropertygraph/src/test/scala/io/shiftleft/passes/ParallelCpgPassNewTests.scala
+++ b/codepropertygraph/src/test/scala/io/shiftleft/passes/ParallelCpgPassNewTests.scala
@@ -4,115 +4,20 @@ import better.files.File
 import io.shiftleft.SerializedCpg
 import io.shiftleft.codepropertygraph.generated.Cpg
 import io.shiftleft.codepropertygraph.generated.Properties
+import io.shiftleft.codepropertygraph.generated.language.*
 import io.shiftleft.codepropertygraph.generated.nodes.{NewCall, NewFile}
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpec
-import overflowdb.traversal._
 
 import java.nio.file.Files
 import scala.jdk.CollectionConverters._
 
-class ParallelCpgPassNewTests extends AnyWordSpec with Matchers {
-
-  private object Fixture {
-    def apply(keyPools: Option[Iterator[KeyPool]] = None)(f: (Cpg, CpgPassBase) => Unit): Unit = {
-      val cpg = Cpg.empty
-      val pool = keyPools.flatMap(_.nextOption())
-      class MyPass(cpg: Cpg) extends ConcurrentWriterCpgPass[String](cpg, "MyPass", pool) {
-        override def generateParts(): Array[String] = Array("foo", "bar")
-
-        override def runOnPart(diffGraph: DiffGraphBuilder, part: String): Unit = {
-          diffGraph.addNode(NewFile().name(part))
-        }
-      }
-      val pass = new MyPass(cpg)
-      f(cpg, pass)
-    }
-  }
-
-  "ConcurrentWriterCpgPass" should {
-    "allow creating and applying result of pass" in Fixture() { (cpg, pass) =>
-      pass.createAndApply()
-      cpg.graph.nodes.map(_.property(Properties.Name)).toSetMutable shouldBe Set("foo", "bar")
-    }
-
-    "produce a serialized CPG file" in Fixture() { (_, pass) =>
-      File.usingTemporaryFile("pass", ".zip") { file =>
-        file.delete()
-        val filename = file.path.toString
-        val serializedCpg = new SerializedCpg(filename)
-        pass.createApplySerializeAndStore(serializedCpg)
-        serializedCpg.close()
-        file.exists shouldBe true
-        Files.size(file.path) should not be 0
-      }
-    }
-
-    val keyPools = Iterator(new IntervalKeyPool(10, 20), new IntervalKeyPool(30, 40))
-
-    "use only the first KeyPool for createAndApply" in Fixture(Some(keyPools)) { (cpg, pass) =>
-      pass.createAndApply()
-      cpg.graph.V.asScala.map(_.id()).toSet shouldBe Set(10, 11)
-    }
-
-    "fail for schema violations" in {
-      val cpg = Cpg.empty
-      val pass = new ConcurrentWriterCpgPass[String](cpg, "pass2") {
-        override def generateParts() = Array("a", "b")
-        override def runOnPart(diffGraph: DiffGraphBuilder, part: String): Unit =
-          part match {
-            case "a" =>
-              // this is fine
-              diffGraph.addNode(NewFile().name(part))
-            case "b" =>
-              // schema violation
-              val file1 = NewFile().name("foo")
-              val file2 = NewFile().name("bar")
-              diffGraph
-                .addNode(file1)
-                .addNode(file2)
-                .addEdge(file1, file2, "illegal_edge_label")
-
-          }
-      }
-
-      // the above DiffGraph (part "b") is not schema conform, applying it must throw an exception
-      intercept[Exception] {
-        pass.createAndApply()
-      }
-    }
-
-    "add NewNodes that are referenced in different parts only once" in {
-      val cpg = Cpg.empty
-      val pass = new ConcurrentWriterCpgPass[String](cpg, "pass2") {
-        val call1 = NewCall().name("call1")
-        val call2 = NewCall().name("call2")
-        val call3 = NewCall().name("call3")
-
-        override def generateParts() = Array("a", "b")
-        override def runOnPart(diffGraph: DiffGraphBuilder, part: String): Unit =
-          part match {
-            case "a" =>
-              diffGraph.addEdge(call1, call2, "AST")
-            case "b" =>
-              diffGraph.addEdge(call2, call3, "AST")
-          }
-      }
-      pass.createAndApply()
-      cpg.graph.nodeCount() shouldBe 3
-    }
-  }
-
-}
-
 class ForkJoinParallelCpgPassNewTests extends AnyWordSpec with Matchers {
 
   private object Fixture {
-    def apply(keyPools: Option[Iterator[KeyPool]] = None, timeout: Long = -1)(f: (Cpg, CpgPassBase) => Unit): Unit = {
-      val cpg = Cpg.empty
-      val pool = keyPools.flatMap(_.nextOption())
-      class MyPass(cpg: Cpg)
-          extends ForkJoinParallelCpgPassWithTimeout[String](cpg, "MyPass", pool, timeout = timeout) {
+    def apply(timeout: Long = -1)(f: (Cpg, CpgPassBase) => Unit): Unit = {
+      val cpg = Cpg.empty
+      class MyPass(cpg: Cpg) extends ForkJoinParallelCpgPassWithTimeout[String](cpg, "MyPass", timeout = timeout) {
         override def generateParts(): Array[String] = Range(1, 101).map(_.toString).toArray
 
         override def runOnPart(diffGraph: DiffGraphBuilder, part: String): Unit = {
@@ -128,12 +33,12 @@ class ForkJoinParallelCpgPassNewTests extends AnyWordSpec with Matchers {
   "ForkJoinParallelPassWithTimeout" should {
     "generate partial result in case of timeout" in Fixture(timeout = 2) { (cpg, pass) =>
       pass.createAndApply()
-      assert(cpg.graph.nodes.map(_.property(Properties.Name)).toList.size != 100)
+      assert(cpg.all.map(_.property(Properties.Name)).toList.size != 100)
     }
 
     "generate complete result without timeout" in Fixture() { (cpg, pass) =>
      pass.createAndApply()
-      assert(cpg.graph.nodes.map(_.property(Properties.Name)).toList.size == 100)
+      assert(cpg.all.map(_.property(Properties.Name)).toList.size == 100)
     }
   }
 
diff --git a/domainClasses/src/main/generated/io/shiftleft/codepropertygraph/generated/GraphSchema.scala b/domainClasses/src/main/generated/io/shiftleft/codepropertygraph/generated/GraphSchema.scala
index ae3b63952..98dc997a4 100644
--- a/domainClasses/src/main/generated/io/shiftleft/codepropertygraph/generated/GraphSchema.scala
+++ b/domainClasses/src/main/generated/io/shiftleft/codepropertygraph/generated/GraphSchema.scala
@@ -50,7 +50,7 @@ object GraphSchema extends flatgraph.Schema {
     "UNKNOWN"
   )
   val nodeKindByLabel = nodeLabels.zipWithIndex.toMap
-  val edgeLabels: Array[String] = Array(
+  val edgeLabels = Array(
     "ALIAS_OF",
     "ARGUMENT",
     "AST",
@@ -236,7 +236,7 @@ object GraphSchema extends flatgraph.Schema {
     size => new Array[flatgraph.GNode](size),
     size => new Array[flatgraph.GNode](size)
   )
-  val normalNodePropertyNames: Array[String] = Array(
+  val normalNodePropertyNames = Array(
    "ALIAS_TYPE_FULL_NAME",
    "ARGUMENT_INDEX",
    "ARGUMENT_NAME",