Skip to content

Commit

Permalink
Merge pull request #10 from OKTW-Network/feature/origin-colored
Browse files Browse the repository at this point in the history
Make message from other galaxy displayed in light grey
Panic the server when bad init
  • Loading branch information
mmis1000 authored Aug 4, 2019
2 parents b678ea4 + 275b83c commit 0b3a658
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 72 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ dependencies {
implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.3.0-M2'
implementation 'com.uchuhimo:konf:0.13.3'
compile 'com.rabbitmq:amqp-client:5.6.0'
compile 'com.rabbitmq:amqp-client:5.7.3'
implementation 'org.mongodb:bson:3.10.2'
}

Expand Down
150 changes: 95 additions & 55 deletions src/main/kotlin/one/oktw/galaxy/proxy/Main.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package one.oktw.galaxy.proxy

import com.google.inject.Inject
import com.rabbitmq.client.Channel
import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.TopologyRecoveryException
import com.rabbitmq.client.impl.DefaultExceptionHandler
import com.uchuhimo.konf.Config
import com.velocitypowered.api.event.Subscribe
import com.velocitypowered.api.event.player.ServerPreConnectEvent
Expand All @@ -11,14 +15,12 @@ import com.velocitypowered.api.proxy.ProxyServer
import com.velocitypowered.api.proxy.server.RegisteredServer
import com.velocitypowered.api.proxy.server.ServerInfo
import io.fabric8.kubernetes.client.internal.readiness.Readiness
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import one.oktw.galaxy.proxy.event.ChatExchange
import kotlinx.coroutines.*
import one.oktw.galaxy.proxy.command.Lobby
import one.oktw.galaxy.proxy.config.CoreSpec
import one.oktw.galaxy.proxy.config.GalaxySpec
import one.oktw.galaxy.proxy.config.GalaxySpec.Storage.storageClass
import one.oktw.galaxy.proxy.event.ChatExchange
import one.oktw.galaxy.proxy.event.GalaxyPacket
import one.oktw.galaxy.proxy.event.PlayerListWatcher
import one.oktw.galaxy.proxy.event.TabListUpdater
Expand Down Expand Up @@ -66,70 +68,108 @@ class Main {
.from.toml.file("config/galaxy-proxy.toml")
.from.env()
}

lateinit var manager: Manager

@Inject
fun init(proxy: ProxyServer, logger: Logger) {
main = this
this.proxy = proxy
this.logger = logger

this.kubernetesClient = KubernetesClient()
this.redisClient = RedisClient()

val factory = ConnectionFactory()
factory.host = config[CoreSpec.rabbitMqHost]
factory.port = config[CoreSpec.rabbitMqPort]
factory.username = config[CoreSpec.rabbitMqUsername]
factory.password = config[CoreSpec.rabbitMqPassword]
val connection = factory.newConnection()
connection.addShutdownListener {
logger.error("conn killed", it)
}
val channel = connection.createChannel()
manager = Manager(channel, config[CoreSpec.rabbitMqExchange])
manager.subscribe(MESSAGE_TOPIC)
try {
main = this
this.proxy = proxy
this.logger = logger

this.kubernetesClient = KubernetesClient()
this.redisClient = RedisClient()

val factory = ConnectionFactory()
factory.host = config[CoreSpec.rabbitMqHost]
factory.port = config[CoreSpec.rabbitMqPort]
factory.username = config[CoreSpec.rabbitMqUsername]
factory.password = config[CoreSpec.rabbitMqPassword]
factory.isAutomaticRecoveryEnabled = true
factory.isTopologyRecoveryEnabled = true

factory.exceptionHandler =
object : DefaultExceptionHandler(),
CoroutineScope by CoroutineScope(Dispatchers.Default + SupervisorJob()) {
override fun handleTopologyRecoveryException(
conn: Connection?,
ch: Channel?,
exception: TopologyRecoveryException?
) {
logger.error("Error while recovery", exception)
}
}

val connection = factory.newConnection()
connection.addShutdownListener {
logger.error("conn killed", it)
}

runBlocking {
logger.info("Kubernetes Version: ${kubernetesClient.info().gitVersion}")
logger.info("Redis version: ${redisClient.version()}")
}
val channel = connection.createChannel()
channel.addShutdownListener {
logger.error("channel killed", it)
}

manager = Manager(channel, config[CoreSpec.rabbitMqExchange])
manager.subscribe(MESSAGE_TOPIC)

proxy.channelRegistrar.register(ChatExchange.eventId)
proxy.channelRegistrar.register(ChatExchange.eventIdResponse)
logger.info("Galaxy Init!")
runBlocking {
logger.info("Kubernetes Version: ${kubernetesClient.info().gitVersion}")
logger.info("Redis version: ${redisClient.version()}")
}

proxy.channelRegistrar.register(ChatExchange.eventId)
proxy.channelRegistrar.register(ChatExchange.eventIdResponse)
logger.info("Galaxy Init!")
} catch (err: Throwable) {
logger.error("Failed to init the proxy!", err)
exitProcess(1)
}
}

