From 684ca893bdb3a3d736cc802ec8a2aded85e397fe Mon Sep 17 00:00:00 2001
From: Timothy Pansino <11214426+TimPansino@users.noreply.github.com>
Date: Tue, 27 Sep 2022 09:26:02 -0700
Subject: [PATCH] Kafka Expanded Version Testing (#629)

* Expand kafka testing range to older versions

* Remove confluent-kafka 1.5

* Remove confluent-kafka 1.5

* Fix flaky confluent-kafka tests

* Fixup: fix flaky tests

* Fixup: fix kafka-python flaky tests

* Fixup: fix kafka-python flaky tests

* Remove confluent-kafka 1.8 tests

The following is an unresolved issue occurring in the setup of
confluent-kafka 1.8.2:
https://github.com/asweigart/PyGetWindow/issues/9

Co-authored-by: Hannah Stepanek
---
 .../messagebroker_confluentkafka/conftest.py |  6 ++-
 .../test_consumer.py                         | 10 +++-
 .../test_serialization.py                    | 18 +++++--
 tests/messagebroker_kafkapython/conftest.py  | 17 +++----
 .../test_consumer.py                         | 24 ++++++----
 .../test_serialization.py                    | 47 +++++++++++--------
 tox.ini                                      | 11 ++++-
 7 files changed, 87 insertions(+), 46 deletions(-)

diff --git a/tests/messagebroker_confluentkafka/conftest.py b/tests/messagebroker_confluentkafka/conftest.py
index c37831df08..6e19005d85 100644
--- a/tests/messagebroker_confluentkafka/conftest.py
+++ b/tests/messagebroker_confluentkafka/conftest.py
@@ -63,7 +63,7 @@ def skip_if_not_serializing(client_type):
 
 
 @pytest.fixture(scope="function")
-def producer(client_type, json_serializer):
+def producer(topic, client_type, json_serializer):
     from confluent_kafka import Producer, SerializingProducer
 
     if client_type == "cimpl":
@@ -86,7 +86,9 @@ def producer(client_type, json_serializer):
     )
 
     yield producer
-    producer.purge()
+
+    if hasattr(producer, "purge"):
+        producer.purge()
 
 
 @pytest.fixture(scope="function")
diff --git a/tests/messagebroker_confluentkafka/test_consumer.py b/tests/messagebroker_confluentkafka/test_consumer.py
index 0edcc73fd8..61f532a784 100644
--- a/tests/messagebroker_confluentkafka/test_consumer.py
+++ b/tests/messagebroker_confluentkafka/test_consumer.py
@@ -165,7 +165,15 @@ def _consume():
     @cache_kafka_consumer_headers()
     def _test():
         # Start the transaction but don't exit it.
-        consumer.poll(0.5)
+        # Keep polling until we get the record or the timeout is exceeded.
+        timeout = 10
+        attempts = 0
+        record = None
+        while not record and attempts < timeout:
+            record = consumer.poll(0.5)
+            if not record:
+                attempts += 1
+                continue
 
     _test()
 
diff --git a/tests/messagebroker_confluentkafka/test_serialization.py b/tests/messagebroker_confluentkafka/test_serialization.py
index 350b5ea355..4d948713d1 100644
--- a/tests/messagebroker_confluentkafka/test_serialization.py
+++ b/tests/messagebroker_confluentkafka/test_serialization.py
@@ -107,8 +107,12 @@ def test_deserialization_errors(skip_if_not_serializing, monkeypatch, topic, pro
     @background_task()
     def test():
         with pytest.raises(error_cls):
-            record = consumer.poll(0.5)
-            assert record is not None, "No record consumed."
+            timeout = 10
+            attempts = 0
+            while attempts < timeout:
+                if not consumer.poll(0.5):
+                    attempts += 1
+                    continue
 
     test()
 
@@ -128,14 +132,20 @@ def _test():
         send_producer_message()
 
         record_count = 0
-        while True:
+
+        timeout = 10
+        attempts = 0
+        record = None
+        while not record and attempts < timeout:
             record = consumer.poll(0.5)
             if not record:
-                break
+                attempts += 1
+                continue
 
             assert not record.error()
             assert record.value() == {"foo": 1}
             record_count += 1
+        consumer.poll(0.5)  # Exit the transaction.
 
         assert record_count == 1, "Incorrect count of records consumed: %d. Expected 1." % record_count
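The confluent-kafka test hunks above replace the one-shot poll calls and the unbounded `while True` loop with the same bounded retry pattern: keep polling until a record arrives or a fixed number of empty polls elapses, so a slow broker no longer fails the test on the first empty poll. A minimal standalone sketch of the pattern (the helper name is illustrative, not part of this patch):

    def poll_until_record(consumer, max_attempts=10, poll_timeout=0.5):
        # confluent_kafka.Consumer.poll returns None when the timeout
        # expires with no message available, so retry a bounded number
        # of times before giving up.
        attempts = 0
        record = None
        while not record and attempts < max_attempts:
            record = consumer.poll(poll_timeout)
            if not record:
                attempts += 1
        return record
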
diff --git a/tests/messagebroker_kafkapython/conftest.py b/tests/messagebroker_kafkapython/conftest.py
index 1caccd6d63..098486f34c 100644
--- a/tests/messagebroker_kafkapython/conftest.py
+++ b/tests/messagebroker_kafkapython/conftest.py
@@ -138,11 +138,6 @@ def consumer(topic, producer, client_type, json_deserializer, json_callable_dese
         group_id="test",
     )
 
-    # The first time the kafka consumer is created and polled, it returns a StopIterator
-    # exception. To by-pass this, loop over the consumer before using it.
-    # NOTE: This seems to only happen in Python2.7.
-    for record in consumer:
-        pass
     yield consumer
 
     consumer.close()
@@ -230,9 +225,15 @@ def _test():
         send_producer_message()
 
         record_count = 0
-        for record in consumer:
-            assert deserialize(record.value) == {"foo": 1}
-            record_count += 1
+
+        timeout = 10
+        attempts = 0
+        record = None
+        while not record and attempts < timeout:
+            for record in consumer:
+                assert deserialize(record.value) == {"foo": 1}
+                record_count += 1
+            attempts += 1
 
         assert record_count == 1, "Incorrect count of records consumed: %d. Expected 1." % record_count
 
diff --git a/tests/messagebroker_kafkapython/test_consumer.py b/tests/messagebroker_kafkapython/test_consumer.py
index bb736fb641..f53b2acb31 100644
--- a/tests/messagebroker_kafkapython/test_consumer.py
+++ b/tests/messagebroker_kafkapython/test_consumer.py
@@ -14,25 +14,23 @@
 
 import pytest
 from conftest import cache_kafka_consumer_headers
-
-from newrelic.common.object_names import callable_name
-
 from testing_support.fixtures import (
+    reset_core_stats_engine,
     validate_attributes,
     validate_error_event_attributes_outside_transaction,
     validate_transaction_errors,
     validate_transaction_metrics,
-    reset_core_stats_engine,
-)
-from testing_support.validators.validate_transaction_count import (
-    validate_transaction_count,
 )
 from testing_support.validators.validate_distributed_trace_accepted import (
     validate_distributed_trace_accepted,
 )
+from testing_support.validators.validate_transaction_count import (
+    validate_transaction_count,
+)
 
 from newrelic.api.background_task import background_task
 from newrelic.api.transaction import end_of_transaction
+from newrelic.common.object_names import callable_name
 from newrelic.packages import six
 
 
@@ -117,8 +115,7 @@ def test_consumer_errors(get_consumer_record, consumer_next_raises):
 
     @reset_core_stats_engine()
     @validate_error_event_attributes_outside_transaction(
-        num_errors=1,
-        exact_attrs={"intrinsic": {"error.class": callable_name(exc_class)}, "agent": {}, "user": {}}
+        num_errors=1, exact_attrs={"intrinsic": {"error.class": callable_name(exc_class)}, "agent": {}, "user": {}}
     )
     def _test():
         with pytest.raises(exc_class):
@@ -160,7 +157,14 @@ def _consume():
     @cache_kafka_consumer_headers
     def _test():
         # Start the transaction but don't exit it.
-        next(consumer_iter)
+        timeout = 10
+        attempts = 0
+        record = None
+        while not record and attempts < timeout:
+            try:
+                record = next(consumer_iter)
+            except StopIteration:
+                attempts += 1
 
     _test()
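The kafka-python hunks apply the same bounded-retry idea through the iterator protocol instead: iterating a `KafkaConsumer` raises `StopIteration` when its timeout elapses with no record, so the retry catches that exception rather than checking for a `None` return. A standalone sketch (helper name illustrative; assumes the consumer was constructed with `consumer_timeout_ms` set so iteration can end):

    def next_record(consumer_iter, max_attempts=10):
        # next() raises StopIteration when the consumer times out with
        # no data; retry a bounded number of times before giving up.
        attempts = 0
        while attempts < max_attempts:
            try:
                return next(consumer_iter)
            except StopIteration:
                attempts += 1
        return None
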
diff --git a/tests/messagebroker_kafkapython/test_serialization.py b/tests/messagebroker_kafkapython/test_serialization.py
index c4af60f6d6..b83b4e85ce 100644
--- a/tests/messagebroker_kafkapython/test_serialization.py
+++ b/tests/messagebroker_kafkapython/test_serialization.py
@@ -12,20 +12,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
+
 import pytest
 from testing_support.fixtures import (
+    reset_core_stats_engine,
+    validate_error_event_attributes_outside_transaction,
     validate_transaction_errors,
     validate_transaction_metrics,
-    validate_error_event_attributes_outside_transaction,
-    reset_core_stats_engine,
 )
 
 from newrelic.api.background_task import background_task
-from newrelic.packages import six
-
 from newrelic.common.object_names import callable_name
+from newrelic.packages import six
 
-import json
 
 def test_serialization_metrics(skip_if_not_serializing, topic, send_producer_message):
     txn_name = "test_serialization:test_serialization_metrics.<locals>.test" if six.PY3 else "test_serialization:test"
@@ -48,10 +48,13 @@ def test():
     test()
 
 
-@pytest.mark.parametrize("key,value", (
-    (object(), "A"),
-    ("A", object()),
-))
+@pytest.mark.parametrize(
+    "key,value",
+    (
+        (object(), "A"),
+        ("A", object()),
+    ),
+)
 def test_serialization_errors(skip_if_not_serializing, topic, producer, key, value):
     error_cls = TypeError
 
@@ -64,13 +67,16 @@ def test():
     test()
 
 
-@pytest.mark.parametrize("key,value", (
-    (b"%", b"{}"),
-    (b"{}", b"%"),
-))
+@pytest.mark.parametrize(
+    "key,value",
+    (
+        (b"%", b"{}"),
+        (b"{}", b"%"),
+    ),
+)
 def test_deserialization_errors(skip_if_not_serializing, monkeypatch, topic, producer, consumer, key, value):
     error_cls = json.decoder.JSONDecodeError if six.PY3 else ValueError
-    
+
     # Remove serializers to cause intentional issues
     monkeypatch.setitem(producer.config, "value_serializer", None)
     monkeypatch.setitem(producer.config, "key_serializer", None)
@@ -80,13 +86,16 @@ def test_deserialization_errors(skip_if_not_serializing, monkeypatch, topic, pro
 
     @reset_core_stats_engine()
     @validate_error_event_attributes_outside_transaction(
-        num_errors=1,
-        exact_attrs={"intrinsic": {"error.class": callable_name(error_cls)}, "agent": {}, "user": {}}
+        num_errors=1, exact_attrs={"intrinsic": {"error.class": callable_name(error_cls)}, "agent": {}, "user": {}}
     )
     def test():
         with pytest.raises(error_cls):
-            for record in consumer:
-                pass
-            assert record is not None, "No record consumed."
+            timeout = 10
+            attempts = 0
+            record = None
+            while not record and attempts < timeout:
+                for record in consumer:
+                    pass
+                attempts += 1
 
     test()
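The `error_cls` switch in this file relies on `json.decoder.JSONDecodeError` existing only on Python 3, where it is a subclass of `ValueError`; on Python 2, `json.loads` raises a plain `ValueError`. A quick illustration of why either class catches the decode failure (Python 3 shown):

    import json

    try:
        json.loads("%")  # the b"%" payloads above decode to this invalid JSON
    except ValueError as exc:
        # On Python 3 this is json.decoder.JSONDecodeError,
        # which subclasses ValueError.
        print(type(exc).__name__)  # JSONDecodeError
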
diff --git a/tox.ini b/tox.ini
index e2fc168861..f046721ae6 100644
--- a/tox.ini
+++ b/tox.ini
@@ -150,7 +150,9 @@ envlist =
     rabbitmq-messagebroker_pika-{py27,py37,py38,py39,pypy,pypy37}-pika0.13,
     rabbitmq-messagebroker_pika-{py37,py38,py39,py310,pypy37}-pikalatest,
     kafka-messagebroker_confluentkafka-{py27,py37,py38,py39,py310}-confluentkafkalatest,
+    kafka-messagebroker_confluentkafka-{py27,py39}-confluentkafka{0107,0106},
    kafka-messagebroker_kafkapython-{pypy,py27,py37,py38,pypy37}-kafkapythonlatest,
+    kafka-messagebroker_kafkapython-{py27,py38}-kafkapython{020001,020000,0104},
     python-template_mako-{py27,py37,py38,py39,py310}
 
 [pytest]
@@ -355,8 +357,13 @@ deps =
     messagebroker_pika-pikalatest: pika
     messagebroker_pika: tornado<5
     messagebroker_pika-{py27,pypy}: enum34
-    messagebroker_confluentkafka: confluent-kafka
-    messagebroker_kafkapython: kafka-python
+    messagebroker_confluentkafka-confluentkafkalatest: confluent-kafka
+    messagebroker_confluentkafka-confluentkafka0107: confluent-kafka<1.8
+    messagebroker_confluentkafka-confluentkafka0106: confluent-kafka<1.7
+    messagebroker_kafkapython-kafkapythonlatest: kafka-python
+    messagebroker_kafkapython-kafkapython020001: kafka-python<2.0.2
+    messagebroker_kafkapython-kafkapython020000: kafka-python<2.0.1
+    messagebroker_kafkapython-kafkapython0104: kafka-python<1.5
     template_mako: mako<1.2
 
 setenv =
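Each new tox factor pins an upper bound, so pip resolves, for example, `confluentkafka0107` to the newest confluent-kafka release below 1.8 (the 1.7.x line) and `kafkapython020001` to kafka-python 2.0.1. One of the pinned environments expanded from the envlist above can be exercised on its own with:

    tox -e kafka-messagebroker_confluentkafka-py39-confluentkafka0107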