Skip to content

Commit

Permalink
Merge pull request getnelson#49 from adelbertc/nelson-on-k8s
Browse files Browse the repository at this point in the history
Kubernetes integration for Nelson
  • Loading branch information
timperrett authored Jan 12, 2018
2 parents 8dbbc68 + b71e36a commit 9fa33d9
Show file tree
Hide file tree
Showing 19 changed files with 738 additions and 33 deletions.
1 change: 1 addition & 0 deletions core/src/main/resources/nelson/defaults.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ nelson {
#
# infrastructure {
# scheduler {
# scheduler = "nomad"
# nomad {
# endpoint = "http://nomad.service.texas.your.company.com:1234/"
# timeout = 1 second
Expand Down
92 changes: 81 additions & 11 deletions core/src/main/scala/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
//: ----------------------------------------------------------------------------
package nelson

import java.io.FileInputStream
import javax.net.ssl.{SSLContext, X509TrustManager}
import java.nio.file.{Path, Paths}
import java.security.SecureRandom
import java.security.cert.{CertificateFactory, X509Certificate}
import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorService, ThreadFactory}

import journal.Logger
Expand All @@ -32,6 +36,7 @@ import notifications.{SlackHttp,SlackOp,EmailOp,EmailServer}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.control.NonFatal
import scalaz.Scalaz._
import scalaz.concurrent.Strategy
import scalaz.~>
Expand Down Expand Up @@ -530,18 +535,70 @@ object Config {
)((a,b,c,d,e,g) => {
val splunk = readSplunk
val loggingSidecar = readLoggingImage
val uri = org.http4s.Uri.fromString(a).toOption.yolo(s"nomad.endpoint -- $a -- is an invalid Uri")
val uri = Uri.fromString(a).toOption.yolo(s"nomad.endpoint -- $a -- is an invalid Uri")
Infrastructure.Nomad(uri,b,c,d,e,loggingSidecar,g,splunk)
})
}

/*
* Datacenters currently only support one scheduler
*/
def readScheduler(kfg: KConfig, proxy: Option[Infrastructure.ProxyCredentials]): Option[SchedulerOp ~> Task] =
readNomadInfrastructure(kfg.subconfig("nomad"))
/**
 * Reads the Kubernetes scheduler settings from the given configuration.
 *
 * Both the `endpoint` and `timeout` keys must be present, otherwise the
 * result is `None`. The endpoint string is parsed as a `Uri`; an unparsable
 * value is handed to `yolo` with a descriptive message (presumably aborting
 * configuration loading -- confirm against the `yolo` helper).
 */
def readKubernetesInfrastructure(kfg: KConfig): Option[Infrastructure.Kubernetes] = {
  val endpointSetting = kfg.lookup[String]("endpoint")
  val timeoutSetting = kfg.lookup[Duration]("timeout")

  (endpointSetting |@| timeoutSetting) { (rawEndpoint, requestTimeout) =>
    val parsedEndpoint = Uri.fromString(rawEndpoint).toOption
    Infrastructure.Kubernetes(
      parsedEndpoint.yolo(s"kubernetes.endpoint -- $rawEndpoint -- is an invalid Uri"),
      requestTimeout
    )
  }
}

/**
 * Builds the Nomad-backed scheduler interpreter from the given configuration.
 *
 * Yields `None` when `readNomadInfrastructure` finds no usable Nomad settings;
 * otherwise wraps them in a `NomadHttp` interpreter whose HTTP client uses the
 * configured Nomad timeout.
 */
def readNomadScheduler(kfg: KConfig): Option[SchedulerOp ~> Task] =
  for {
    nomad <- readNomadInfrastructure(kfg)
  } yield new scheduler.NomadHttp(nomadcfg, nomad, http4sClient(nomad.timeout))

/**
 * Creates an `X509TrustManager` that trusts only the given certificates as
 * trust anchors, similar to `curl --cacert`.
 *
 * The certificates are loaded into a fresh in-memory `KeyStore`, and the
 * platform's default `TrustManagerFactory` builds the trust manager from it,
 * so peer certificate chains are actually validated against `certs`. The
 * previous implementation had empty `checkClientTrusted` and
 * `checkServerTrusted` bodies, so it accepted every certificate.
 *
 * @param certs the CA certificates to trust
 * @return a trust manager that rejects chains not anchored in `certs`
 * @throws IllegalStateException if the platform provides no X509TrustManager
 */
