Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: in progress draft of periodic deployment manager changes #7252

Draft
wants to merge 13 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -644,21 +644,16 @@ lazy val flinkPeriodicDeploymentManager = (project in flink("management/periodic
name := "nussknacker-flink-periodic-manager",
libraryDependencies ++= {
Seq(
"org.typelevel" %% "cats-core" % catsV % Provided,
"com.typesafe.slick" %% "slick" % slickV % Provided,
"com.typesafe.slick" %% "slick-hikaricp" % slickV % "provided, test",
"com.github.tminglei" %% "slick-pg" % slickPgV,
"org.hsqldb" % "hsqldb" % hsqldbV % Test,
"org.flywaydb" % "flyway-core" % flywayV % Provided,
"com.cronutils" % "cron-utils" % cronParserV,
"com.typesafe.akka" %% "akka-actor" % akkaV,
"com.typesafe.akka" %% "akka-testkit" % akkaV % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % Test,
"com.dimafeng" %% "testcontainers-scala-postgresql" % testContainersScalaV % Test,
"com.typesafe.slick" %% "slick-hikaricp" % slickV % "provided, test",
"org.hsqldb" % "hsqldb" % hsqldbV % Test,
"com.typesafe.akka" %% "akka-testkit" % akkaV % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % Test,
"com.dimafeng" %% "testcontainers-scala-postgresql" % testContainersScalaV % Test,
)
}
)
.dependsOn(
commonPeriodicDeploymentManager,
flinkDeploymentManager,
deploymentManagerApi % Provided,
scenarioCompiler % Provided,
Expand Down Expand Up @@ -1790,6 +1785,25 @@ lazy val commonComponentsTests = (project in engine("common/components-tests"))
flinkComponentsTestkit % Test
)

lazy val commonPeriodicDeploymentManager = (project in engine("common/periodic-deployment-manager"))
.settings(commonSettings)
.settings(publishAssemblySettings: _*)
.settings(
name := "nussknacker-common-periodic-deployment-manager",
libraryDependencies ++= {
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaV,
"org.typelevel" %% "cats-core" % catsV % Provided,
"com.cronutils" % "cron-utils" % cronParserV,
"com.softwaremill.retry" %% "retry" % retryV,
"com.github.tminglei" %% "slick-pg" % slickPgV,
)
}
)
.dependsOn(
deploymentManagerApi % Provided,
)

lazy val flinkBaseComponents = (project in flink("components/base"))
.settings(commonSettings)
.settings(assemblyNoScala("flinkBase.jar"): _*)
Expand Down Expand Up @@ -2069,7 +2083,7 @@ lazy val designer = (project in file("designer/server"))
liteEmbeddedDeploymentManager % Provided,
liteK8sDeploymentManager % Provided,
developmentTestsDeploymentManager % Provided,
flinkPeriodicDeploymentManager % Provided,
flinkPeriodicDeploymentManager % "provided, test->test",
schemedKafkaComponentsUtils % Provided,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package pl.touk.nussknacker.engine

import akka.actor.ActorSystem
import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId}
import pl.touk.nussknacker.engine.api.deployment.periodic.{PeriodicProcessesManager, PeriodicProcessesManagerProvider}
import pl.touk.nussknacker.engine.api.deployment.{
ProcessingTypeActionService,
ProcessingTypeDeployedScenariosProvider,
ScenarioActivityManager
}
import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId}
import sttp.client3.SttpBackend

