Skip to content

Commit

Permalink
revert queue changes
Browse files Browse the repository at this point in the history
Signed-off-by: Lawrence Lee <[email protected]>
  • Loading branch information
theasianpianist committed Jan 13, 2025
1 parent 5451955 commit a060b1c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 21 deletions.
34 changes: 14 additions & 20 deletions common/notificationconsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, const st
{
SWSS_LOG_ENTER();

SWSS_LOG_NOTICE("Creating notification consumer for %s", m_channel.c_str());
m_queue = std::make_shared<std::queue<std::string>>();
while (true)
{
try
Expand Down Expand Up @@ -108,12 +106,12 @@ uint64_t swss::NotificationConsumer::readData()

bool swss::NotificationConsumer::hasData()
{
return m_queue->size() > 0;
return m_queue.size() > 0;
}

bool swss::NotificationConsumer::hasCachedData()
{
return m_queue->size() > 1;
return m_queue.size() > 1;
}

void swss::NotificationConsumer::processReply(redisReply *reply)
Expand Down Expand Up @@ -141,24 +139,23 @@ void swss::NotificationConsumer::processReply(redisReply *reply)

SWSS_LOG_DEBUG("got message: %s", msg.c_str());

m_queue->push(msg);
SWSS_LOG_INFO("%s queue size is %zu", m_channel.c_str(), m_queue->size());
m_queue.push(msg);
}

void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::vector<FieldValueTuple> &values)
{
SWSS_LOG_ENTER();

if (m_queue->empty())
if (m_queue.empty())
{
SWSS_LOG_ERROR("notification queue is empty, can't pop");
throw std::runtime_error("notification queue is empty, can't pop");
}

std::string msg = m_queue->front();
m_queue->pop();
std::string msg = m_queue.front();
m_queue.pop();

if (m_queue->empty())
if (m_queue.empty())
{
/***
* If there is a burst of notifications that causes the queue to grow in size,
Expand All @@ -167,19 +164,16 @@ void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::ve
*
* Force the memory to be released by destroying existing queue and creating a new one.
*/
SWSS_LOG_INFO("%s queue is empty, recreating", m_channel.c_str());
SWSS_LOG_DEBUG("%s queue is empty, calling malloc_trim()", m_channel.c_str());
int rv = malloc_trim(0);
if (rv == 1)
{
SWSS_LOG_INFO("Memory released successfully");
SWSS_LOG_DEBUG("Memory released successfully");
}
else
{
SWSS_LOG_INFO("No memory released by malloc_trim");
SWSS_LOG_DEBUG("No memory released by malloc_trim");
}
m_queue.reset();
m_queue = nullptr;
m_queue = std::make_shared<std::queue<std::string>>();
}

values.clear();
Expand All @@ -198,9 +192,9 @@ void swss::NotificationConsumer::pops(std::deque<KeyOpFieldsValuesTuple> &vkco)
SWSS_LOG_ENTER();

vkco.clear();
while(!m_queue->empty())
while(!m_queue.empty())
{
while(!m_queue->empty())
while(!m_queue.empty())
{
std::string op;
std::string data;
Expand All @@ -226,7 +220,7 @@ void swss::NotificationConsumer::pops(std::deque<KeyOpFieldsValuesTuple> &vkco)
int swss::NotificationConsumer::peek()
{
SWSS_LOG_ENTER();
if (m_queue->empty())
if (m_queue.empty())
{
// Peek for more data in redis socket
int rc = swss::peekRedisContext(m_subscribe->getContext());
Expand All @@ -237,5 +231,5 @@ int swss::NotificationConsumer::peek()
readData();
}

return m_queue->empty() ? 0 : 1;
return m_queue.empty() ? 0 : 1;
}
2 changes: 1 addition & 1 deletion common/notificationconsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class NotificationConsumer : public Selectable
swss::DBConnector *m_db;
swss::DBConnector *m_subscribe;
std::string m_channel;
std::shared_ptr<std::queue<std::string>> m_queue;
std::queue<std::string> m_queue;
};

}
Expand Down

0 comments on commit a060b1c

Please sign in to comment.