def cacert(certs: Array[X509Certificate]): X509TrustManager = {
  val keyStore = java.security.KeyStore.getInstance(java.security.KeyStore.getDefaultType)
  // Passing a null stream initialises an empty key store.
  keyStore.load(null, null)
  certs.zipWithIndex.foreach { case (cert, index) =>
    keyStore.setCertificateEntry(s"ca-$index", cert)
  }

  val factory = javax.net.ssl.TrustManagerFactory.getInstance(
    javax.net.ssl.TrustManagerFactory.getDefaultAlgorithm
  )
  factory.init(keyStore)

  factory.getTrustManagers
    .collectFirst { case tm: X509TrustManager => tm }
    .getOrElse(throw new IllegalStateException("No X509TrustManager available from the default TrustManagerFactory"))
}

/**
 * Builds an `SSLContext` that trusts the cluster CA bundle Kubernetes mounts
 * into every pod.
 *
 * Every X.509 certificate in the bundle is loaded, not just the first one.
 * The file is opened inside the guarded region, so a missing or unreadable
 * bundle (for example when not running inside a pod) yields `None` rather
 * than an exception.
 *
 * @return `Some(context)` when at least one X.509 certificate could be read,
 *         `None` otherwise
 */
def getKubernetesPodCert(): Option[SSLContext] = {
  // Auto-mounted at this path for pods
  // https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod
  val certBundle = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"

  val x509Certs: Array[X509Certificate] =
    try {
      val is = new FileInputStream(certBundle)
      try {
        val cf = CertificateFactory.getInstance("X.509")
        // toArray() avoids depending on a specific Java/Scala collection converter.
        cf.generateCertificates(is).toArray().collect { case c: X509Certificate => c }
      } finally {
        is.close()
      }
    } catch {
      case NonFatal(_) => Array.empty[X509Certificate]
    }

  if (x509Certs.isEmpty) None
  else {
    val sslContext = SSLContext.getInstance("TLS")
    // A null key manager list means no client certificate is presented.
    sslContext.init(null, Array(cacert(x509Certs)), new SecureRandom())
    Some(sslContext)
  }
}

/**
 * Builds a `KubernetesClient` from the scheduler configuration plus the
 * credentials Kubernetes mounts into the pod.
 *
 * The result is `None` when the Kubernetes settings are missing, the pod CA
 * bundle cannot be loaded, or the service account token file is empty. The
 * lookups short-circuit, so later files are not touched once an earlier step
 * has produced `None`. A missing token file still raises the underlying I/O
 * exception, as before.
 *
 * The token file is closed after reading; previously the `Source` was leaked,
 * and an empty file failed with a bare `NoSuchElementException` from `.head`.
 */
def readKubernetesClient(kfg: KConfig): Option[KubernetesClient] = {
  // Auto-mounted at this path for pods
  // https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod
  val tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"

  // Reads the first line of the token file and always releases the file handle.
  def readServiceAccountToken(): Option[String] = {
    val source = scala.io.Source.fromFile(tokenPath)
    try source.getLines.toList.headOption
    finally source.close()
  }

  for {
    kubernetes <- readKubernetesInfrastructure(kfg)
    sslContext <- getKubernetesPodCert()
    serviceAccountToken <- readServiceAccountToken()
  } yield new KubernetesClient(
    kubernetes.endpoint,
    http4sClient(kubernetes.timeout, sslContext = Some(sslContext)),
    serviceAccountToken
  )
}

@SuppressWarnings(Array("org.brianmckenna.wartremover.warts.NoNeedForMonad"))
def readDatacenter(id: String, kfg: KConfig): Datacenter = {
val proxyCreds =
Expand Down Expand Up @@ -583,8 +640,19 @@ object Config {
InstrumentedVaultClient(endpoint, rawClient)
}).yolo("We really really need vault. Seriously vault must be configured")

val sched = readScheduler(kfg.subconfig("infrastructure.scheduler"), proxyCreds)
.yolo("At least one scheduler must be defined per datacenter")
val schedConfig = kfg.subconfig("infrastructure.scheduler")