import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -15,6 +16,7 @@ case class DeploymentManagerDependencies(
deployedScenariosProvider: ProcessingTypeDeployedScenariosProvider,
actionService: ProcessingTypeActionService,
scenarioActivityManager: ScenarioActivityManager,
periodicProcessesManagerProvider: PeriodicProcessesManagerProvider,
executionContext: ExecutionContext,
actorSystem: ActorSystem,
sttpBackend: SttpBackend[Future, Any],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package pl.touk.nussknacker.engine.api.deployment.periodic

import pl.touk.nussknacker.engine.api.deployment.ProcessActionId
import pl.touk.nussknacker.engine.api.deployment.periodic.PeriodicProcessesManager.ScheduleProperty
import pl.touk.nussknacker.engine.api.deployment.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus
import pl.touk.nussknacker.engine.api.deployment.periodic.model._
import pl.touk.nussknacker.engine.api.process.{ProcessName, VersionId}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess

import java.time.LocalDateTime
import scala.concurrent.Future

trait PeriodicProcessesManager {

def create(
deploymentWithRuntimeParams: DeploymentWithRuntimeParams,
scheduleProperty: ScheduleProperty,
processActionId: ProcessActionId,
): Future[PeriodicProcess]

def markInactive(processId: PeriodicProcessId): Future[Unit]

def schedule(
id: PeriodicProcessId,
scheduleName: ScheduleName,
runAt: LocalDateTime,
deployMaxRetries: Int,
): Future[PeriodicProcessDeployment]

def findProcessData(id: PeriodicProcessDeploymentId): Future[PeriodicProcessDeployment]

def findToBeDeployed: Future[Seq[PeriodicProcessDeployment]]

def findToBeRetried: Future[Seq[PeriodicProcessDeployment]]

def markDeployed(id: PeriodicProcessDeploymentId): Future[Unit]

def markFinished(id: PeriodicProcessDeploymentId): Future[Unit]

def markFailed(id: PeriodicProcessDeploymentId): Future[Unit]

def markFailedOnDeployWithStatus(
id: PeriodicProcessDeploymentId,
status: PeriodicProcessDeploymentStatus,
deployRetries: Int,
retryAt: Option[LocalDateTime]
): Future[Unit]

def getSchedulesState(
scenarioName: ProcessName
): Future[SchedulesState]

def getLatestDeploymentsForActiveSchedules(
processName: ProcessName,
deploymentsPerScheduleMaxCount: Int,
): Future[SchedulesState]

def getLatestDeploymentsForLatestInactiveSchedules(
processName: ProcessName,
inactiveProcessesMaxCount: Int,
deploymentsPerScheduleMaxCount: Int,
): Future[SchedulesState]

def findActiveSchedulesForProcessesHavingDeploymentWithMatchingStatus(
expectedDeploymentStatuses: Set[PeriodicProcessDeploymentStatus],
): Future[SchedulesState]

def fetchCanonicalProcess(
processName: ProcessName,
versionId: VersionId,
): Future[Option[CanonicalProcess]]

}

object PeriodicProcessesManager {

sealed trait ScheduleProperty

sealed trait SingleScheduleProperty extends ScheduleProperty

case class MultipleScheduleProperty(schedules: Map[String, SingleScheduleProperty]) extends ScheduleProperty

case class CronScheduleProperty(labelOrCronExpr: String) extends SingleScheduleProperty

}

object NoOpPeriodicProcessesManager extends PeriodicProcessesManager {

override def create(
deploymentWithRuntimeParams: DeploymentWithRuntimeParams,
scheduleProperty: ScheduleProperty,
processActionId: ProcessActionId,
): Future[PeriodicProcess] = notImplemented

override def markInactive(processId: PeriodicProcessId): Future[Unit] = notImplemented

override def schedule(
id: PeriodicProcessId,
scheduleName: ScheduleName,
runAt: LocalDateTime,
deployMaxRetries: Int
): Future[PeriodicProcessDeployment] = notImplemented

override def findProcessData(
id: PeriodicProcessDeploymentId,
): Future[PeriodicProcessDeployment] = notImplemented

override def findToBeDeployed: Future[Seq[PeriodicProcessDeployment]] = notImplemented

override def findToBeRetried: Future[Seq[PeriodicProcessDeployment]] = notImplemented

override def markDeployed(id: PeriodicProcessDeploymentId): Future[Unit] = notImplemented

override def markFinished(id: PeriodicProcessDeploymentId): Future[Unit] = notImplemented

override def markFailed(id: PeriodicProcessDeploymentId): Future[Unit] = notImplemented

override def markFailedOnDeployWithStatus(
id: PeriodicProcessDeploymentId,
status: PeriodicProcessDeploymentStatus,
deployRetries: Int,
retryAt: Option[LocalDateTime]
): Future[Unit] = notImplemented

override def getSchedulesState(scenarioName: ProcessName): Future[SchedulesState] = notImplemented

override def getLatestDeploymentsForActiveSchedules(
processName: ProcessName,
deploymentsPerScheduleMaxCount: Int,
): Future[SchedulesState] = notImplemented

override def getLatestDeploymentsForLatestInactiveSchedules(
processName: ProcessName,
inactiveProcessesMaxCount: Int,
deploymentsPerScheduleMaxCount: Int,
): Future[SchedulesState] = notImplemented

override def findActiveSchedulesForProcessesHavingDeploymentWithMatchingStatus(
expectedDeploymentStatuses: Set[PeriodicProcessDeploymentStatus],
): Future[SchedulesState] = notImplemented

override def fetchCanonicalProcess(
processName: ProcessName,
versionId: VersionId
): Future[Option[CanonicalProcess]] = notImplemented

private def notImplemented: Future[Nothing] =
Future.failed(new NotImplementedError())

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package pl.touk.nussknacker.engine.api.deployment.periodic

trait PeriodicProcessesManagerProvider {

def provide(
deploymentManagerName: String,
processingType: String,
): PeriodicProcessesManager

}

object NoOpPeriodicProcessesManagerProvider extends PeriodicProcessesManagerProvider {

override def provide(
deploymentManagerName: String,
processingType: String
): PeriodicProcessesManager = NoOpPeriodicProcessesManager

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package pl.touk.nussknacker.engine.api.deployment.periodic.model

import pl.touk.nussknacker.engine.api.ProcessVersion

case class DeploymentWithRuntimeParams(
processVersion: ProcessVersion,
inputConfigDuringExecutionJson: String,
runtimeParams: RuntimeParams,
)

final case class RuntimeParams(params: Map[String, String])
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package pl.touk.nussknacker.engine.management.periodic.model
package pl.touk.nussknacker.engine.api.deployment.periodic.model

import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.deployment.ProcessActionId
import pl.touk.nussknacker.engine.management.periodic.ScheduleProperty
import slick.lifted.MappedTo
import pl.touk.nussknacker.engine.api.deployment.periodic.PeriodicProcessesManager.ScheduleProperty

import java.time.LocalDateTime

case class PeriodicProcessId(value: Long) extends MappedTo[Long]
case class PeriodicProcessId(value: Long)

case class PeriodicProcess[ProcessRep](
case class PeriodicProcess(
id: PeriodicProcessId,
deploymentData: DeploymentWithJarData[ProcessRep],
deploymentData: DeploymentWithRuntimeParams,
scheduleProperty: ScheduleProperty,
active: Boolean,
createdAt: LocalDateTime,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package pl.touk.nussknacker.engine.api.deployment.periodic.model

import pl.touk.nussknacker.engine.api.deployment.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus

import java.time.LocalDateTime

// TODO: We should separate schedules concept from deployments - fully switch to ScheduleData and ScheduleDeploymentData
case class PeriodicProcessDeployment(
id: PeriodicProcessDeploymentId,
periodicProcess: PeriodicProcess,
createdAt: LocalDateTime,
runAt: LocalDateTime,
scheduleName: ScheduleName,
retriesLeft: Int,
nextRetryAt: Option[LocalDateTime],
state: PeriodicProcessDeploymentState
) {

def display: String =
s"${periodicProcess.processVersion} with scheduleName=${scheduleName.display} and deploymentId=$id"

}

case class PeriodicProcessDeploymentState(
deployedAt: Option[LocalDateTime],
completedAt: Option[LocalDateTime],
status: PeriodicProcessDeploymentStatus
)

case class PeriodicProcessDeploymentId(value: Long) {
override def toString: String = value.toString
}

object PeriodicProcessDeploymentStatus extends Enumeration {
type PeriodicProcessDeploymentStatus = Value

val Scheduled, Deployed, Finished, Failed, RetryingDeploy, FailedOnDeploy = Value
}

case class ScheduleName(value: Option[String]) {
def display: String = value.getOrElse("[default]")
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.management.periodic.model
package pl.touk.nussknacker.engine.api.deployment.periodic.model

import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.management.periodic.db.{PeriodicProcessDeploymentEntity, PeriodicProcessesRepository}
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap

import java.time.LocalDateTime
Expand Down Expand Up @@ -35,7 +34,7 @@ case class SchedulesState(schedules: Map[ScheduleId, ScheduleData]) {
// For most operations it will contain only one latest deployment but for purpose of statuses of historical deployments
// it has list instead of one element.
// This structure should contain SingleScheduleProperty as well. See note above
case class ScheduleData(process: PeriodicProcess[Unit], latestDeployments: List[ScheduleDeploymentData])
case class ScheduleData(process: PeriodicProcess, latestDeployments: List[ScheduleDeploymentData])

// To identify schedule we need scheduleName - None for SingleScheduleProperty and Some(key) for MultipleScheduleProperty keys
// Also we need PeriodicProcessId to distinguish between active schedules and some inactive from the past for the same PeriodicProcessId
Expand All @@ -53,41 +52,17 @@ case class ScheduleDeploymentData(
) {

def toFullDeploymentData(
process: PeriodicProcess[Unit],
process: PeriodicProcess,
scheduleName: ScheduleName
): PeriodicProcessDeployment[Unit] =
): PeriodicProcessDeployment =
PeriodicProcessDeployment(id, process, createdAt, runAt, scheduleName, retriesLeft, nextRetryAt, state)

def display = s"deploymentId=$id"

}

object ScheduleDeploymentData {

def apply(deployment: PeriodicProcessDeploymentEntity): ScheduleDeploymentData = {
ScheduleDeploymentData(
deployment.id,
deployment.createdAt,
deployment.runAt,
deployment.deployedAt,
deployment.retriesLeft,
deployment.nextRetryAt,
PeriodicProcessesRepository.createPeriodicDeploymentState(deployment)
)
}

}

// These below are temporary structures, see notice next to SchedulesState
case class PeriodicProcessScheduleData(
process: PeriodicProcess[Unit],
deployments: List[PeriodicProcessDeployment[Unit]]
) {
def existsDeployment(predicate: PeriodicProcessDeployment[Unit] => Boolean): Boolean = deployments.exists(predicate)

def display: String = {
val deploymentsForSchedules = deployments.map(_.display)
s"processName=${process.processVersion.processName}, deploymentsForSchedules=$deploymentsForSchedules"
}

}
process: PeriodicProcess,
deployments: List[PeriodicProcessDeployment]
)
Loading