From 1989b756d76fa635b699c0e0a6f7a388f6e01879 Mon Sep 17 00:00:00 2001 From: Stephen Webb Date: Fri, 17 Jan 2025 11:53:40 +1100 Subject: [PATCH] Reduce SocketAppender overhead (#449) * Add a reconnection test --- src/main/cpp/aprsocket.cpp | 5 + src/main/cpp/socketappenderskeleton.cpp | 98 ++++++--- src/main/cpp/threadutility.cpp | 45 ++-- .../log4cxx/net/socketappenderskeleton.h | 2 +- src/main/include/log4cxx/private/aprsocket.h | 2 + .../private/socketappenderskeleton_priv.h | 29 ++- src/test/cpp/net/CMakeLists.txt | 1 + src/test/cpp/net/socketappendertestcase.cpp | 204 +++++++++++++----- 8 files changed, 267 insertions(+), 119 deletions(-) diff --git a/src/main/cpp/aprsocket.cpp b/src/main/cpp/aprsocket.cpp index c879b5138..2ed76bda7 100644 --- a/src/main/cpp/aprsocket.cpp +++ b/src/main/cpp/aprsocket.cpp @@ -159,5 +159,10 @@ void APRSocket::close() } } +apr_socket_t* APRSocket::getSocketPtr() const +{ + return _priv->socket; +} + } //namespace helpers } //namespace log4cxx diff --git a/src/main/cpp/socketappenderskeleton.cpp b/src/main/cpp/socketappenderskeleton.cpp index 993294bb9..fdba18a89 100644 --- a/src/main/cpp/socketappenderskeleton.cpp +++ b/src/main/cpp/socketappenderskeleton.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -36,17 +37,17 @@ using namespace LOG4CXX_NS::net; #define _priv static_cast(m_priv.get()) SocketAppenderSkeleton::SocketAppenderSkeleton(int defaultPort, int reconnectionDelay) - : AppenderSkeleton(std::make_unique(defaultPort, reconnectionDelay)) + : AppenderSkeleton(std::make_unique(defaultPort, reconnectionDelay)) { } SocketAppenderSkeleton::SocketAppenderSkeleton(helpers::InetAddressPtr address, int port, int reconnectionDelay) - : AppenderSkeleton(std::make_unique(address, port, reconnectionDelay)) + : AppenderSkeleton(std::make_unique(address, port, reconnectionDelay)) { } SocketAppenderSkeleton::SocketAppenderSkeleton(const LogString& host, int port, int reconnectionDelay) - : AppenderSkeleton(std::make_unique(host, port, reconnectionDelay)) + : AppenderSkeleton(std::make_unique(host, port, reconnectionDelay)) { } @@ -68,8 +69,8 @@ void SocketAppenderSkeleton::activateOptions(Pool& p) void SocketAppenderSkeleton::close() { - _priv->stopMonitor(); - cleanUp(_priv->pool); + _priv->stopMonitor(); + cleanUp(_priv->pool); } void SocketAppenderSkeleton::connect(Pool& p) @@ -136,22 +137,47 @@ void SocketAppenderSkeleton::setOption(const LogString& option, const LogString& void SocketAppenderSkeleton::fireConnector() { std::lock_guard lock(_priv->mutex); - - if ( !_priv->thread.joinable() ) - { - LogLog::debug(LOG4CXX_STR("Connector thread not alive: starting monitor.")); - - _priv->thread = ThreadUtility::instance()->createThread( LOG4CXX_STR("SocketAppend"), &SocketAppenderSkeleton::monitor, this ); - } + if (_priv->taskName.empty()) + { + Pool p; + _priv->taskName = _priv->name + LOG4CXX_STR(":") + + _priv->address->toString() + LOG4CXX_STR(":"); + StringHelper::toString(_priv->port, p, _priv->taskName); + } + auto taskManager = ThreadUtility::instancePtr(); + if (!taskManager->value().hasPeriodicTask(_priv->taskName)) + { + Pool p; + if (LogLog::isDebugEnabled()) + { + Pool p; + LogString msg(LOG4CXX_STR("Waiting ")); + StringHelper::toString(_priv->reconnectionDelay, p, msg); + msg += LOG4CXX_STR(" ms before retrying [") + + _priv->address->toString() + LOG4CXX_STR(":"); + StringHelper::toString(_priv->port, p, msg); + msg += LOG4CXX_STR("]."); + LogLog::debug(msg); + } + taskManager->value().addPeriodicTask(_priv->taskName + , std::bind(&SocketAppenderSkeleton::retryConnect, this) + , std::chrono::milliseconds(_priv->reconnectionDelay) + ); + } + _priv->taskManager = taskManager; } -void SocketAppenderSkeleton::monitor() +void SocketAppenderSkeleton::retryConnect() { - Pool p; - SocketPtr socket; - - while (!is_closed()) + if (is_closed()) { + if (auto pManager = _priv->taskManager.lock()) + pManager->value().removePeriodicTask(_priv->taskName); + } + else + { + Pool p; + SocketPtr socket; try { if (LogLog::isDebugEnabled()) @@ -166,8 +192,14 @@ void SocketAppenderSkeleton::monitor() setSocket(socket, p); if (LogLog::isDebugEnabled()) { - LogLog::debug(LOG4CXX_STR("Connection established. Exiting connector thread.")); + LogString msg(LOG4CXX_STR("Connection established to [") + + _priv->address->toString() + LOG4CXX_STR(":")); + StringHelper::toString(_priv->port, p, msg); + msg += LOG4CXX_STR("]."); + LogLog::debug(msg); } + if (auto pManager = _priv->taskManager.lock()) + pManager->value().removePeriodicTask(_priv->taskName); return; } catch (ConnectException& e) @@ -197,26 +229,17 @@ void SocketAppenderSkeleton::monitor() msg += LOG4CXX_STR("]."); LogLog::debug(msg); } - - std::unique_lock lock( _priv->interrupt_mutex ); - if (_priv->interrupt.wait_for( lock, std::chrono::milliseconds( _priv->reconnectionDelay ), - std::bind(&SocketAppenderSkeleton::is_closed, this) )) - break; } } } void SocketAppenderSkeleton::SocketAppenderSkeletonPriv::stopMonitor() { - { - std::lock_guard lock(this->interrupt_mutex); - if (this->closed) - return; - this->closed = true; - } - this->interrupt.notify_all(); - if (this->thread.joinable()) - this->thread.join(); + this->closed = true; + if (this->taskName.empty()) + ; + else if (auto pManager = this->taskManager.lock()) + pManager->value().removePeriodicTask(this->taskName); } bool SocketAppenderSkeleton::is_closed() @@ -258,6 +281,17 @@ bool SocketAppenderSkeleton::getLocationInfo() const void SocketAppenderSkeleton::setReconnectionDelay(int reconnectionDelay1) { _priv->reconnectionDelay = reconnectionDelay1; + if (_priv->taskName.empty()) + return; + auto pManager = _priv->taskManager.lock(); + if (pManager && pManager->value().hasPeriodicTask(_priv->taskName)) + { + pManager->value().removePeriodicTask(_priv->taskName); + pManager->value().addPeriodicTask(_priv->taskName + , std::bind(&SocketAppenderSkeleton::retryConnect, this) + , std::chrono::milliseconds(_priv->reconnectionDelay) + ); + } } int SocketAppenderSkeleton::getReconnectionDelay() const diff --git a/src/main/cpp/threadutility.cpp b/src/main/cpp/threadutility.cpp index 72e872485..b9dec099e 100644 --- a/src/main/cpp/threadutility.cpp +++ b/src/main/cpp/threadutility.cpp @@ -69,16 +69,17 @@ struct ThreadUtility::priv_data LogString name; Period delay; TimePoint nextRun; - int errorCount; std::function f; + int errorCount; + bool removed; }; using JobStore = std::list; JobStore jobs; - std::mutex job_mutex; + std::recursive_mutex job_mutex; std::thread thread; std::condition_variable interrupt; std::mutex interrupt_mutex; - bool terminated{false}; + bool terminated{ false }; int retryCount{ 2 }; Period maxDelay{ 0 }; @@ -264,11 +265,11 @@ ThreadStartPost ThreadUtility::postStartFunction() */ void ThreadUtility::addPeriodicTask(const LogString& name, std::function f, const Period& delay) { - std::lock_guard lock(m_priv->job_mutex); + std::lock_guard lock(m_priv->job_mutex); if (m_priv->maxDelay < delay) m_priv->maxDelay = delay; auto currentTime = std::chrono::system_clock::now(); - m_priv->jobs.push_back( priv_data::NamedPeriodicFunction{name, delay, currentTime + delay, 0, f} ); + m_priv->jobs.push_back( priv_data::NamedPeriodicFunction{name, delay, currentTime + delay, f, 0, false} ); if (!m_priv->thread.joinable()) { m_priv->terminated = false; @@ -283,10 +284,10 @@ void ThreadUtility::addPeriodicTask(const LogString& name, std::function */ bool ThreadUtility::hasPeriodicTask(const LogString& name) { - std::lock_guard lock(m_priv->job_mutex); + std::lock_guard lock(m_priv->job_mutex); auto pItem = std::find_if(m_priv->jobs.begin(), m_priv->jobs.end() , [&name](const priv_data::NamedPeriodicFunction& item) - { return name == item.name; } + { return !item.removed && name == item.name; } ); return m_priv->jobs.end() != pItem; } @@ -297,7 +298,7 @@ bool ThreadUtility::hasPeriodicTask(const LogString& name) void ThreadUtility::removeAllPeriodicTasks() { { - std::lock_guard lock(m_priv->job_mutex); + std::lock_guard lock(m_priv->job_mutex); while (!m_priv->jobs.empty()) m_priv->jobs.pop_back(); } @@ -309,14 +310,14 @@ void ThreadUtility::removeAllPeriodicTasks() */ void ThreadUtility::removePeriodicTask(const LogString& name) { - std::lock_guard lock(m_priv->job_mutex); + std::lock_guard lock(m_priv->job_mutex); auto pItem = std::find_if(m_priv->jobs.begin(), m_priv->jobs.end() , [&name](const priv_data::NamedPeriodicFunction& item) - { return name == item.name; } + { return !item.removed && name == item.name; } ); if (m_priv->jobs.end() != pItem) { - m_priv->jobs.erase(pItem); + pItem->removed = true; m_priv->interrupt.notify_one(); } } @@ -328,14 +329,14 @@ void ThreadUtility::removePeriodicTasksMatching(const LogString& namePrefix) { while (1) { - std::lock_guard lock(m_priv->job_mutex); + std::lock_guard lock(m_priv->job_mutex); auto pItem = std::find_if(m_priv->jobs.begin(), m_priv->jobs.end() , [&namePrefix](const priv_data::NamedPeriodicFunction& item) - { return namePrefix.size() <= item.name.size() && item.name.substr(0, namePrefix.size()) == namePrefix; } + { return !item.removed && namePrefix.size() <= item.name.size() && item.name.substr(0, namePrefix.size()) == namePrefix; } ); if (m_priv->jobs.end() == pItem) break; - m_priv->jobs.erase(pItem); + pItem->removed = true; } m_priv->interrupt.notify_one(); } @@ -348,14 +349,14 @@ void ThreadUtility::priv_data::doPeriodicTasks() auto currentTime = std::chrono::system_clock::now(); TimePoint nextOperationTime = currentTime + this->maxDelay; { - std::lock_guard lock(this->job_mutex); - if (this->jobs.empty()) - break; + std::lock_guard lock(this->job_mutex); for (auto& item : this->jobs) { if (this->terminated) return; - if (item.nextRun <= currentTime) + if (item.removed) + ; + else if (item.nextRun <= currentTime) { try { @@ -380,17 +381,19 @@ void ThreadUtility::priv_data::doPeriodicTasks() nextOperationTime = item.nextRun; } } - // Remove faulty tasks + // Delete removed and faulty tasks while (1) { - std::lock_guard lock(this->job_mutex); + std::lock_guard lock(this->job_mutex); auto pItem = std::find_if(this->jobs.begin(), this->jobs.end() , [this](const NamedPeriodicFunction& item) - { return this->retryCount < item.errorCount; } + { return item.removed || this->retryCount < item.errorCount; } ); if (this->jobs.end() == pItem) break; this->jobs.erase(pItem); + if (this->jobs.empty()) + return; } std::unique_lock lock(this->interrupt_mutex); diff --git a/src/main/include/log4cxx/net/socketappenderskeleton.h b/src/main/include/log4cxx/net/socketappenderskeleton.h index c0edada57..0a166738a 100644 --- a/src/main/include/log4cxx/net/socketappenderskeleton.h +++ b/src/main/include/log4cxx/net/socketappenderskeleton.h @@ -164,7 +164,7 @@ class LOG4CXX_EXPORT SocketAppenderSkeleton : public AppenderSkeleton connection is droppped. */ - void monitor(); + void retryConnect(); bool is_closed(); SocketAppenderSkeleton(const SocketAppenderSkeleton&); SocketAppenderSkeleton& operator=(const SocketAppenderSkeleton&); diff --git a/src/main/include/log4cxx/private/aprsocket.h b/src/main/include/log4cxx/private/aprsocket.h index 4b6e6dd71..9c566af62 100644 --- a/src/main/include/log4cxx/private/aprsocket.h +++ b/src/main/include/log4cxx/private/aprsocket.h @@ -41,6 +41,8 @@ class LOG4CXX_EXPORT APRSocket : public helpers::Socket /** Closes this socket. */ virtual void close(); + apr_socket_t* getSocketPtr() const; + private: struct APRSocketPriv; }; diff --git a/src/main/include/log4cxx/private/socketappenderskeleton_priv.h b/src/main/include/log4cxx/private/socketappenderskeleton_priv.h index 64ef6c35a..9ed67f41d 100644 --- a/src/main/include/log4cxx/private/socketappenderskeleton_priv.h +++ b/src/main/include/log4cxx/private/socketappenderskeleton_priv.h @@ -20,10 +20,7 @@ #include #include #include - -#if LOG4CXX_EVENTS_AT_EXIT -#include -#endif +#include namespace LOG4CXX_NS { @@ -39,9 +36,6 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele port(defaultPort), reconnectionDelay(reconnectionDelay), locationInfo(false) -#if LOG4CXX_EVENTS_AT_EXIT - , atExitRegistryRaii([this]{stopMonitor();}) -#endif { } SocketAppenderSkeletonPriv(helpers::InetAddressPtr address, int defaultPort, int reconnectionDelay) : @@ -51,9 +45,6 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele port(defaultPort), reconnectionDelay(reconnectionDelay), locationInfo(false) -#if LOG4CXX_EVENTS_AT_EXIT - , atExitRegistryRaii([this]{stopMonitor();}) -#endif { } SocketAppenderSkeletonPriv(const LogString& host, int port, int delay) : @@ -63,9 +54,6 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele port(port), reconnectionDelay(delay), locationInfo(false) -#if LOG4CXX_EVENTS_AT_EXIT - , atExitRegistryRaii([this]{stopMonitor();}) -#endif { } ~SocketAppenderSkeletonPriv() @@ -84,15 +72,22 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele int port; int reconnectionDelay; bool locationInfo; +#if LOG4CXX_ABI_VERSION <= 15 std::thread thread; std::condition_variable interrupt; std::mutex interrupt_mutex; - -#if LOG4CXX_EVENTS_AT_EXIT - helpers::AtExitRegistry::Raii atExitRegistryRaii; #endif - void stopMonitor(); + + /** + Manages asynchronous reconnection attempts. + */ + helpers::ThreadUtility::ManagerWeakPtr taskManager; + + /** + The reconnection task name. + */ + LogString taskName; }; } // namespace net diff --git a/src/test/cpp/net/CMakeLists.txt b/src/test/cpp/net/CMakeLists.txt index 555a0cf9c..7d43f7908 100644 --- a/src/test/cpp/net/CMakeLists.txt +++ b/src/test/cpp/net/CMakeLists.txt @@ -20,6 +20,7 @@ if(LOG4CXX_NETWORKING_SUPPORT) set(NET_TESTS syslogappendertestcase telnetappendertestcase + socketappendertestcase xmlsocketappendertestcase ) else() diff --git a/src/test/cpp/net/socketappendertestcase.cpp b/src/test/cpp/net/socketappendertestcase.cpp index 2271930f3..b64c1ee07 100644 --- a/src/test/cpp/net/socketappendertestcase.cpp +++ b/src/test/cpp/net/socketappendertestcase.cpp @@ -16,10 +16,21 @@ */ #include "../appenderskeletontestcase.h" -#include "apr.h" - -using namespace log4cxx; -using namespace log4cxx::helpers; +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace LOG4CXX_NS { namespace net { + using SocketAppender = XMLSocketAppender; +} } + +using namespace LOG4CXX_NS; /** Unit tests of log4cxx::SocketAppender @@ -32,61 +43,158 @@ class SocketAppenderTestCase : public AppenderSkeletonTestCase // LOGUNIT_TEST(testDefaultThreshold); LOGUNIT_TEST(testSetOptionThreshold); - LOGUNIT_TEST(testInvalidHost); + LOGUNIT_TEST(testRetryConnect); LOGUNIT_TEST_SUITE_END(); - - public: - - void setUp() - { +#ifdef _DEBUG + struct Fixture + { + Fixture() { + helpers::LogLog::setInternalDebugging(true); } + } suiteFixture; +#endif - void tearDown() - { - BasicConfigurator::resetConfiguration(); - } + + public: AppenderSkeleton* createAppenderSkeleton() const { return new log4cxx::net::SocketAppender(); } - void testInvalidHost(){ -// log4cxx::net::SocketAppenderPtr appender = std::make_shared(); -// log4cxx::PatternLayoutPtr layout = std::make_shared(LOG4CXX_STR("%m%n")); - -// log4cxx::helpers::ServerSocket serverSocket(4445); - -// appender->setLayout(layout); -// appender->setRemoteHost(LOG4CXX_STR("localhost")); -// appender->setReconnectionDelay(1); -// appender->setPort(4445); -// log4cxx::helpers::Pool pool; -// appender->activateOptions(pool); - -// BasicConfigurator::configure(appender); - -// log4cxx::Logger::getRootLogger()->setLevel(log4cxx::Level::getAll()); - -// std::thread th1( [](){ -// for( int x = 0; x < 3000; x++ ){ -// LOG4CXX_INFO(Logger::getLogger(LOG4CXX_STR("test")), "Some message" ); -// } -// }); -// std::thread th2( [](){ -// for( int x = 0; x < 3000; x++ ){ -// LOG4CXX_INFO(Logger::getLogger(LOG4CXX_STR("test")), "Some message" ); -// } -// }); - -// SocketPtr incomingSocket = serverSocket.accept(); -// incomingSocket->close(); - -// // If we do not get here, we have deadlocked -// th1.join(); -// th2.join(); + void testRetryConnect() + { + int tcpPort = 44445; + auto appender = std::make_shared(); + appender->setLayout(std::make_shared(LOG4CXX_STR("%d [%T] %m%n"))); + appender->setRemoteHost(LOG4CXX_STR("localhost")); + appender->setReconnectionDelay(50); // milliseconds + appender->setPort(tcpPort); + helpers::Pool pool; + appender->activateOptions(pool); + + BasicConfigurator::configure(appender); + + helpers::ServerSocketUniquePtr serverSocket; + try + { + serverSocket = helpers::ServerSocket::create(tcpPort); + } + catch (std::exception& ex) + { + helpers::LogLog::error(LOG4CXX_STR("ServerSocket::create failed"), ex); + LOGUNIT_FAIL("ServerSocket::create"); + } + serverSocket->setSoTimeout(1000); // milliseconds + + auto logger = Logger::getLogger("test"); + int logEventCount = 3000; + auto doLogging = [logger, logEventCount]() + { + for( int x = 0; x < logEventCount; x++ ){ + LOG4CXX_INFO(logger, "Message " << x); + if (0 == x % 1000) + apr_sleep(50000); // 50 millisecond + } + }; + std::vector loggingThread; + for (auto i : {0, 1}) + loggingThread.emplace_back(doLogging); + + helpers::SocketPtr incomingSocket; + try + { + incomingSocket = serverSocket->accept(); + } + catch (std::exception& ex) + { + helpers::LogLog::error(LOG4CXX_STR("ServerSocket::accept failed"), ex); + for (auto& t : loggingThread) + t.join(); + serverSocket->close(); + LOGUNIT_FAIL("accept failed"); + } + auto aprSocket = std::dynamic_pointer_cast(incomingSocket); + LOGUNIT_ASSERT(aprSocket); + auto pSocket = aprSocket->getSocketPtr(); + LOGUNIT_ASSERT(pSocket); + apr_socket_timeout_set(pSocket, 200000); // 200 millisecond + std::vector messageCount; + char buffer[8*1024]; + apr_size_t len = sizeof(buffer); + apr_status_t status; + while (APR_SUCCESS == (status = apr_socket_recv(pSocket, buffer, &len))) + { + auto pStart = &buffer[0]; + auto pEnd = pStart + len; + for (auto pChar = pStart; pChar < pEnd; ++pChar) + { + if ('\n' == *pChar) + { + std::string line(pStart, pChar); + auto pos = line.rfind(' '); + if (line.npos != pos && pos + 1 < line.size()) + { + try + { + auto msgNumber = std::stoi(line.substr(pos)); + if (messageCount.size() <= msgNumber) + messageCount.resize(msgNumber + 1); + ++messageCount[msgNumber]; + } + catch (std::exception const& ex) + { + LogString msg; + helpers::Transcoder::decode(ex.what(), msg); + msg += LOG4CXX_STR(" processing\n"); + helpers::Transcoder::decode(line, msg); + helpers::LogLog::warn(msg); + } + } + pStart = pChar + 1; + } + } + len = sizeof(buffer); + } + if (helpers::LogLog::isDebugEnabled()) + { + LogString msg = LOG4CXX_STR("apr_socket_recv terminated"); + char err_buff[1024] = {0}; + apr_strerror(status, err_buff, sizeof(err_buff)); + if (0 == err_buff[0] || 0 == strncmp(err_buff, "APR does not understand", 23)) + { + msg.append(LOG4CXX_STR(": error code ")); + helpers::Pool p; + helpers::StringHelper::toString(status, p, msg); + } + else + { + msg.append(LOG4CXX_STR(" - ")); + std::string sMsg = err_buff; + LOG4CXX_DECODE_CHAR(lsMsg, sMsg); + msg.append(lsMsg); + } + helpers::LogLog::debug(msg); + } + incomingSocket->close(); + serverSocket->close(); + for (auto& t : loggingThread) + t.join(); + + if (helpers::LogLog::isDebugEnabled()) + { + helpers::Pool p; + LogString msg(LOG4CXX_STR("messageCount ")); + for (auto item : messageCount) + { + msg += logchar(' '); + helpers::StringHelper::toString(item, p, msg); + } + helpers::LogLog::debug(msg); + } + LOGUNIT_ASSERT_EQUAL(logEventCount, (int)messageCount.size()); } };