Skip to content

Commit

Permalink
Fetch status of all scenarios in DM in a single operation instead of …
Browse files Browse the repository at this point in the history
…separately for each scenario. (#7295)
  • Loading branch information
mgoworko authored Dec 19, 2024
1 parent 433ec63 commit 90e00a9
Show file tree
Hide file tree
Showing 25 changed files with 682 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentState
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.deployment.CustomActionDefinition
import pl.touk.nussknacker.engine.newdeployment
import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusOps

import java.time.Instant
import scala.concurrent.ExecutionContext.Implicits._
Expand Down Expand Up @@ -46,6 +47,8 @@ trait DeploymentManager extends AutoCloseable {

def deploymentSynchronisationSupport: DeploymentSynchronisationSupport

def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport

def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result]

final def getProcessState(
Expand All @@ -66,7 +69,7 @@ trait DeploymentManager extends AutoCloseable {
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
).map(state => statusDetailsWithFreshness.map(_ => state))
).map(statusDetailsWithFreshness.withValue)
} yield stateWithFreshness
}

Expand Down Expand Up @@ -109,6 +112,18 @@ trait ManagerSpecificScenarioActivitiesStoredByManager { self: DeploymentManager

}

sealed trait StateQueryForAllScenariosSupport

trait StateQueryForAllScenariosSupported extends StateQueryForAllScenariosSupport {

def getAllProcessesStates()(
implicit freshnessPolicy: DataFreshnessPolicy
): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]]

}

case object NoStateQueryForAllScenariosSupport extends StateQueryForAllScenariosSupport

sealed trait DeploymentSynchronisationSupport

trait DeploymentSynchronisationSupported extends DeploymentSynchronisationSupport {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import scala.concurrent.duration._
class CachingProcessStateDeploymentManager(
delegate: DeploymentManager,
cacheTTL: FiniteDuration,
override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport
override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport,
override val stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport,
) extends DeploymentManager {

private val cache: AsyncCache[ProcessName, List[StatusDetails]] = Caffeine
Expand Down Expand Up @@ -81,7 +82,12 @@ object CachingProcessStateDeploymentManager extends LazyLogging {
scenarioStateCacheTTL
.map { cacheTTL =>
logger.debug(s"Wrapping DeploymentManager: $delegate with caching mechanism with TTL: $cacheTTL")
new CachingProcessStateDeploymentManager(delegate, cacheTTL, delegate.deploymentSynchronisationSupport)
new CachingProcessStateDeploymentManager(
delegate,
cacheTTL,
delegate.deploymentSynchronisationSupport,
delegate.stateQueryForAllScenariosSupport
)
}
.getOrElse {
logger.debug(s"Skipping ProcessState caching for DeploymentManager: $delegate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands

override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport

override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport

override def close(): Unit = {}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package pl.touk.nussknacker.engine.util

import pl.touk.nussknacker.engine.api.deployment.WithDataFreshnessStatus

object WithDataFreshnessStatusUtils {

implicit class WithDataFreshnessStatusMapOps[K, V](withDataFreshnessStatus: WithDataFreshnessStatus[Map[K, V]]) {

def get(k: K): Option[WithDataFreshnessStatus[V]] = withDataFreshnessStatus.map(_.get(k)) match {
case WithDataFreshnessStatus(Some(value), cached) => Some(WithDataFreshnessStatus(value, cached))
case WithDataFreshnessStatus(None, _) => None
}

def getOrElse(k: K, orElse: V): WithDataFreshnessStatus[V] = {
withDataFreshnessStatus.map(_.get(k)) match {
case WithDataFreshnessStatus(Some(value), cached) => WithDataFreshnessStatus(value, cached)
case WithDataFreshnessStatus(None, cached) => WithDataFreshnessStatus(orElse, cached)
}
}

}

implicit class WithDataFreshnessStatusOps[A, B](scenarioActivity: WithDataFreshnessStatus[A]) {

def withValue(v: B): WithDataFreshnessStatus[B] = {
scenarioActivity.map(_ => v)
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import org.scalatest.OptionValues
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId
import pl.touk.nussknacker.test.PatientScalaFutures
Expand All @@ -26,8 +26,12 @@ class CachingProcessStateDeploymentManagerSpec

test("should ask delegate for a fresh state each time") {
val delegate = prepareDMReturningRandomStates
val cachingManager =
new CachingProcessStateDeploymentManager(delegate, 10 seconds, NoDeploymentSynchronisationSupport)
val cachingManager = new CachingProcessStateDeploymentManager(
delegate,
10 seconds,
NoDeploymentSynchronisationSupport,
NoStateQueryForAllScenariosSupport
)

val results = List(
cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.Fresh),
Expand All @@ -41,8 +45,12 @@ class CachingProcessStateDeploymentManagerSpec

test("should cache state for DataFreshnessPolicy.CanBeCached") {
val delegate = prepareDMReturningRandomStates
val cachingManager =
new CachingProcessStateDeploymentManager(delegate, 10 seconds, NoDeploymentSynchronisationSupport)
val cachingManager = new CachingProcessStateDeploymentManager(
delegate,
10 seconds,
NoDeploymentSynchronisationSupport,
NoStateQueryForAllScenariosSupport
)

val firstInvocation = cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.CanBeCached)
firstInvocation.cached shouldBe false
Expand All @@ -55,8 +63,12 @@ class CachingProcessStateDeploymentManagerSpec

test("should reuse state updated by DataFreshnessPolicy.Fresh during reading with DataFreshnessPolicy.CanBeCached") {
val delegate = prepareDMReturningRandomStates
val cachingManager =
new CachingProcessStateDeploymentManager(delegate, 10 seconds, NoDeploymentSynchronisationSupport)
val cachingManager = new CachingProcessStateDeploymentManager(
delegate,
10 seconds,
NoDeploymentSynchronisationSupport,
NoStateQueryForAllScenariosSupport
)

val resultForFresh = cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.Fresh)
resultForFresh.cached shouldBe false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails
import pl.touk.nussknacker.ui.process.repository.ScenarioWithDetailsEntity
import pl.touk.nussknacker.ui.security.api.LoggedUser

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future
import scala.language.higherKinds

trait ProcessStateProvider {
Expand Down
Loading

0 comments on commit 90e00a9

Please sign in to comment.