Skip to content

Commit

Permalink
Merge pull request #11 from OKTW-Network/fuck-rabbitmq
Browse files Browse the repository at this point in the history
Replace rabbitmq with redis
  • Loading branch information
mmis1000 authored Aug 14, 2019
2 parents 0b3a658 + 9215d2e commit 3c855f8
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 102 deletions.
2 changes: 0 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ dependencies {
implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.3.0-M2'
implementation 'com.uchuhimo:konf:0.13.3'
compile 'com.rabbitmq:amqp-client:5.7.3'
implementation 'org.mongodb:bson:3.10.2'
}

Expand Down Expand Up @@ -73,7 +72,6 @@ shadowJar {
include dependency("io.projectreactor:")
include dependency("io.netty:")
include dependency("org.reactivestreams:")
include dependency("com.rabbitmq:")
}

// relocate "kotlin", "one.oktw.relocate.kotlin"
Expand Down
37 changes: 1 addition & 36 deletions src/main/kotlin/one/oktw/galaxy/proxy/Main.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package one.oktw.galaxy.proxy

import com.google.inject.Inject
import com.rabbitmq.client.Channel
import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.TopologyRecoveryException
import com.rabbitmq.client.impl.DefaultExceptionHandler
import com.uchuhimo.konf.Config
import com.velocitypowered.api.event.Subscribe
import com.velocitypowered.api.event.player.ServerPreConnectEvent
Expand Down Expand Up @@ -81,37 +76,7 @@ class Main {
this.kubernetesClient = KubernetesClient()
this.redisClient = RedisClient()

val factory = ConnectionFactory()
factory.host = config[CoreSpec.rabbitMqHost]
factory.port = config[CoreSpec.rabbitMqPort]
factory.username = config[CoreSpec.rabbitMqUsername]
factory.password = config[CoreSpec.rabbitMqPassword]
factory.isAutomaticRecoveryEnabled = true
factory.isTopologyRecoveryEnabled = true

factory.exceptionHandler =
object : DefaultExceptionHandler(),
CoroutineScope by CoroutineScope(Dispatchers.Default + SupervisorJob()) {
override fun handleTopologyRecoveryException(
conn: Connection?,
ch: Channel?,
exception: TopologyRecoveryException?
) {
logger.error("Error while recovery", exception)
}
}

val connection = factory.newConnection()
connection.addShutdownListener {
logger.error("conn killed", it)
}

val channel = connection.createChannel()
channel.addShutdownListener {
logger.error("channel killed", it)
}

manager = Manager(channel, config[CoreSpec.rabbitMqExchange])
manager = Manager(config[CoreSpec.redisPubSubPrefix])
manager.subscribe(MESSAGE_TOPIC)

runBlocking {
Expand Down
6 changes: 1 addition & 5 deletions src/main/kotlin/one/oktw/galaxy/proxy/config/CoreSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,5 @@ object CoreSpec : ConfigSpec() {
val protocolVersion by required<Int>(description = "Minecraft protocol version")
val redis by required<String>(description = "Redis connection URI")
val kubernetes by optional<String?>(default = null, description = "Kubernetes API address")
val rabbitMqHost by required<String>(description = "rabbitMq URI")
val rabbitMqPort by required<Int>(description = "rabbitMq Port")
val rabbitMqUsername by required<String>(description = "rabbitMq username")
val rabbitMqPassword by required<String>(description = "rabbitMq password")
val rabbitMqExchange by required<String>(description = "rabbitMq exchange")
val redisPubSubPrefix by required<String>(description = "prefix of pubsub")
}
87 changes: 35 additions & 52 deletions src/main/kotlin/one/oktw/galaxy/proxy/pubsub/Manager.kt
Original file line number Diff line number Diff line change
@@ -1,79 +1,64 @@
package one.oktw.galaxy.proxy.pubsub

import com.rabbitmq.client.*
import io.lettuce.core.RedisClient
import io.lettuce.core.codec.ByteArrayCodec
import io.lettuce.core.pubsub.RedisPubSubAdapter
import one.oktw.galaxy.proxy.Main.Companion.main
import one.oktw.galaxy.proxy.api.ProxyAPI
import one.oktw.galaxy.proxy.api.packet.Packet
import one.oktw.galaxy.proxy.config.CoreSpec
import one.oktw.galaxy.proxy.event.MessageDeliveryEvent
import one.oktw.galaxy.proxy.pubsub.data.MessageWrapper
import java.util.*
import kotlin.collections.HashMap
import java.util.concurrent.ConcurrentHashMap

class Manager(private val channel: Channel, private val exchange: String) {
companion object {
class ConsumerWrapper(private val topic: String, private val manager: Manager) : Consumer {
override fun handleRecoverOk(consumerTag: String?) {
// TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}

override fun handleConsumeOk(consumerTag: String?) {
// TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}

override fun handleShutdownSignal(consumerTag: String?, sig: ShutdownSignalException?) {
// TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}

override fun handleCancel(consumerTag: String?) {
// TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
class Manager(prefix: String) {
private val client = RedisClient.create(main.config[CoreSpec.redis])
private val subscribeConnection = client.connectPubSub(ByteArrayCodec())
private val publishConnection = client.connect(ByteArrayCodec())
private val queries: ConcurrentHashMap<String, Boolean> = ConcurrentHashMap()
private val instanceId: UUID = UUID.randomUUID()

override fun handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: ByteArray
) {
manager.channel.basicAck(envelope.deliveryTag, false)
manager.handleDelivery(topic, body)
init {
subscribeConnection.addListener(object : RedisPubSubAdapter<ByteArray, ByteArray>() {
override fun message(channel: ByteArray, message: ByteArray) {
handleDelivery(channel.toTopic() ?: return, message)
}
})
}

override fun handleCancelOk(consumerTag: String?) {
// TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
private val channelPrefix = "$prefix-chat-"
private fun ByteArray.toTopic(): String? {
return String(this).let {
if (it.startsWith(channelPrefix)) {
it.drop(channelPrefix.length)
} else {
null
}
}
}

private val queueAndTag: HashMap<String, Pair<String, String>> = HashMap()
private val instanceId: UUID = UUID.randomUUID()
private fun String.toChannel() = "$channelPrefix$this".toByteArray()

fun subscribe(topic: String) {
if (queueAndTag[topic] != null) return

channel.exchangeDeclare("$exchange-$topic", "fanout")
val queue = channel.queueDeclare().queue

channel.queueBind(queue, "$exchange-$topic", "")

queueAndTag[topic] = queue to channel.basicConsume(queue, ConsumerWrapper(topic, this))
if (queries.contains(topic)) return
queries[topic] = true
subscribeConnection.sync().subscribe(topic.toChannel())
}

fun unsubscribe(topic: String) {
if (queueAndTag[topic] == null) return

channel.queueUnbind(queueAndTag[topic]!!.first, "$exchange-$topic", "")
channel.basicCancel(queueAndTag[topic]!!.second)

queueAndTag.remove(topic)
if (!queries.contains(topic)) return
subscribeConnection.sync().unsubscribe(topic.toChannel())
queries.remove(topic)
}

fun handleDelivery(topic: String, body: ByteArray) {
val unwrappedData = try {
ProxyAPI.decode<MessageWrapper>(body)
} catch (err: Throwable) {
main.logger.error("Decode MessageWrapper packet fail", err)
null
} ?: return
return
}

// drop short circuited message
if (unwrappedData.source == instanceId) return
Expand All @@ -83,13 +68,11 @@ class Manager(private val channel: Channel, private val exchange: String) {
}

fun send(topic: String, item: Packet) {
MessageDeliveryEvent(topic, item)
.let { main.proxy.eventManager.fireAndForget(it) }

main.proxy.eventManager.fireAndForget(MessageDeliveryEvent(topic, item))
send(topic, ProxyAPI.encode(MessageWrapper(instanceId, item)))
}

fun send(topic: String, body: ByteArray) {
channel.basicPublish("$exchange-$topic", topic, null, body)
publishConnection.async().publish(topic.toChannel(), body)
}
}
10 changes: 3 additions & 7 deletions src/main/resources/config/galaxy-proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@ protocolVersion = 498
# Redis connection URI
redis = ""

# pub/sub
redisPubSubPrefix = "galaxy"

# Kubernetes API address
kubernetes = "null"

# rabbitMq
rabbitMqHost = ""
rabbitMqPort = 5672
rabbitMqUsername = ""
rabbitMqPassword = ""
rabbitMqExchange = ""

[galaxy]
# Galaxy docker image
image = "registry.gitlab.com/oktw-network/galaxy"
Expand Down

0 comments on commit 3c855f8

Please sign in to comment.