Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…wss-common into zmq_new
  • Loading branch information
divyagayathri-hcl committed Jan 13, 2025
2 parents 1c2c16b + c872f42 commit 8fbef1a
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 37 deletions.
1 change: 1 addition & 0 deletions base-tooling-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pympler ==0.8 --hash=sha256:f74cd2982c5cd92ded55561191945616f2bb904a0ae5cdacdb566c6696bdb922
4 changes: 4 additions & 0 deletions common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ namespace swss {
#define COUNTERS_RIF_TYPE_MAP "COUNTERS_RIF_TYPE_MAP"
#define COUNTERS_RIF_NAME_MAP "COUNTERS_RIF_NAME_MAP"
#define COUNTERS_TRAP_NAME_MAP "COUNTERS_TRAP_NAME_MAP"
#define COUNTERS_POLICER_NAME_MAP "COUNTERS_POLICER_NAME_MAP"
#define COUNTERS_CRM_TABLE "CRM"
#define COUNTERS_BUFFER_POOL_NAME_MAP "COUNTERS_BUFFER_POOL_NAME_MAP"
#define COUNTERS_SWITCH_NAME_MAP "COUNTERS_SWITCH_NAME_MAP"
Expand Down Expand Up @@ -279,10 +280,13 @@ namespace swss {
#define TUNNEL_ATTR_ID_LIST "TUNNEL_ATTR_ID_LIST"
#define ACL_COUNTER_ATTR_ID_LIST "ACL_COUNTER_ATTR_ID_LIST"
#define FLOW_COUNTER_ID_LIST "FLOW_COUNTER_ID_LIST"
#define POLICER_COUNTER_ID_LIST "POLICER_COUNTER_ID_LIST"
#define PLUGIN_TABLE "PLUGIN_TABLE"
#define LUA_PLUGIN_TYPE "LUA_PLUGIN_TYPE"
#define SAI_OBJECT_TYPE "SAI_OBJECT_TYPE"

#define BULK_CHUNK_SIZE_FIELD "BULK_CHUNK_SIZE"
#define BULK_CHUNK_SIZE_PER_PREFIX_FIELD "BULK_CHUNK_SIZE_PER_PREFIX"
#define POLL_INTERVAL_FIELD "POLL_INTERVAL"
#define STATS_MODE_FIELD "STATS_MODE"
#define STATS_MODE_READ "STATS_MODE_READ"
Expand Down
14 changes: 6 additions & 8 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,11 @@ void ZmqClient::sendMsg(
throw system_error(make_error_code(errc::io_error), message);
}

//TODO: To implement the wait() method later.
bool ZmqClient::wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
SWSS_LOG_ENTER();

return false;
// TODO: To be implemented later, required for ZMQ_CLIENT & ZMQ_SERVER
// socket types in response path.
bool ZmqClient::wait(
const std::string &dbName, const std::string &tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos) {
return false;
}
}
13 changes: 6 additions & 7 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,11 @@ void ZmqServer::mqPollThread()
SWSS_LOG_NOTICE("mqPollThread end");
}

//TODO: To implement the sendMsg() method later.
void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values)
{

return;
// TODO: To be implemented later, required for ZMQ_CLIENT & ZMQ_SERVER
// socket types in response path.
void ZmqServer::sendMsg(
const std::string &dbName, const std::string &tableName,
const std::vector<swss::KeyOpFieldsValuesTuple> &values) {
return;
}

}
8 changes: 2 additions & 6 deletions tests/c_api_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,16 +262,12 @@ TEST(c_api, ZmqConsumerProducerStateTable) {

if (flag == 0)
for (uint64_t i = 0; i < arr.len; i++)
{
SWSSZmqProducerStateTable_set(pst, arr.data[i].key, arr.data[i].fieldValues);
}
else
{
SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr);
}
sleep(2);

ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 1500, true), SWSSSelectResult_DATA);
sleep(2);
arr = SWSSZmqConsumerStateTable_pops(cst);

vector<KeyOpFieldsValuesTuple> kfvs = takeKeyOpFieldValuesArray(arr);
Expand All @@ -295,7 +291,6 @@ TEST(c_api, ZmqConsumerProducerStateTable) {
EXPECT_EQ(fieldValues1[0].first, "myfield3");
EXPECT_EQ(fieldValues1[0].second, "myvalue3");

sleep(2);
arr = SWSSZmqConsumerStateTable_pops(cst);
ASSERT_EQ(arr.len, 0);
freeKeyOpFieldValuesArray(arr);
Expand All @@ -309,6 +304,7 @@ TEST(c_api, ZmqConsumerProducerStateTable) {
SWSSZmqProducerStateTable_del(pst, arr.data[i].key);
else
SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr);
sleep(2);

ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500, true), SWSSSelectResult_DATA);
arr = SWSSZmqConsumerStateTable_pops(cst);
Expand Down
29 changes: 13 additions & 16 deletions tests/zmq_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ static void consumerWorker(string tableName, string endpoint, bool dbPersistence
}

// Wait for some time to write into the DB.

sleep(3);

allDataReceived = true;
Expand Down Expand Up @@ -480,21 +479,19 @@ TEST(ZmqProducerStateTableDeleteAfterSend, test)

static bool zmq_done = false;

static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersistence)
{
std::string pushEndpoint = "tcp://localhost:1234";
std::string pullEndpoint = "tcp://*:1234";

cout << "Consumer thread started: " << tableName << endl;
DBConnector db(TEST_DB, 0, true);
ZmqServer server(endpoint, "");
ZmqConsumerStateTable c(&db, tableName, server, 128, 0, dbPersistence);
//validate received data
std::vector<swss::KeyOpFieldsValuesTuple> values;
values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});

while (!zmq_done)
{
static void zmqConsumerWorker(string tableName, string endpoint,
bool dbPersistence) {
cout << "Consumer thread started: " << tableName << endl;
DBConnector db(TEST_DB, 0, true);
ZmqServer server(endpoint, "");
ZmqConsumerStateTable c(&db, tableName, server, 128, 0, dbPersistence);
// validate received data
std::vector<swss::KeyOpFieldsValuesTuple> values;
values.push_back(KeyOpFieldsValuesTuple{
"k", SET_COMMAND,
std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});

while (!zmq_done) {
sleep(10);
std::string rec_dbName, rec_tableName;
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> rec_kcos_ptrs;
Expand Down

0 comments on commit 8fbef1a

Please sign in to comment.