From ba1931c66d33dd2b62a40e441078c7ddf9300f54 Mon Sep 17 00:00:00 2001 From: Andreas Grill <1265844+andreasgrill@users.noreply.github.com> Date: Mon, 12 Aug 2024 12:46:22 +0200 Subject: [PATCH] feat: add callback when mqtt connection is lost (#107) --- .../com/open200/xesar/connect/XesarConnect.kt | 11 ++++- .../open200/xesar/connect/XesarMqttClient.kt | 4 +- .../connect/it/MqttConnectionLostTest.kt | 42 +++++++++++++++++++ 3 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 xesar-connect/src/test/kotlin/com/open200/xesar/connect/it/MqttConnectionLostTest.kt diff --git a/xesar-connect/src/main/kotlin/com/open200/xesar/connect/XesarConnect.kt b/xesar-connect/src/main/kotlin/com/open200/xesar/connect/XesarConnect.kt index cd52d2f..1c38dab 100644 --- a/xesar-connect/src/main/kotlin/com/open200/xesar/connect/XesarConnect.kt +++ b/xesar-connect/src/main/kotlin/com/open200/xesar/connect/XesarConnect.kt @@ -34,6 +34,9 @@ class XesarConnect(private val client: IXesarMqttClient, val config: Config) { private val coroutineScopeForSendCommand = CoroutineScope(config.dispatcherForCommandsAndCleanUp) private val coroutineScopeForCleanUp = CoroutineScope(config.dispatcherForCommandsAndCleanUp) + + /** Callback for when the mqtt connection is lost. */ + var onConnectionLost: ((ConnectionFailedException) -> Unit)? = null lateinit var token: Token /** @@ -43,12 +46,18 @@ class XesarConnect(private val client: IXesarMqttClient, val config: Config) { * @property token The token to be included in the request. */ data class RequestConfig(val timeout: Long = 5000L) + internal fun buildRequestConfig(): RequestConfig { return RequestConfig() } init { - client.onDisconnect = { connectionChannel.trySend(ConnectionEvent.DISCONNECTED) } + client.onDisconnect = { ex -> + connectionChannel.trySend(ConnectionEvent.DISCONNECTED) + if (ex != null) { + onConnectionLost?.invoke(ex) + } + } client.onMessage = { topic, message -> // call all listeners on the topic val decodedMessage = message.decodeToString() diff --git a/xesar-connect/src/main/kotlin/com/open200/xesar/connect/XesarMqttClient.kt b/xesar-connect/src/main/kotlin/com/open200/xesar/connect/XesarMqttClient.kt index 9ee6c59..a0e3c61 100644 --- a/xesar-connect/src/main/kotlin/com/open200/xesar/connect/XesarMqttClient.kt +++ b/xesar-connect/src/main/kotlin/com/open200/xesar/connect/XesarMqttClient.kt @@ -25,7 +25,9 @@ class XesarMqttClient(private val client: MqttAsyncClient) : IXesarMqttClient { override fun connectionLost(cause: Throwable?) { log.error("lost connection: $cause") - val exception = ConnectionFailedException("lost connection: $cause") + val exception = + cause?.let { ConnectionFailedException("lost connection", it) } + ?: ConnectionFailedException("lost connection") onDisconnect(exception) } diff --git a/xesar-connect/src/test/kotlin/com/open200/xesar/connect/it/MqttConnectionLostTest.kt b/xesar-connect/src/test/kotlin/com/open200/xesar/connect/it/MqttConnectionLostTest.kt new file mode 100644 index 0000000..e98a2b4 --- /dev/null +++ b/xesar-connect/src/test/kotlin/com/open200/xesar/connect/it/MqttConnectionLostTest.kt @@ -0,0 +1,42 @@ +package com.open200.xesar.connect.it + +import com.open200.xesar.connect.XesarConnect +import com.open200.xesar.connect.XesarMqttClient +import io.kotest.common.runBlocking +import io.kotest.core.spec.style.FunSpec +import io.kotest.extensions.testcontainers.perTest +import java.util.* +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.launch +import kotlinx.coroutines.withTimeout + +class MqttConnectionLostTest : + FunSpec({ + val container = MosquittoContainer.container() + val config = MosquittoContainer.config(container) + listener(container.perTest()) + + test("connect with custom mqtt client id") { + runBlocking { + val connected = CompletableDeferred() + val connectionLost = CompletableDeferred() + launch { + val xesarMqttClient = XesarMqttClient.connectAsync(config).await() + connected.complete(Unit) + + val xesarConnect = XesarConnect(xesarMqttClient, config) + xesarConnect.onConnectionLost = { cause -> connectionLost.complete(Unit) } + try { + xesarConnect.delay() + } catch (e: Exception) { + // ignore + } + } + launch { + withTimeout(1000) { connected.await() } + container.stop() + withTimeout(1000) { connectionLost.await() } + } + } + } + })