Skip to content

Commit

Permalink
chore: simplify code
Browse files Browse the repository at this point in the history
  • Loading branch information
gclaramunt committed Nov 28, 2024
1 parent 30911a7 commit 1b9c7ed
Showing 1 changed file with 24 additions and 22 deletions.
46 changes: 24 additions & 22 deletions src/main/scala/org/constellation/lb/Manager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)(
.resource

private lazy val hostsRef =
Ref.unsafe[IO, NonEmptyMap[Addr, Option[List[Info]]]](init.map(addr => addr -> None).toNem)
Ref.unsafe[IO, NonEmptyMap[Addr, Option[NonEmptyList[Info]]]](init.map(addr => addr -> None).toNem)

def node(addr: Addr)(implicit http: Client[IO]) = new RestNodeApi(addr, config.networkCredentials)

Expand Down Expand Up @@ -74,7 +74,7 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)(
case (activeHosts, inactiveHosts) =>
val clusterHosts = (activeHosts ++ inactiveHosts) // TODO: Add Eviction of inactive hosts
.filterNot(addr => status.contains(addr))
.foldLeft(status)((acc, addr) => acc.add(addr, Option.empty[List[Info]]))
.foldLeft(status)((acc, addr) => acc.add(addr, None))

hostsRef.set(clusterHosts).flatTap(_ => updateLbSetup(activeHosts))
}
Expand Down Expand Up @@ -164,39 +164,37 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)(

private val settingsRoutes = HttpRoutes.of[IO] {
case POST -> Root / "maintenance" =>
enableMaintenanceMode.flatMap(_ => Ok())
enableMaintenanceMode >> Ok()
case DELETE -> Root / "maintenance" =>
disableMaintenanceMode.flatMap(_ => Ok())
disableMaintenanceMode >> Ok()
}

def findNewHosts(clusterInfo: NonEmptyMap[Addr, Option[List[Info]]]): SortedSet[Addr] =
def findNewHosts(clusterInfo: NonEmptyMap[Addr, Option[NonEmptyList[Info]]]): SortedSet[Addr] =
SortedSet[Addr]() ++ clusterInfo.toNel.foldLeft(List.empty[Addr])(
(acc, bcc) =>
bcc match {
case (_, Some(hosts: List[Info])) => acc ++ hosts.map(n => Addr(n.ip, n.publicPort)).filterNot(clusterInfo(_).isDefined)
case _ => acc
case (_, Some(hosts)) => acc ++ hosts.map(n => Addr(n.ip, n.publicPort)).filterNot(clusterInfo(_).isDefined)
case _ => acc
}
)

def buildClusterStatus(init: NonEmptyMap[Addr, Option[List[Info]]]): IO[(Set[Addr], Set[Addr])] = {
def buildClusterStatus(init: NonEmptyMap[Addr, Option[NonEmptyList[Info]]]): IO[(Set[Addr], Set[Addr])] = {
val tresholdLevel = init.keys.size / 2

def isActive(addr: Addr, proof: List[Info]) =
init(addr).nonEmpty && proof.count(_.state == NodeState.Ready) > tresholdLevel

IO {
val (active, other) = init.toNel
.collect {
case (_, Some(el)) => el
val (active, other) = init.toSortedMap.values
.flatMap {
case Some(el) => el.toList
case _ => Nil
}
.flatten
.groupBy(n => Addr(n.ip, n.publicPort))
.toList
.partition {
case (addr: Addr, proof: List[Info]) => isActive(addr, proof)
case (addr, proof) => isActive(addr, proof.toList)
}

active.map(_._1).toSet -> other.map(_._1).toSet
(active.keySet , other.keySet)
}.flatTap {
case (active, other) =>
logger.info(s"As a result of status analysis we have ${active} hosts and ${other} not ready")
Expand All @@ -205,25 +203,29 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)(

def getClusterInfo(
hosts: NonEmptySet[Addr]
)(implicit client: Client[IO]): IO[NonEmptyMap[Addr, Option[List[Info]]]] = {
)(implicit client: Client[IO]): IO[NonEmptyMap[Addr, Option[NonEmptyList[Info]]]] = {
IO.apply(logger.info(s"Fetch cluster status from following ${hosts.size} hosts: ${hosts.toList.take(5)}"))
.flatMap(
_ =>
hosts.toNonEmptyList
.map(
addr =>
node(addr).getInfo
.flatMap(
result =>
.flatMap {
case result@x :: xs =>
logger
.debug(s"Node $addr returned $result")
.map(_ => addr -> Option(result))
)
.map(_ => addr -> Some(NonEmptyList(x,xs)))
case Nil =>
logger
.warn(s"Node $addr returned empty node list")
.map(_ => addr -> None)
}
.recoverWith {
case error =>
logger
.info(s"Cannot retrieve cluster info from addr=$addr error=$error")
.map(_ => addr -> Option.empty[List[Info]])
.map(_ => addr -> None)
}
)
.parSequence
Expand Down

0 comments on commit 1b9c7ed

Please sign in to comment.