Skip to content

Commit

Permalink
Reduce SocketAppender overhead (#449)
Browse files Browse the repository at this point in the history
* Add a reconnection test
  • Loading branch information
swebb2066 authored Jan 17, 2025
1 parent 0c5ffd5 commit 1989b75
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 119 deletions.
5 changes: 5 additions & 0 deletions src/main/cpp/aprsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,10 @@ void APRSocket::close()
}
}

apr_socket_t* APRSocket::getSocketPtr() const
{
return _priv->socket;
}

} //namespace helpers
} //namespace log4cxx
98 changes: 66 additions & 32 deletions src/main/cpp/socketappenderskeleton.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <log4cxx/helpers/optionconverter.h>
#include <log4cxx/helpers/stringhelper.h>
#include <log4cxx/spi/loggingevent.h>
#include <log4cxx/helpers/threadutility.h>
#include <log4cxx/helpers/transcoder.h>
#include <log4cxx/helpers/bytearrayoutputstream.h>
#include <log4cxx/helpers/threadutility.h>
Expand All @@ -36,17 +37,17 @@ using namespace LOG4CXX_NS::net;
#define _priv static_cast<SocketAppenderSkeletonPriv*>(m_priv.get())

SocketAppenderSkeleton::SocketAppenderSkeleton(int defaultPort, int reconnectionDelay)
: AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(defaultPort, reconnectionDelay))
: AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(defaultPort, reconnectionDelay))
{
}

SocketAppenderSkeleton::SocketAppenderSkeleton(helpers::InetAddressPtr address, int port, int reconnectionDelay)
: AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(address, port, reconnectionDelay))
: AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(address, port, reconnectionDelay))
{
}

SocketAppenderSkeleton::SocketAppenderSkeleton(const LogString& host, int port, int reconnectionDelay)
: AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(host, port, reconnectionDelay))
: AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(host, port, reconnectionDelay))
{
}

Expand All @@ -68,8 +69,8 @@ void SocketAppenderSkeleton::activateOptions(Pool& p)

void SocketAppenderSkeleton::close()
{
_priv->stopMonitor();
cleanUp(_priv->pool);
_priv->stopMonitor();
cleanUp(_priv->pool);
}

void SocketAppenderSkeleton::connect(Pool& p)
Expand Down Expand Up @@ -136,22 +137,47 @@ void SocketAppenderSkeleton::setOption(const LogString& option, const LogString&
void SocketAppenderSkeleton::fireConnector()
{
std::lock_guard<std::recursive_mutex> lock(_priv->mutex);

if ( !_priv->thread.joinable() )
{
LogLog::debug(LOG4CXX_STR("Connector thread not alive: starting monitor."));

_priv->thread = ThreadUtility::instance()->createThread( LOG4CXX_STR("SocketAppend"), &SocketAppenderSkeleton::monitor, this );
}
if (_priv->taskName.empty())
{
Pool p;
_priv->taskName = _priv->name + LOG4CXX_STR(":")
+ _priv->address->toString() + LOG4CXX_STR(":");
StringHelper::toString(_priv->port, p, _priv->taskName);
}
auto taskManager = ThreadUtility::instancePtr();
if (!taskManager->value().hasPeriodicTask(_priv->taskName))
{
Pool p;
if (LogLog::isDebugEnabled())
{
Pool p;
LogString msg(LOG4CXX_STR("Waiting "));
StringHelper::toString(_priv->reconnectionDelay, p, msg);
msg += LOG4CXX_STR(" ms before retrying [")
+ _priv->address->toString() + LOG4CXX_STR(":");
StringHelper::toString(_priv->port, p, msg);
msg += LOG4CXX_STR("].");
LogLog::debug(msg);
}
taskManager->value().addPeriodicTask(_priv->taskName
, std::bind(&SocketAppenderSkeleton::retryConnect, this)
, std::chrono::milliseconds(_priv->reconnectionDelay)
);
}
_priv->taskManager = taskManager;
}

void SocketAppenderSkeleton::monitor()
void SocketAppenderSkeleton::retryConnect()
{
Pool p;
SocketPtr socket;

while (!is_closed())
if (is_closed())
{
if (auto pManager = _priv->taskManager.lock())
pManager->value().removePeriodicTask(_priv->taskName);
}
else
{
Pool p;
SocketPtr socket;
try
{
if (LogLog::isDebugEnabled())
Expand All @@ -166,8 +192,14 @@ void SocketAppenderSkeleton::monitor()
setSocket(socket, p);
if (LogLog::isDebugEnabled())
{
LogLog::debug(LOG4CXX_STR("Connection established. Exiting connector thread."));
LogString msg(LOG4CXX_STR("Connection established to [")
+ _priv->address->toString() + LOG4CXX_STR(":"));
StringHelper::toString(_priv->port, p, msg);
msg += LOG4CXX_STR("].");
LogLog::debug(msg);
}
if (auto pManager = _priv->taskManager.lock())
pManager->value().removePeriodicTask(_priv->taskName);
return;
}
catch (ConnectException& e)
Expand Down Expand Up @@ -197,26 +229,17 @@ void SocketAppenderSkeleton::monitor()
msg += LOG4CXX_STR("].");
LogLog::debug(msg);
}

std::unique_lock<std::mutex> lock( _priv->interrupt_mutex );
if (_priv->interrupt.wait_for( lock, std::chrono::milliseconds( _priv->reconnectionDelay ),
std::bind(&SocketAppenderSkeleton::is_closed, this) ))
break;
}
}
}