val (sched, healthChecker) = (schedConfig.lookup[String]("scheduler") match {
case Some("nomad") =>
val s = readNomadScheduler(schedConfig.subconfig("nomad"))
val h = health.Http4sConsulHealthClient(consul)
s.map((_, h))
case Some("kubernetes") =>
readKubernetesClient(schedConfig.subconfig("kubernetes")).map { client =>
(new scheduler.KubernetesHttp(client), health.KubernetesHealthClient(client))
}
case _ => None
}).yolo("At least one scheduler must be defined per datacenter")

val interpreters = Infrastructure.Interpreters(
scheduler = sched,
Expand All @@ -594,7 +662,7 @@ object Config {
logger = logger,
docker = dockerClient,
control = WorkflowControlOp.trans,
health = health.Http4sConsulHealthClient(consul)
health = healthChecker
)

val trafficShift = readTrafficShift(kfg.subconfig("traffic-shift"))
Expand Down Expand Up @@ -775,9 +843,11 @@ object Config {
pkiPath = cfg.lookup[String]("pki-path")
)

// Builds a pooled http4s client (PooledHttp1Client) from the default Blaze
// client configuration, overriding the request timeout and, in the newer
// signature, the optional SSLContext used for TLS connections.
// NOTE(review): this span interleaves the pre-change and post-change lines of
// the diff; the second signature and the `requestTimeout = timeout,` /
// `sslContext = sslContext` lines are the post-change version.
private def http4sClient(timeout: Duration, maxTotalConnections: Int = 10): Client = {
private def http4sClient(timeout: Duration, maxTotalConnections: Int = 10, sslContext: Option[SSLContext] = None): Client = {
val config = BlazeClientConfig.defaultConfig.copy(
requestTimeout = timeout)
requestTimeout = timeout,
sslContext = sslContext
)
PooledHttp1Client(maxTotalConnections = maxTotalConnections, config = config)
}
}
34 changes: 20 additions & 14 deletions core/src/main/scala/Datacenter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,29 @@
//: ----------------------------------------------------------------------------
package nelson

import scalaz.{Order, ValidationNel, ~>}
import scalaz.std.string._
import scalaz.syntax.monoid._
import scalaz.std.set._
import scalaz.syntax.std.option._
import scalaz.syntax.foldable._
import scalaz.concurrent.Task
import java.net.URI
import java.time.Instant
import scala.concurrent.duration.FiniteDuration

import com.amazonaws.regions.Region
import concurrent.duration._
import helm.ConsulOp
import health.HealthCheckOp
import storage.StoreOp
import scheduler.SchedulerOp
import Workflow.WorkflowOp
import docker.DockerOp
import helm.ConsulOp
import loadbalancers.LoadbalancerOp
import logging.LoggingOp
import org.http4s.Uri
import scalaz.concurrent.Task
import scalaz.std.set._
import scalaz.std.string._
import scalaz.syntax.foldable._
import scalaz.syntax.monoid._
import scalaz.syntax.std.option._
import scalaz.{Order, ValidationNel, ~>}
import scheduler.SchedulerOp
import storage.StoreOp
import vault.Vault
import loadbalancers.LoadbalancerOp
import com.amazonaws.regions.Region
import Workflow.WorkflowOp


object Infrastructure {
Expand All @@ -61,7 +62,7 @@ object Infrastructure {
)

final case class Nomad(
endpoint: org.http4s.Uri,
endpoint: Uri,
timeout: Duration,
dockerRepoUser: String,
dockerRepoPassword: String,
Expand All @@ -71,6 +72,11 @@ object Infrastructure {
splunk: Option[SplunkConfig]
)

/**
 * Connection settings for a Kubernetes scheduler.
 *
 * @param endpoint Uri of the Kubernetes endpoint, parsed from the `endpoint`
 *                 configuration key
 * @param timeout  value of the `timeout` configuration key; used as the
 *                 request timeout of the HTTP client built for this scheduler
 */
final case class Kubernetes(
endpoint: Uri,
timeout: Duration
)

final case class SplunkConfig(
splunkUrl: String,
splunkToken: String
Expand Down
Loading

0 comments on commit 9fa33d9

Please sign in to comment.