@Subscribe
fun onProxyInitialize(event: ProxyInitializeEvent) {
proxy.commandManager.unregister("server") // Disable server command
proxy.commandManager.register(Lobby(), "lobby")

proxy.channelRegistrar.register(GalaxyPacket.MESSAGE_CHANNEL_ID)

proxy.eventManager.register(this, PlayerListWatcher(config[CoreSpec.protocolVersion]))
proxy.eventManager.register(this, TabListUpdater())
proxy.eventManager.register(this, GalaxyPacket())

// Start lobby TODO auto scale lobby
GlobalScope.launch {
try {
lobby = kubernetesClient.getOrCreateGalaxyAndVolume("galaxy-lobby", config[storageClass], "10Gi")
.let { if (!Readiness.isReady(it)) kubernetesClient.waitReady(it) else it }
.let { proxy.registerServer(ServerInfo("galaxy-lobby", InetSocketAddress(it.status.podIP, 25565))) }
} catch (e: Exception) {
exitProcess(1)
try {
proxy.commandManager.unregister("server") // Disable server command
proxy.commandManager.register(Lobby(), "lobby")

proxy.channelRegistrar.register(GalaxyPacket.MESSAGE_CHANNEL_ID)

proxy.eventManager.register(this, PlayerListWatcher(config[CoreSpec.protocolVersion]))
proxy.eventManager.register(this, TabListUpdater())
proxy.eventManager.register(this, GalaxyPacket())

// Start lobby TODO auto scale lobby
GlobalScope.launch {
try {
lobby = kubernetesClient.getOrCreateGalaxyAndVolume("galaxy-lobby", config[storageClass], "10Gi")
.let { if (!Readiness.isReady(it)) kubernetesClient.waitReady(it) else it }
.let {
proxy.registerServer(
ServerInfo(
"galaxy-lobby",
InetSocketAddress(it.status.podIP, 25565)
)
)
}
} catch (e: Exception) {
exitProcess(1)
}
}
}

// Connect player to lobby
proxy.eventManager.register(this, ServerPreConnectEvent::class.java) {
if (it.player.currentServer.isPresent || !this::lobby.isInitialized) return@register // Ignore exist player
// Connect player to lobby
proxy.eventManager.register(this, ServerPreConnectEvent::class.java) {
if (it.player.currentServer.isPresent || !this::lobby.isInitialized) return@register // Ignore exist player

it.result = ServerPreConnectEvent.ServerResult.allowed(lobby)
}
it.result = ServerPreConnectEvent.ServerResult.allowed(lobby)
}

chatExchange = ChatExchange(MESSAGE_TOPIC)
proxy.eventManager.register(this, chatExchange)
chatExchange = ChatExchange(MESSAGE_TOPIC)
proxy.eventManager.register(this, chatExchange)
} catch (err: Throwable) {
logger.error("Failed to init the proxy!", err)
exitProcess(1)
}
}
}
25 changes: 24 additions & 1 deletion src/main/kotlin/one/oktw/galaxy/proxy/event/ChatExchange.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import com.velocitypowered.api.proxy.messages.MinecraftChannelIdentifier
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import net.kyori.text.TranslatableComponent
import net.kyori.text.format.TextColor
import net.kyori.text.serializer.gson.GsonComponentSerializer
import one.oktw.galaxy.proxy.Main.Companion.main
import one.oktw.galaxy.proxy.api.ProxyAPI
Expand Down Expand Up @@ -114,9 +116,30 @@ class ChatExchange(private val topic: String) {
val textComponent = GsonComponentSerializer.INSTANCE.deserialize(event.data.packet.message)

main.proxy.allPlayers.forEach { player ->
val playerSource = player.currentServer.orElse(null)?.let {
try {
UUID.fromString(it.serverInfo.name)
} catch (err: Throwable) {
null
}
} ?: ProxyAPI.dummyUUID

event.data.packet.targets.forEach { target ->
listenMap.computeIfAbsent(player.uniqueId) { listOf(player.uniqueId, ProxyAPI.globalChatChannel) }
.let { if (target in it) player.sendMessage(textComponent) }
.let {
if (target in it) {
if (event.data.server != playerSource && textComponent is TranslatableComponent) {
val newStyle = textComponent.style().color(TextColor.GRAY)
val newText =
TranslatableComponent.builder(textComponent.key()).args(textComponent.args())
.style(newStyle).append(textComponent.children()).build()

player.sendMessage(newText)
} else {
player.sendMessage(textComponent)
}
}
}
}
}
}
Expand Down
22 changes: 7 additions & 15 deletions src/main/kotlin/one/oktw/galaxy/proxy/pubsub/Manager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,27 @@ class Manager(private val channel: Channel, private val exchange: String) {
}
}

private val queues: HashMap<String, String> = HashMap()
private val tags: HashMap<String, String> = HashMap()
private val mqOpts: HashMap<String, Any> = HashMap<String, Any>().apply { this["x-message-ttl"] = 0 }
private val queueAndTag: HashMap<String, Pair<String, String>> = HashMap()
private val instanceId: UUID = UUID.randomUUID()

init {
}

fun subscribe(topic: String) {
if (queues[topic] != null) return
if (queueAndTag[topic] != null) return

channel.exchangeDeclare("$exchange-$topic", "fanout")
val queue = channel.queueDeclare().queue

channel.queueBind(queue, "$exchange-$topic", "")

queues[topic] = queue
tags[topic] = channel.basicConsume(queue, ConsumerWrapper(topic, this))
queueAndTag[topic] = queue to channel.basicConsume(queue, ConsumerWrapper(topic, this))
}

fun unsubscribe(topic: String) {
if (queues[topic] == null) return
if (tags[topic] == null) return
if (queueAndTag[topic] == null) return

channel.queueUnbind(queues[topic], "$exchange-$topic", "")
channel.basicCancel(tags[topic])
channel.queueUnbind(queueAndTag[topic]!!.first, "$exchange-$topic", "")
channel.basicCancel(queueAndTag[topic]!!.second)

queues.remove(topic)
tags.remove(topic)
queueAndTag.remove(topic)
}

fun handleDelivery(topic: String, body: ByteArray) {
Expand Down

0 comments on commit 0b3a658

Please sign in to comment.