void SocketAppenderSkeleton::SocketAppenderSkeletonPriv::stopMonitor()
{
{
std::lock_guard<std::mutex> lock(this->interrupt_mutex);
if (this->closed)
return;
this->closed = true;
}
this->interrupt.notify_all();
if (this->thread.joinable())
this->thread.join();
this->closed = true;
if (this->taskName.empty())
;
else if (auto pManager = this->taskManager.lock())
pManager->value().removePeriodicTask(this->taskName);
}

bool SocketAppenderSkeleton::is_closed()
Expand Down Expand Up @@ -258,6 +281,17 @@ bool SocketAppenderSkeleton::getLocationInfo() const
void SocketAppenderSkeleton::setReconnectionDelay(int reconnectionDelay1)
{
_priv->reconnectionDelay = reconnectionDelay1;
if (_priv->taskName.empty())
return;
auto pManager = _priv->taskManager.lock();
if (pManager && pManager->value().hasPeriodicTask(_priv->taskName))
{
pManager->value().removePeriodicTask(_priv->taskName);
pManager->value().addPeriodicTask(_priv->taskName
, std::bind(&SocketAppenderSkeleton::retryConnect, this)
, std::chrono::milliseconds(_priv->reconnectionDelay)
);
}
}

int SocketAppenderSkeleton::getReconnectionDelay() const
Expand Down
45 changes: 24 additions & 21 deletions src/main/cpp/threadutility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,17 @@ struct ThreadUtility::priv_data
LogString name;
Period delay;
TimePoint nextRun;
int errorCount;
std::function<void()> f;
int errorCount;
bool removed;
};
using JobStore = std::list<NamedPeriodicFunction>;
JobStore jobs;
std::mutex job_mutex;
std::recursive_mutex job_mutex;
std::thread thread;
std::condition_variable interrupt;
std::mutex interrupt_mutex;
bool terminated{false};
bool terminated{ false };
int retryCount{ 2 };
Period maxDelay{ 0 };

