Skip to content

Commit

Permalink
for #120, adding reporting of the metric.
Browse files Browse the repository at this point in the history
also messageRateMax implemented for consumer.
  • Loading branch information
petersilva committed Nov 12, 2023
1 parent af865c9 commit 6549a46
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 10 deletions.
63 changes: 58 additions & 5 deletions sr_consume.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,22 @@ int sr_consume_cleanup(struct sr_context *sr_c)
return (1);
}

int sr_consume_setup(struct sr_context *sr_c)
signed int sr_consume_queue_declare(struct sr_context *sr_c, amqp_boolean_t passive)
/*
declare a queue and bind it to the configured exchange.
declare a queue it to the configured exchange.
passive means don't actually declare the queue, just pretend, used to get the message count
*/
{
amqp_rpc_reply_t reply;
amqp_boolean_t passive = 0;
amqp_boolean_t exclusive = 0;
amqp_boolean_t auto_delete = 0;
amqp_queue_declare_ok_t *r;
struct sr_binding_s *t;
static amqp_basic_properties_t props;
static amqp_table_t table;
static amqp_table_entry_t table_entries[2];
signed int message_count;

int tecnt = 0;

Expand Down Expand Up @@ -121,6 +122,7 @@ int sr_consume_setup(struct sr_context *sr_c)
msg.user_headers = NULL;

//amqp_queue_declare_ok_t *r =
message_count=-2;
if (sr_c->cfg->queueDeclare) {
r = amqp_queue_declare(sr_c->cfg->broker->conn,
2,
Expand All @@ -131,13 +133,29 @@ int sr_consume_setup(struct sr_context *sr_c)
if (r) {
sr_log_msg(LOG_INFO, "queue declared: %p messages in queue: %d\n",
sr_c->cfg->queuename, r->message_count );
message_count = r->message_count;
sr_c->metrics.brokerQueuedMessageCount = message_count;
}
reply = amqp_get_rpc_reply(sr_c->cfg->broker->conn);
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
sr_amqp_reply_print(reply, "queue declare failed");
return (0);
message_count = -1;
}
}
return(message_count);
}

int sr_consume_setup(struct sr_context *sr_c)
{
struct sr_binding_s *t;
amqp_rpc_reply_t reply;
int messageCount;

messageCount = sr_consume_queue_declare(sr_c, 0);

if (messageCount< 0 ) {
return(0);
}

/*
FIXME: topic bindings are not working properly...
Expand Down Expand Up @@ -698,6 +716,8 @@ struct sr_message_s *sr_consume(struct sr_context *sr_c)
amqp_rpc_reply_t reply;
amqp_frame_t frame;
int result;
static time_t next_qdeclare_time=0;
static time_t now=0;
static char buf[SR_SARRAC_MAXIMUM_MESSAGE_LEN];
amqp_basic_deliver_t *d;
amqp_basic_properties_t *p;
Expand All @@ -709,6 +729,31 @@ struct sr_message_s *sr_consume(struct sr_context *sr_c)
char tag[AMQP_MAX_SS];
char value[AMQP_MAX_SS];
struct sr_header_s *tmph;
static time_t this_second = 0;
static int consumed_this_second = 0;



if (now == 0) {
time(&now);
}

// rate limiting.
sr_log_msg( LOG_INFO, "rateMax: %d, consumed_this_second: %d\n",
sr_c->cfg->messageRateMax, consumed_this_second );
if (sr_c->cfg->messageRateMax > 0) {
if (consumed_this_second >= sr_c->cfg->messageRateMax) {
sr_log_msg(LOG_INFO, "messageRateMax %d per second sleeping for a second.\n",
sr_c->cfg->messageRateMax);
sleep(1);
time(&now);
}
if (now > this_second) {
this_second = now;
consumed_this_second = 0;
}
consumed_this_second++;
}

while (msg.user_headers) {
tmph = msg.user_headers;
Expand All @@ -717,6 +762,14 @@ struct sr_message_s *sr_consume(struct sr_context *sr_c)
msg.user_headers = tmph->next;
free(tmph);
}
time(&now);
if (next_qdeclare_time == 0) {
next_qdeclare_time=now+20;
} else if ( now > next_qdeclare_time )
{
sr_consume_queue_declare(sr_c, 1);
next_qdeclare_time += 20;
}

/*
basic_ack added as per michel's algorithm prior to consuming next.
Expand Down
12 changes: 7 additions & 5 deletions sr_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,9 @@ void sr_context_metrics_cumulative_write(struct sr_context *sr_c)
datestamp[8] = c;

f = fopen( cumulativeFilename, "a+" );
fprintf( f, "\"%s\": { \"context\" : { \"rxGoodCount\": %d, \"rxBadCount\": %d, \"rejectCount\": %d, \"txGoodCount\": %d, \"last_housekeeping\": %f } }, \n" ,
datestamp, sr_c->metrics.rxGoodCount, sr_c->metrics.rxBadCount, sr_c->metrics.rejectCount, sr_c->metrics.txGoodCount, sr_c->metrics.last_housekeeping
fprintf( f, "\"%s\": { \"context\" : { \"rxGoodCount\": %d, \"rxBadCount\": %d, \"rejectCount\": %d, \"txGoodCount\": %d, \"last_housekeeping\": %f, \"brokerQueuedMessageCount\": %d } }, \n" ,
datestamp, sr_c->metrics.rxGoodCount, sr_c->metrics.rxBadCount, sr_c->metrics.rejectCount,
sr_c->metrics.txGoodCount, sr_c->metrics.last_housekeeping, sr_c->metrics.brokerQueuedMessageCount
);
fclose(f);

Expand All @@ -299,6 +300,7 @@ void sr_context_metrics_reset(struct sr_context *sr_c)
struct timespec tnow;

sr_context_metrics_cumulative_write(sr_c);
sr_c->metrics.brokerQueuedMessageCount = 0;
sr_c->metrics.rxGoodCount = 0;
sr_c->metrics.rxBadCount = 0;
sr_c->metrics.rejectCount = 0;
Expand Down Expand Up @@ -457,9 +459,9 @@ void sr_context_metrics_write(struct sr_context *sr_c)
FILE *f;

f = fopen( sr_c->cfg->metricsFilename, "w" );
fprintf( f, "{ \"context\" : { \"rxGoodCount\": %d, \"rxBadCount\": %d, \"rejectCount\": %d, \"txGoodCount\": %d, \"last_housekeeping\": %f } }\n" ,
sr_c->metrics.rxGoodCount, sr_c->metrics.rxBadCount, sr_c->metrics.rejectCount, sr_c->metrics.txGoodCount, sr_c->metrics.last_housekeeping
);
fprintf( f, "{ \"context\" : { \"rxGoodCount\": %d, \"rxBadCount\": %d, \"rejectCount\": %d, \"txGoodCount\": %d, \"last_housekeeping\": %f , \"brokerQueuedMessageCount\": %d } }\n" ,
sr_c->metrics.rxGoodCount, sr_c->metrics.rxBadCount, sr_c->metrics.rejectCount,
sr_c->metrics.txGoodCount, sr_c->metrics.last_housekeeping, sr_c->metrics.brokerQueuedMessageCount );
fclose(f);
}

Expand Down
1 change: 1 addition & 0 deletions sr_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "sr_config.h"

struct sr_metrics_s {
int brokerQueuedMessageCount;
int rxGoodCount;
int rxBadCount;
int rejectCount;
Expand Down

0 comments on commit 6549a46

Please sign in to comment.