diff --git a/sr_consume.c b/sr_consume.c index 22e0209..10269b2 100644 --- a/sr_consume.c +++ b/sr_consume.c @@ -76,21 +76,22 @@ int sr_consume_cleanup(struct sr_context *sr_c) return (1); } -int sr_consume_setup(struct sr_context *sr_c) +signed int sr_consume_queue_declare(struct sr_context *sr_c, amqp_boolean_t passive) /* - declare a queue and bind it to the configured exchange. + declare a queue it to the configured exchange. + + passive means don't actually declare the queue, just pretend, used to get the message count */ { amqp_rpc_reply_t reply; - amqp_boolean_t passive = 0; amqp_boolean_t exclusive = 0; amqp_boolean_t auto_delete = 0; amqp_queue_declare_ok_t *r; - struct sr_binding_s *t; static amqp_basic_properties_t props; static amqp_table_t table; static amqp_table_entry_t table_entries[2]; + signed int message_count; int tecnt = 0; @@ -121,6 +122,7 @@ int sr_consume_setup(struct sr_context *sr_c) msg.user_headers = NULL; //amqp_queue_declare_ok_t *r = + message_count=-2; if (sr_c->cfg->queueDeclare) { r = amqp_queue_declare(sr_c->cfg->broker->conn, 2, @@ -131,13 +133,29 @@ int sr_consume_setup(struct sr_context *sr_c) if (r) { sr_log_msg(LOG_INFO, "queue declared: %p messages in queue: %d\n", sr_c->cfg->queuename, r->message_count ); + message_count = r->message_count; + sr_c->metrics.brokerQueuedMessageCount = message_count; } reply = amqp_get_rpc_reply(sr_c->cfg->broker->conn); if (reply.reply_type != AMQP_RESPONSE_NORMAL) { sr_amqp_reply_print(reply, "queue declare failed"); - return (0); + message_count = -1; } } + return(message_count); +} + +int sr_consume_setup(struct sr_context *sr_c) +{ + struct sr_binding_s *t; + amqp_rpc_reply_t reply; + int messageCount; + + messageCount = sr_consume_queue_declare(sr_c, 0); + + if (messageCount< 0 ) { + return(0); + } /* FIXME: topic bindings are not working properly... @@ -698,6 +716,8 @@ struct sr_message_s *sr_consume(struct sr_context *sr_c) amqp_rpc_reply_t reply; amqp_frame_t frame; int result; + static time_t next_qdeclare_time=0; + static time_t now=0; static char buf[SR_SARRAC_MAXIMUM_MESSAGE_LEN]; amqp_basic_deliver_t *d; amqp_basic_properties_t *p; @@ -709,6 +729,31 @@ struct sr_message_s *sr_consume(struct sr_context *sr_c) char tag[AMQP_MAX_SS]; char value[AMQP_MAX_SS]; struct sr_header_s *tmph; + static time_t this_second = 0; + static int consumed_this_second = 0; + + + + if (now == 0) { + time(&now); + } + + // rate limiting. + sr_log_msg( LOG_INFO, "rateMax: %d, consumed_this_second: %d\n", + sr_c->cfg->messageRateMax, consumed_this_second ); + if (sr_c->cfg->messageRateMax > 0) { + if (consumed_this_second >= sr_c->cfg->messageRateMax) { + sr_log_msg(LOG_INFO, "messageRateMax %d per second sleeping for a second.\n", + sr_c->cfg->messageRateMax); + sleep(1); + time(&now); + } + if (now > this_second) { + this_second = now; + consumed_this_second = 0; + } + consumed_this_second++; + } while (msg.user_headers) { tmph = msg.user_headers; @@ -717,6 +762,14 @@ struct sr_message_s *sr_consume(struct sr_context *sr_c) msg.user_headers = tmph->next; free(tmph); } + time(&now); + if (next_qdeclare_time == 0) { + next_qdeclare_time=now+20; + } else if ( now > next_qdeclare_time ) + { + sr_consume_queue_declare(sr_c, 1); + next_qdeclare_time += 20; + } /* basic_ack added as per michel's algorithm prior to consuming next. diff --git a/sr_context.c b/sr_context.c index 4f6058b..760eedc 100644 --- a/sr_context.c +++ b/sr_context.c @@ -286,8 +286,9 @@ void sr_context_metrics_cumulative_write(struct sr_context *sr_c) datestamp[8] = c; f = fopen( cumulativeFilename, "a+" ); - fprintf( f, "\"%s\": { \"context\" : { \"rxGoodCount\": %d, \"rxBadCount\": %d, \"rejectCount\": %d, \"txGoodCount\": %d, \"last_housekeeping\": %f } }, \n" , - datestamp, sr_c->metrics.rxGoodCount, sr_c->metrics.rxBadCount, sr_c->metrics.rejectCount, sr_c->metrics.txGoodCount, sr_c->metrics.last_housekeeping + fprintf( f, "\"%s\": { \"context\" : { \"rxGoodCount\": %d, \"rxBadCount\": %d, \"rejectCount\": %d, \"txGoodCount\": %d, \"last_housekeeping\": %f, \"brokerQueuedMessageCount\": %d } }, \n" , + datestamp, sr_c->metrics.rxGoodCount, sr_c->metrics.rxBadCount, sr_c->metrics.rejectCount, + sr_c->metrics.txGoodCount, sr_c->metrics.last_housekeeping, sr_c->metrics.brokerQueuedMessageCount ); fclose(f); @@ -299,6 +300,7 @@ void sr_context_metrics_reset(struct sr_context *sr_c) struct timespec tnow; sr_context_metrics_cumulative_write(sr_c); + sr_c->metrics.brokerQueuedMessageCount = 0; sr_c->metrics.rxGoodCount = 0; sr_c->metrics.rxBadCount = 0; sr_c->metrics.rejectCount = 0; @@ -457,9 +459,9 @@ void sr_context_metrics_write(struct sr_context *sr_c) FILE *f; f = fopen( sr_c->cfg->metricsFilename, "w" ); - fprintf( f, "{ \"context\" : { \"rxGoodCount\": %d, \"rxBadCount\": %d, \"rejectCount\": %d, \"txGoodCount\": %d, \"last_housekeeping\": %f } }\n" , - sr_c->metrics.rxGoodCount, sr_c->metrics.rxBadCount, sr_c->metrics.rejectCount, sr_c->metrics.txGoodCount, sr_c->metrics.last_housekeeping - ); + fprintf( f, "{ \"context\" : { \"rxGoodCount\": %d, \"rxBadCount\": %d, \"rejectCount\": %d, \"txGoodCount\": %d, \"last_housekeeping\": %f , \"brokerQueuedMessageCount\": %d } }\n" , + sr_c->metrics.rxGoodCount, sr_c->metrics.rxBadCount, sr_c->metrics.rejectCount, + sr_c->metrics.txGoodCount, sr_c->metrics.last_housekeeping, sr_c->metrics.brokerQueuedMessageCount ); fclose(f); } diff --git a/sr_context.h b/sr_context.h index 82c7093..696c8fc 100644 --- a/sr_context.h +++ b/sr_context.h @@ -44,6 +44,7 @@ #include "sr_config.h" struct sr_metrics_s { + int brokerQueuedMessageCount; int rxGoodCount; int rxBadCount; int rejectCount;