Expand Down Expand Up @@ -264,11 +265,11 @@ ThreadStartPost ThreadUtility::postStartFunction()
*/
void ThreadUtility::addPeriodicTask(const LogString& name, std::function<void()> f, const Period& delay)
{
std::lock_guard<std::mutex> lock(m_priv->job_mutex);
std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
if (m_priv->maxDelay < delay)
m_priv->maxDelay = delay;
auto currentTime = std::chrono::system_clock::now();
m_priv->jobs.push_back( priv_data::NamedPeriodicFunction{name, delay, currentTime + delay, 0, f} );
m_priv->jobs.push_back( priv_data::NamedPeriodicFunction{name, delay, currentTime + delay, f, 0, false} );
if (!m_priv->thread.joinable())
{
m_priv->terminated = false;
Expand All @@ -283,10 +284,10 @@ void ThreadUtility::addPeriodicTask(const LogString& name, std::function<void()>
*/
bool ThreadUtility::hasPeriodicTask(const LogString& name)
{
std::lock_guard<std::mutex> lock(m_priv->job_mutex);
std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
auto pItem = std::find_if(m_priv->jobs.begin(), m_priv->jobs.end()
, [&name](const priv_data::NamedPeriodicFunction& item)
{ return name == item.name; }
{ return !item.removed && name == item.name; }
);
return m_priv->jobs.end() != pItem;
}
Expand All @@ -297,7 +298,7 @@ bool ThreadUtility::hasPeriodicTask(const LogString& name)
void ThreadUtility::removeAllPeriodicTasks()
{
{
std::lock_guard<std::mutex> lock(m_priv->job_mutex);
std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
while (!m_priv->jobs.empty())
m_priv->jobs.pop_back();
}
Expand All @@ -309,14 +310,14 @@ void ThreadUtility::removeAllPeriodicTasks()
*/
void ThreadUtility::removePeriodicTask(const LogString& name)
{
std::lock_guard<std::mutex> lock(m_priv->job_mutex);
std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
auto pItem = std::find_if(m_priv->jobs.begin(), m_priv->jobs.end()
, [&name](const priv_data::NamedPeriodicFunction& item)
{ return name == item.name; }
{ return !item.removed && name == item.name; }
);
if (m_priv->jobs.end() != pItem)
{
m_priv->jobs.erase(pItem);
pItem->removed = true;
m_priv->interrupt.notify_one();
}
}
Expand All @@ -328,14 +329,14 @@ void ThreadUtility::removePeriodicTasksMatching(const LogString& namePrefix)
{
while (1)
{
std::lock_guard<std::mutex> lock(m_priv->job_mutex);
std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
auto pItem = std::find_if(m_priv->jobs.begin(), m_priv->jobs.end()
, [&namePrefix](const priv_data::NamedPeriodicFunction& item)
{ return namePrefix.size() <= item.name.size() && item.name.substr(0, namePrefix.size()) == namePrefix; }
{ return !item.removed && namePrefix.size() <= item.name.size() && item.name.substr(0, namePrefix.size()) == namePrefix; }
);
if (m_priv->jobs.end() == pItem)
break;
m_priv->jobs.erase(pItem);
pItem->removed = true;
}
m_priv->interrupt.notify_one();
}
Expand All @@ -348,14 +349,14 @@ void ThreadUtility::priv_data::doPeriodicTasks()
auto currentTime = std::chrono::system_clock::now();
TimePoint nextOperationTime = currentTime + this->maxDelay;
{
std::lock_guard<std::mutex> lock(this->job_mutex);
if (this->jobs.empty())
break;
std::lock_guard<std::recursive_mutex> lock(this->job_mutex);
for (auto& item : this->jobs)
{
if (this->terminated)
return;
if (item.nextRun <= currentTime)
if (item.removed)
;
else if (item.nextRun <= currentTime)
{
try
{
Expand All @@ -380,17 +381,19 @@ void ThreadUtility::priv_data::doPeriodicTasks()
nextOperationTime = item.nextRun;
}
}
// Remove faulty tasks
// Delete removed and faulty tasks
while (1)
{
std::lock_guard<std::mutex> lock(this->job_mutex);
std::lock_guard<std::recursive_mutex> lock(this->job_mutex);
auto pItem = std::find_if(this->jobs.begin(), this->jobs.end()
, [this](const NamedPeriodicFunction& item)
{ return this->retryCount < item.errorCount; }
{ return item.removed || this->retryCount < item.errorCount; }
);
if (this->jobs.end() == pItem)
break;
this->jobs.erase(pItem);
if (this->jobs.empty())
return;
}

std::unique_lock<std::mutex> lock(this->interrupt_mutex);
Expand Down
2 changes: 1 addition & 1 deletion src/main/include/log4cxx/net/socketappenderskeleton.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class LOG4CXX_EXPORT SocketAppenderSkeleton : public AppenderSkeleton
connection is droppped.
*/

void monitor();
void retryConnect();
bool is_closed();
SocketAppenderSkeleton(const SocketAppenderSkeleton&);
SocketAppenderSkeleton& operator=(const SocketAppenderSkeleton&);
Expand Down
2 changes: 2 additions & 0 deletions src/main/include/log4cxx/private/aprsocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class LOG4CXX_EXPORT APRSocket : public helpers::Socket
/** Closes this socket. */
virtual void close();

apr_socket_t* getSocketPtr() const;

private:
struct APRSocketPriv;
};
Expand Down
29 changes: 12 additions & 17 deletions src/main/include/log4cxx/private/socketappenderskeleton_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
#include <log4cxx/net/socketappenderskeleton.h>
#include <log4cxx/private/appenderskeleton_priv.h>
#include <log4cxx/helpers/inetaddress.h>

#if LOG4CXX_EVENTS_AT_EXIT
#include <log4cxx/private/atexitregistry.h>
#endif
#include <log4cxx/helpers/threadutility.h>

namespace LOG4CXX_NS
{
Expand All @@ -39,9 +36,6 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele
port(defaultPort),
reconnectionDelay(reconnectionDelay),
locationInfo(false)
#if LOG4CXX_EVENTS_AT_EXIT
, atExitRegistryRaii([this]{stopMonitor();})
#endif
{ }

SocketAppenderSkeletonPriv(helpers::InetAddressPtr address, int defaultPort, int reconnectionDelay) :
Expand All @@ -51,9 +45,6 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele
port(defaultPort),
reconnectionDelay(reconnectionDelay),
locationInfo(false)
#if LOG4CXX_EVENTS_AT_EXIT
, atExitRegistryRaii([this]{stopMonitor();})
#endif
{ }

SocketAppenderSkeletonPriv(const LogString& host, int port, int delay) :
Expand All @@ -63,9 +54,6 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele
port(port),
reconnectionDelay(delay),
locationInfo(false)
#if LOG4CXX_EVENTS_AT_EXIT
, atExitRegistryRaii([this]{stopMonitor();})
#endif
{ }

~SocketAppenderSkeletonPriv()
Expand All @@ -84,15 +72,22 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele
int port;
int reconnectionDelay;
bool locationInfo;
#if LOG4CXX_ABI_VERSION <= 15
std::thread thread;
std::condition_variable interrupt;
std::mutex interrupt_mutex;

#if LOG4CXX_EVENTS_AT_EXIT
helpers::AtExitRegistry::Raii atExitRegistryRaii;
#endif

void stopMonitor();

/**
Manages asynchronous reconnection attempts.
*/
helpers::ThreadUtility::ManagerWeakPtr taskManager;

/**
The reconnection task name.
*/
LogString taskName;
};

} // namespace net
Expand Down
Loading

0 comments on commit 1989b75

Please sign in to comment.