From b939117e3ebff7ae3ef2b0fb3adc44bcf81f9225b87fd08a3086a4f4f6033f14 Mon Sep 17 00:00:00 2001 From: Matej Cepl Date: Tue, 6 Sep 2022 18:58:49 +0000 Subject: [PATCH] Accepting request 1001490 from home:jayvdb:branches:devel:languages:python - Add test files missing from PyPI release: conftest.py, fixtures.py, service.py & zookeeper.properties - Activate test suite OBS-URL: https://build.opensuse.org/request/show/1001490 OBS-URL: https://build.opensuse.org/package/show/devel:languages:python/python-kafka-python?expand=0&rev=21 --- conftest.py | 175 ++++++++++ fixtures.py | 673 ++++++++++++++++++++++++++++++++++++ python-kafka-python.changes | 7 + python-kafka-python.spec | 34 +- service.py | 133 +++++++ zookeeper.properties | 21 ++ 6 files changed, 1038 insertions(+), 5 deletions(-) create mode 100644 conftest.py create mode 100644 fixtures.py create mode 100644 service.py create mode 100644 zookeeper.properties diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..064a2f9 --- /dev/null +++ b/conftest.py @@ -0,0 +1,175 @@ +from __future__ import absolute_import + +import uuid + +import pytest + +from test.testutil import env_kafka_version, random_string +from test.fixtures import KafkaFixture, ZookeeperFixture + +@pytest.fixture(scope="module") +def zookeeper(): + """Return a Zookeeper fixture""" + zk_instance = ZookeeperFixture.instance() + yield zk_instance + zk_instance.close() + + +@pytest.fixture(scope="module") +def kafka_broker(kafka_broker_factory): + """Return a Kafka broker fixture""" + return kafka_broker_factory()[0] + + +@pytest.fixture(scope="module") +def kafka_broker_factory(zookeeper): + """Return a Kafka broker fixture factory""" + assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests' + + _brokers = [] + def factory(**broker_params): + params = {} if broker_params is None else broker_params.copy() + params.setdefault('partitions', 4) + num_brokers = params.pop('num_brokers', 1) + brokers = tuple(KafkaFixture.instance(x, zookeeper, **params) + for x in range(num_brokers)) + _brokers.extend(brokers) + return brokers + + yield factory + + for broker in _brokers: + broker.close() + + +@pytest.fixture +def kafka_client(kafka_broker, request): + """Return a KafkaClient fixture""" + (client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,)) + yield client + client.close() + + +@pytest.fixture +def kafka_consumer(kafka_consumer_factory): + """Return a KafkaConsumer fixture""" + return kafka_consumer_factory() + + +@pytest.fixture +def kafka_consumer_factory(kafka_broker, topic, request): + """Return a KafkaConsumer factory fixture""" + _consumer = [None] + + def factory(**kafka_consumer_params): + params = {} if kafka_consumer_params is None else kafka_consumer_params.copy() + params.setdefault('client_id', 'consumer_%s' % (request.node.name,)) + params.setdefault('auto_offset_reset', 'earliest') + _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params)) + return _consumer[0] + + yield factory + + if _consumer[0]: + _consumer[0].close() + + +@pytest.fixture +def kafka_producer(kafka_producer_factory): + """Return a KafkaProducer fixture""" + yield kafka_producer_factory() + + +@pytest.fixture +def kafka_producer_factory(kafka_broker, request): + """Return a KafkaProduce factory fixture""" + _producer = [None] + + def factory(**kafka_producer_params): + params = {} if kafka_producer_params is None else kafka_producer_params.copy() + params.setdefault('client_id', 
'producer_%s' % (request.node.name,)) + _producer[0] = next(kafka_broker.get_producers(cnt=1, **params)) + return _producer[0] + + yield factory + + if _producer[0]: + _producer[0].close() + +@pytest.fixture +def kafka_admin_client(kafka_admin_client_factory): + """Return a KafkaAdminClient fixture""" + yield kafka_admin_client_factory() + +@pytest.fixture +def kafka_admin_client_factory(kafka_broker): + """Return a KafkaAdminClient factory fixture""" + _admin_client = [None] + + def factory(**kafka_admin_client_params): + params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy() + _admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params)) + return _admin_client[0] + + yield factory + + if _admin_client[0]: + _admin_client[0].close() + +@pytest.fixture +def topic(kafka_broker, request): + """Return a topic fixture""" + topic_name = '%s_%s' % (request.node.name, random_string(10)) + kafka_broker.create_topics([topic_name]) + return topic_name + + +@pytest.fixture +def conn(mocker): + """Return a connection mocker fixture""" + from kafka.conn import ConnectionStates + from kafka.future import Future + from kafka.protocol.metadata import MetadataResponse + conn = mocker.patch('kafka.client_async.BrokerConnection') + conn.return_value = conn + conn.state = ConnectionStates.CONNECTED + conn.send.return_value = Future().success( + MetadataResponse[0]( + [(0, 'foo', 12), (1, 'bar', 34)], # brokers + [])) # topics + conn.blacked_out.return_value = False + def _set_conn_state(state): + conn.state = state + return state + conn._set_conn_state = _set_conn_state + conn.connect.side_effect = lambda: conn.state + conn.connect_blocking.return_value = True + conn.connecting = lambda: conn.state in (ConnectionStates.CONNECTING, + ConnectionStates.HANDSHAKE) + conn.connected = lambda: conn.state is ConnectionStates.CONNECTED + conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED + return conn + + +@pytest.fixture() +def send_messages(topic, kafka_producer, request): + """A factory that returns a send_messages function with a pre-populated + topic topic / producer.""" + + def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request): + """ + messages is typically `range(0,100)` + partition is an int + """ + messages_and_futures = [] # [(message, produce_future),] + for i in number_range: + # request.node.name provides the test name (including parametrized values) + encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8') + future = kafka_producer.send(topic, value=encoded_msg, partition=partition) + messages_and_futures.append((encoded_msg, future)) + kafka_producer.flush() + for (msg, f) in messages_and_futures: + assert f.succeeded() + return [msg for (msg, f) in messages_and_futures] + + return _send_messages diff --git a/fixtures.py b/fixtures.py new file mode 100644 index 0000000..d21e3ae --- /dev/null +++ b/fixtures.py @@ -0,0 +1,673 @@ +from __future__ import absolute_import + +import atexit +import logging +import os +import os.path +import socket +import subprocess +import time +import uuid + +import py +from kafka.vendor.six.moves import urllib, range +from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 + +from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer +from kafka.errors import InvalidReplicationFactorError +from kafka.protocol.admin import CreateTopicsRequest +from kafka.protocol.metadata import 
MetadataRequest +from test.testutil import env_kafka_version, random_string +from test.service import ExternalService, SpawnedService + +log = logging.getLogger(__name__) + + +def get_open_port(): + sock = socket.socket() + sock.bind(("", 0)) + port = sock.getsockname()[1] + sock.close() + return port + + +def gen_ssl_resources(directory): + os.system(""" + cd {0} + echo Generating SSL resources in {0} + + # Step 1 + keytool -keystore kafka.server.keystore.jks -alias localhost -validity 1 \ + -genkey -storepass foobar -keypass foobar \ + -dname "CN=localhost, OU=kafka-python, O=kafka-python, L=SF, ST=CA, C=US" \ + -ext SAN=dns:localhost + + # Step 2 + openssl genrsa -out ca-key 2048 + openssl req -new -x509 -key ca-key -out ca-cert -days 1 \ + -subj "/C=US/ST=CA/O=MyOrg, Inc./CN=mydomain.com" + keytool -keystore kafka.server.truststore.jks -alias CARoot -import \ + -file ca-cert -storepass foobar -noprompt + + # Step 3 + keytool -keystore kafka.server.keystore.jks -alias localhost -certreq \ + -file cert-file -storepass foobar + openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed \ + -days 1 -CAcreateserial -passin pass:foobar + keytool -keystore kafka.server.keystore.jks -alias CARoot -import \ + -file ca-cert -storepass foobar -noprompt + keytool -keystore kafka.server.keystore.jks -alias localhost -import \ + -file cert-signed -storepass foobar -noprompt + """.format(directory)) + + +class Fixture(object): + kafka_version = os.environ.get('KAFKA_VERSION', '0.11.0.2') + scala_version = os.environ.get("SCALA_VERSION", '2.8.0') + project_root = os.environ.get('PROJECT_ROOT', + os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + kafka_root = os.environ.get("KAFKA_ROOT", + os.path.join(project_root, 'servers', kafka_version, "kafka-bin")) + + def __init__(self): + self.child = None + + @classmethod + def download_official_distribution(cls, + kafka_version=None, + scala_version=None, + output_dir=None): + if not kafka_version: + kafka_version = cls.kafka_version + if not scala_version: + scala_version = cls.scala_version + if not output_dir: + output_dir = os.path.join(cls.project_root, 'servers', 'dist') + + distfile = 'kafka_%s-%s' % (scala_version, kafka_version,) + url_base = 'https://archive.apache.org/dist/kafka/%s/' % (kafka_version,) + output_file = os.path.join(output_dir, distfile + '.tgz') + + if os.path.isfile(output_file): + log.info("Found file already on disk: %s", output_file) + return output_file + + # New tarballs are .tgz, older ones are sometimes .tar.gz + try: + url = url_base + distfile + '.tgz' + log.info("Attempting to download %s", url) + response = urllib.request.urlopen(url) + except urllib.error.HTTPError: + log.exception("HTTP Error") + url = url_base + distfile + '.tar.gz' + log.info("Attempting to download %s", url) + response = urllib.request.urlopen(url) + + log.info("Saving distribution file to %s", output_file) + with open(output_file, 'w') as output_file_fd: + output_file_fd.write(response.read()) + + return output_file + + @classmethod + def test_resource(cls, filename): + return os.path.join(cls.project_root, "servers", cls.kafka_version, "resources", filename) + + @classmethod + def kafka_run_class_args(cls, *args): + result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')] + result.extend([str(arg) for arg in args]) + return result + + def kafka_run_class_env(self): + env = os.environ.copy() + env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % \ + (self.test_resource("log4j.properties"),) + 
return env + + @classmethod + def render_template(cls, source_file, target_file, binding): + log.info('Rendering %s from template %s', target_file.strpath, source_file) + with open(source_file, "r") as handle: + template = handle.read() + assert len(template) > 0, 'Empty template %s' % (source_file,) + with open(target_file.strpath, "w") as handle: + handle.write(template.format(**binding)) + handle.flush() + os.fsync(handle) + + # fsync directory for durability + # https://blog.gocept.com/2013/07/15/reliable-file-updates-with-python/ + dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY) + os.fsync(dirfd) + os.close(dirfd) + log.debug("Template string:") + for line in template.splitlines(): + log.debug(' ' + line.strip()) + log.debug("Rendered template:") + with open(target_file.strpath, 'r') as o: + for line in o: + log.debug(' ' + line.strip()) + log.debug("binding:") + for key, value in binding.items(): + log.debug(" {key}={value}".format(key=key, value=value)) + + def dump_logs(self): + self.child.dump_logs() + + +class ZookeeperFixture(Fixture): + @classmethod + def instance(cls): + if "ZOOKEEPER_URI" in os.environ: + parse = urlparse(os.environ["ZOOKEEPER_URI"]) + (host, port) = (parse.hostname, parse.port) + fixture = ExternalService(host, port) + else: + (host, port) = ("127.0.0.1", None) + fixture = cls(host, port) + + fixture.open() + return fixture + + def __init__(self, host, port, tmp_dir=None): + super(ZookeeperFixture, self).__init__() + self.host = host + self.port = port + + self.tmp_dir = tmp_dir + + def kafka_run_class_env(self): + env = super(ZookeeperFixture, self).kafka_run_class_env() + env['LOG_DIR'] = self.tmp_dir.join('logs').strpath + return env + + def out(self, message): + log.info("*** Zookeeper [%s:%s]: %s", self.host, self.port or '(auto)', message) + + def open(self): + if self.tmp_dir is None: + self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member + self.tmp_dir.ensure(dir=True) + + self.out("Running local instance...") + log.info(" host = %s", self.host) + log.info(" port = %s", self.port or '(auto)') + log.info(" tmp_dir = %s", self.tmp_dir.strpath) + + # Configure Zookeeper child process + template = self.test_resource("zookeeper.properties") + properties = self.tmp_dir.join("zookeeper.properties") + args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", + properties.strpath) + env = self.kafka_run_class_env() + + # Party! 
+ timeout = 5 + max_timeout = 120 + backoff = 1 + end_at = time.time() + max_timeout + tries = 1 + auto_port = (self.port is None) + while time.time() < end_at: + if auto_port: + self.port = get_open_port() + self.out('Attempting to start on port %d (try #%d)' % (self.port, tries)) + self.render_template(template, properties, vars(self)) + self.child = SpawnedService(args, env) + self.child.start() + timeout = min(timeout, max(end_at - time.time(), 0)) + if self.child.wait_for(r"binding to port", timeout=timeout): + break + self.child.dump_logs() + self.child.stop() + timeout *= 2 + time.sleep(backoff) + tries += 1 + backoff += 1 + else: + raise RuntimeError('Failed to start Zookeeper before max_timeout') + self.out("Done!") + atexit.register(self.close) + + def close(self): + if self.child is None: + return + self.out("Stopping...") + self.child.stop() + self.child = None + self.out("Done!") + self.tmp_dir.remove() + + def __del__(self): + self.close() + + +class KafkaFixture(Fixture): + broker_user = 'alice' + broker_password = 'alice-secret' + + @classmethod + def instance(cls, broker_id, zookeeper, zk_chroot=None, + host=None, port=None, + transport='PLAINTEXT', replicas=1, partitions=2, + sasl_mechanism=None, auto_create_topic=True, tmp_dir=None): + + if zk_chroot is None: + zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_") + if "KAFKA_URI" in os.environ: + parse = urlparse(os.environ["KAFKA_URI"]) + (host, port) = (parse.hostname, parse.port) + fixture = ExternalService(host, port) + else: + if host is None: + host = "localhost" + fixture = KafkaFixture(host, port, broker_id, + zookeeper, zk_chroot, + transport=transport, + replicas=replicas, partitions=partitions, + sasl_mechanism=sasl_mechanism, + auto_create_topic=auto_create_topic, + tmp_dir=tmp_dir) + + fixture.open() + return fixture + + def __init__(self, host, port, broker_id, zookeeper, zk_chroot, + replicas=1, partitions=2, transport='PLAINTEXT', + sasl_mechanism=None, auto_create_topic=True, + tmp_dir=None): + super(KafkaFixture, self).__init__() + + self.host = host + self.port = port + + self.broker_id = broker_id + self.auto_create_topic = auto_create_topic + self.transport = transport.upper() + if sasl_mechanism is not None: + self.sasl_mechanism = sasl_mechanism.upper() + else: + self.sasl_mechanism = None + self.ssl_dir = self.test_resource('ssl') + + # TODO: checking for port connection would be better than scanning logs + # until then, we need the pattern to work across all supported broker versions + # The logging format changed slightly in 1.0.0 + self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? 
started" % (broker_id,) + # Need to wait until the broker has fetched user configs from zookeeper in case we use scram as sasl mechanism + self.scram_pattern = r"Removing Produce quota for user %s" % (self.broker_user) + + self.zookeeper = zookeeper + self.zk_chroot = zk_chroot + # Add the attributes below for the template binding + self.zk_host = self.zookeeper.host + self.zk_port = self.zookeeper.port + + self.replicas = replicas + self.partitions = partitions + + self.tmp_dir = tmp_dir + self.running = False + + self._client = None + self.sasl_config = '' + self.jaas_config = '' + + def _sasl_config(self): + if not self.sasl_enabled: + return '' + + sasl_config = ( + 'sasl.enabled.mechanisms={mechanism}\n' + 'sasl.mechanism.inter.broker.protocol={mechanism}\n' + ) + return sasl_config.format(mechanism=self.sasl_mechanism) + + def _jaas_config(self): + if not self.sasl_enabled: + return '' + + elif self.sasl_mechanism == 'PLAIN': + jaas_config = ( + 'org.apache.kafka.common.security.plain.PlainLoginModule required\n' + ' username="{user}" password="{password}" user_{user}="{password}";\n' + ) + elif self.sasl_mechanism in ("SCRAM-SHA-256", "SCRAM-SHA-512"): + jaas_config = ( + 'org.apache.kafka.common.security.scram.ScramLoginModule required\n' + ' username="{user}" password="{password}";\n' + ) + else: + raise ValueError("SASL mechanism {} currently not supported".format(self.sasl_mechanism)) + return jaas_config.format(user=self.broker_user, password=self.broker_password) + + def _add_scram_user(self): + self.out("Adding SCRAM credentials for user {} to zookeeper.".format(self.broker_user)) + args = self.kafka_run_class_args( + "kafka.admin.ConfigCommand", + "--zookeeper", + "%s:%d/%s" % (self.zookeeper.host, + self.zookeeper.port, + self.zk_chroot), + "--alter", + "--entity-type", "users", + "--entity-name", self.broker_user, + "--add-config", + "{}=[password={}]".format(self.sasl_mechanism, self.broker_password), + ) + env = self.kafka_run_class_env() + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + stdout, stderr = proc.communicate() + + if proc.returncode != 0: + self.out("Failed to save credentials to zookeeper!") + self.out(stdout) + self.out(stderr) + raise RuntimeError("Failed to save credentials to zookeeper!") + self.out("User created.") + + @property + def sasl_enabled(self): + return self.sasl_mechanism is not None + + def bootstrap_server(self): + return '%s:%d' % (self.host, self.port) + + def kafka_run_class_env(self): + env = super(KafkaFixture, self).kafka_run_class_env() + env['LOG_DIR'] = self.tmp_dir.join('logs').strpath + return env + + def out(self, message): + log.info("*** Kafka [%s:%s]: %s", self.host, self.port or '(auto)', message) + + def _create_zk_chroot(self): + self.out("Creating Zookeeper chroot node...") + args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain", + "-server", + "%s:%d" % (self.zookeeper.host, + self.zookeeper.port), + "create", + "/%s" % (self.zk_chroot,), + "kafka-python") + env = self.kafka_run_class_env() + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + stdout, stderr = proc.communicate() + + if proc.returncode != 0: + self.out("Failed to create Zookeeper chroot node") + self.out(stdout) + self.out(stderr) + raise RuntimeError("Failed to create Zookeeper chroot node") + self.out("Kafka chroot created in Zookeeper!") + + def start(self): + # Configure Kafka child process + properties = self.tmp_dir.join("kafka.properties") + jaas_conf 
= self.tmp_dir.join("kafka_server_jaas.conf") + properties_template = self.test_resource("kafka.properties") + jaas_conf_template = self.test_resource("kafka_server_jaas.conf") + + args = self.kafka_run_class_args("kafka.Kafka", properties.strpath) + env = self.kafka_run_class_env() + if self.sasl_enabled: + opts = env.get('KAFKA_OPTS', '').strip() + opts += ' -Djava.security.auth.login.config={}'.format(jaas_conf.strpath) + env['KAFKA_OPTS'] = opts + self.render_template(jaas_conf_template, jaas_conf, vars(self)) + + timeout = 5 + max_timeout = 120 + backoff = 1 + end_at = time.time() + max_timeout + tries = 1 + auto_port = (self.port is None) + while time.time() < end_at: + # We have had problems with port conflicts on travis + # so we will try a different port on each retry + # unless the fixture was passed a specific port + if auto_port: + self.port = get_open_port() + self.out('Attempting to start on port %d (try #%d)' % (self.port, tries)) + self.render_template(properties_template, properties, vars(self)) + + self.child = SpawnedService(args, env) + self.child.start() + timeout = min(timeout, max(end_at - time.time(), 0)) + if self._broker_ready(timeout) and self._scram_user_present(timeout): + break + + self.child.dump_logs() + self.child.stop() + + timeout *= 2 + time.sleep(backoff) + tries += 1 + backoff += 1 + else: + raise RuntimeError('Failed to start KafkaInstance before max_timeout') + + (self._client,) = self.get_clients(1, client_id='_internal_client') + + self.out("Done!") + self.running = True + + def _broker_ready(self, timeout): + return self.child.wait_for(self.start_pattern, timeout=timeout) + + def _scram_user_present(self, timeout): + # no need to wait for scram user if scram is not used + if not self.sasl_enabled or not self.sasl_mechanism.startswith('SCRAM-SHA-'): + return True + return self.child.wait_for(self.scram_pattern, timeout=timeout) + + def open(self): + if self.running: + self.out("Instance already running") + return + + # Create directories + if self.tmp_dir is None: + self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member + self.tmp_dir.ensure(dir=True) + self.tmp_dir.ensure('logs', dir=True) + self.tmp_dir.ensure('data', dir=True) + + self.out("Running local instance...") + log.info(" host = %s", self.host) + log.info(" port = %s", self.port or '(auto)') + log.info(" transport = %s", self.transport) + log.info(" sasl_mechanism = %s", self.sasl_mechanism) + log.info(" broker_id = %s", self.broker_id) + log.info(" zk_host = %s", self.zookeeper.host) + log.info(" zk_port = %s", self.zookeeper.port) + log.info(" zk_chroot = %s", self.zk_chroot) + log.info(" replicas = %s", self.replicas) + log.info(" partitions = %s", self.partitions) + log.info(" tmp_dir = %s", self.tmp_dir.strpath) + + self._create_zk_chroot() + self.sasl_config = self._sasl_config() + self.jaas_config = self._jaas_config() + # add user to zookeeper for the first server + if self.sasl_enabled and self.sasl_mechanism.startswith("SCRAM-SHA") and self.broker_id == 0: + self._add_scram_user() + self.start() + + atexit.register(self.close) + + def __del__(self): + self.close() + + def stop(self): + if not self.running: + self.out("Instance already stopped") + return + + self.out("Stopping...") + self.child.stop() + self.child = None + self.running = False + self.out("Stopped!") + + def close(self): + self.stop() + if self.tmp_dir is not None: + self.tmp_dir.remove() + self.tmp_dir = None + self.out("Done!") + + def dump_logs(self): + super(KafkaFixture, self).dump_logs() + 
self.zookeeper.dump_logs() + + def _send_request(self, request, timeout=None): + def _failure(error): + raise error + retries = 10 + while True: + node_id = self._client.least_loaded_node() + for connect_retry in range(40): + self._client.maybe_connect(node_id) + if self._client.connected(node_id): + break + self._client.poll(timeout_ms=100) + else: + raise RuntimeError('Could not connect to broker with node id %d' % (node_id,)) + + try: + future = self._client.send(node_id, request) + future.error_on_callbacks = True + future.add_errback(_failure) + self._client.poll(future=future, timeout_ms=timeout) + return future.value + except Exception as exc: + time.sleep(1) + retries -= 1 + if retries == 0: + raise exc + else: + pass # retry + + def _create_topic(self, topic_name, num_partitions=None, replication_factor=None, timeout_ms=10000): + if num_partitions is None: + num_partitions = self.partitions + if replication_factor is None: + replication_factor = self.replicas + + # Try different methods to create a topic, from the fastest to the slowest + if self.auto_create_topic and num_partitions == self.partitions and replication_factor == self.replicas: + self._create_topic_via_metadata(topic_name, timeout_ms) + elif env_kafka_version() >= (0, 10, 1, 0): + try: + self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms) + except InvalidReplicationFactorError: + # wait and try again + # on travis the brokers sometimes take a while to find themselves + time.sleep(0.5) + self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms) + else: + self._create_topic_via_cli(topic_name, num_partitions, replication_factor) + + def _create_topic_via_metadata(self, topic_name, timeout_ms=10000): + self._send_request(MetadataRequest[0]([topic_name]), timeout_ms) + + def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000): + request = CreateTopicsRequest[0]([(topic_name, num_partitions, + replication_factor, [], [])], timeout_ms) + response = self._send_request(request, timeout=timeout_ms) + for topic_result in response.topic_errors: + error_code = topic_result[1] + if error_code != 0: + raise errors.for_code(error_code) + + def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor): + args = self.kafka_run_class_args('kafka.admin.TopicCommand', + '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, + self.zookeeper.port, + self.zk_chroot), + '--create', + '--topic', topic_name, + '--partitions', self.partitions \ + if num_partitions is None else num_partitions, + '--replication-factor', self.replicas \ + if replication_factor is None \ + else replication_factor) + if env_kafka_version() >= (0, 10): + args.append('--if-not-exists') + env = self.kafka_run_class_env() + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = proc.communicate() + if proc.returncode != 0: + if 'kafka.common.TopicExistsException' not in stdout: + self.out("Failed to create topic %s" % (topic_name,)) + self.out(stdout) + self.out(stderr) + raise RuntimeError("Failed to create topic %s" % (topic_name,)) + + def get_topic_names(self): + args = self.kafka_run_class_args('kafka.admin.TopicCommand', + '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, + self.zookeeper.port, + self.zk_chroot), + '--list' + ) + env = self.kafka_run_class_env() + env.pop('KAFKA_LOG4J_OPTS') + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, 
stderr=subprocess.PIPE) + stdout, stderr = proc.communicate() + if proc.returncode != 0: + self.out("Failed to list topics!") + self.out(stdout) + self.out(stderr) + raise RuntimeError("Failed to list topics!") + return stdout.decode().splitlines(False) + + def create_topics(self, topic_names, num_partitions=None, replication_factor=None): + for topic_name in topic_names: + self._create_topic(topic_name, num_partitions, replication_factor) + + def _enrich_client_params(self, params, **defaults): + params = params.copy() + for key, value in defaults.items(): + params.setdefault(key, value) + params.setdefault('bootstrap_servers', self.bootstrap_server()) + if self.sasl_enabled: + params.setdefault('sasl_mechanism', self.sasl_mechanism) + params.setdefault('security_protocol', self.transport) + if self.sasl_mechanism in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'): + params.setdefault('sasl_plain_username', self.broker_user) + params.setdefault('sasl_plain_password', self.broker_password) + return params + + @staticmethod + def _create_many_clients(cnt, cls, *args, **params): + client_id = params['client_id'] + for _ in range(cnt): + params['client_id'] = '%s_%s' % (client_id, random_string(4)) + yield cls(*args, **params) + + def get_clients(self, cnt=1, **params): + params = self._enrich_client_params(params, client_id='client') + for client in self._create_many_clients(cnt, KafkaClient, **params): + yield client + + def get_admin_clients(self, cnt, **params): + params = self._enrich_client_params(params, client_id='admin_client') + for client in self._create_many_clients(cnt, KafkaAdminClient, **params): + yield client + + def get_consumers(self, cnt, topics, **params): + params = self._enrich_client_params( + params, client_id='consumer', heartbeat_interval_ms=500, auto_offset_reset='earliest' + ) + for client in self._create_many_clients(cnt, KafkaConsumer, *topics, **params): + yield client + + def get_producers(self, cnt, **params): + params = self._enrich_client_params(params, client_id='producer') + for client in self._create_many_clients(cnt, KafkaProducer, **params): + yield client diff --git a/python-kafka-python.changes b/python-kafka-python.changes index b9e6091..e41dab1 100644 --- a/python-kafka-python.changes +++ b/python-kafka-python.changes @@ -1,3 +1,10 @@ +------------------------------------------------------------------- +Tue Sep 6 14:12:44 UTC 2022 - John Vandenberg + +- Add test files missing from PyPI release: + conftest.py, fixtures.py, service.py & zookeeper.properties +- Activate test suite + ------------------------------------------------------------------- Tue Nov 10 08:03:56 UTC 2020 - Dirk Mueller diff --git a/python-kafka-python.spec b/python-kafka-python.spec index 9b7cb9c..26f1ef8 100644 --- a/python-kafka-python.spec +++ b/python-kafka-python.spec @@ -1,7 +1,7 @@ # # spec file for package python-kafka-python # -# Copyright (c) 2020 SUSE LLC +# Copyright (c) 2022 SUSE LLC # # All modifications and additions to the file contributed by third parties # remain the property of their copyright owners, unless otherwise agreed @@ -25,12 +25,27 @@ License: Apache-2.0 Group: Development/Languages/Python URL: https://github.com/mumrah/kafka-python Source: https://files.pythonhosted.org/packages/source/k/kafka-python/kafka-python-%{version}.tar.gz +Source1: https://raw.githubusercontent.com/dpkp/kafka-python/master/servers/0.11.0.3/resources/zookeeper.properties +Source2: https://raw.githubusercontent.com/dpkp/kafka-python/master/test/conftest.py +Source3: 
https://raw.githubusercontent.com/dpkp/kafka-python/master/test/fixtures.py +Source4: https://raw.githubusercontent.com/dpkp/kafka-python/master/test/service.py BuildRequires: %{python_module setuptools} -BuildRequires: %{python_module six} BuildRequires: fdupes BuildRequires: python-rpm-macros -Requires: python-six +# Recommends: python-crc32c # Not packaged +Recommends: python-zstandard +Suggests: python-lz4 +Suggests: python-xxhash BuildArch: noarch +# SECTION test requirements +BuildRequires: %{python_module lz4} +BuildRequires: %{python_module mock} +BuildRequires: %{python_module pytest-mock} +BuildRequires: %{python_module pytest} +BuildRequires: %{python_module python-snappy} +BuildRequires: %{python_module xxhash} +BuildRequires: %{python_module zstandard} +# /SECTION %python_subpackages %description @@ -41,6 +56,12 @@ is also supported for message sets. %prep %setup -q -n kafka-python-%{version} +mkdir -p servers/0.11.0.2/resources/ +cp %{SOURCE1} servers/0.11.0.2/resources/ + +cp %{SOURCE2} %{SOURCE3} %{SOURCE4} test/ + +touch test/__init__.py %build %python_build @@ -49,10 +70,13 @@ is also supported for message sets. %python_install %python_expand %fdupes %{buildroot}%{$python_sitelib} +%check +# test_kafka_producer_gc_cleanup is sometimes off by 1 +%pytest -rs -k 'not (test_kafka_consumer_offsets_for_time_old or test_kafka_producer_gc_cleanup)' + %files %{python_files} %license LICENSE %doc README.rst -%{python_sitelib}/* -%exclude %{python_sitelib}/tests/ +%{python_sitelib}/kafka*/ %changelog diff --git a/service.py b/service.py new file mode 100644 index 0000000..814db16 --- /dev/null +++ b/service.py @@ -0,0 +1,133 @@ +from __future__ import absolute_import + +import logging +import os +import re +import select +import subprocess +import sys +import threading +import time + +__all__ = [ + 'ExternalService', + 'SpawnedService', +] + +log = logging.getLogger(__name__) + + +class ExternalService(object): + def __init__(self, host, port): + log.info("Using already running service at %s:%d", host, port) + self.host = host + self.port = port + + def open(self): + pass + + def close(self): + pass + + +class SpawnedService(threading.Thread): + def __init__(self, args=None, env=None): + super(SpawnedService, self).__init__() + + if args is None: + raise TypeError("args parameter is required") + self.args = args + self.env = env + self.captured_stdout = [] + self.captured_stderr = [] + + self.should_die = threading.Event() + self.child = None + self.alive = False + self.daemon = True + log.info("Created service for command:") + log.info(" "+' '.join(self.args)) + log.debug("With environment:") + for key, value in self.env.items(): + log.debug(" {key}={value}".format(key=key, value=value)) + + def _spawn(self): + if self.alive: return + if self.child and self.child.poll() is None: return + + self.child = subprocess.Popen( + self.args, + preexec_fn=os.setsid, # to avoid propagating signals + env=self.env, + bufsize=1, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.alive = self.child.poll() is None + + def _despawn(self): + if self.child.poll() is None: + self.child.terminate() + self.alive = False + for _ in range(50): + if self.child.poll() is not None: + self.child = None + break + time.sleep(0.1) + else: + self.child.kill() + + def run(self): + self._spawn() + while True: + try: + (rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1) + except select.error as ex: + if ex.args[0] == 4: + continue + else: + raise + + if self.child.stdout in rds: + 
line = self.child.stdout.readline().decode('utf-8').rstrip() + if line: + self.captured_stdout.append(line) + + if self.child.stderr in rds: + line = self.child.stderr.readline().decode('utf-8').rstrip() + if line: + self.captured_stderr.append(line) + + if self.child.poll() is not None: + self.dump_logs() + break + + if self.should_die.is_set(): + self._despawn() + break + + def dump_logs(self): + sys.stderr.write('\n'.join(self.captured_stderr)) + sys.stdout.write('\n'.join(self.captured_stdout)) + + def wait_for(self, pattern, timeout=30): + start = time.time() + while True: + if not self.is_alive(): + raise RuntimeError("Child thread died already.") + + elapsed = time.time() - start + if elapsed >= timeout: + log.error("Waiting for %r timed out after %d seconds", pattern, timeout) + return False + + if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None: + log.info("Found pattern %r in %d seconds via stdout", pattern, elapsed) + return True + if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None: + log.info("Found pattern %r in %d seconds via stderr", pattern, elapsed) + return True + time.sleep(0.1) + + def stop(self): + self.should_die.set() + self.join() diff --git a/zookeeper.properties b/zookeeper.properties new file mode 100644 index 0000000..9750a3a --- /dev/null +++ b/zookeeper.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. +dataDir={tmp_dir} +# the port at which the clients will connect +clientPort={port} +clientPortAddress={host} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0
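
Note (not part of the patch): the %check section above runs pytest against the fixtures that conftest.py provides. A minimal integration test of the kind those fixtures support might look like the sketch below. This is an illustrative example only, assuming it lives under test/ so pytest picks up conftest.py, and that KAFKA_VERSION is exported and a matching broker is unpacked under servers/<version>/kafka-bin; the test name and the polling loop are hypothetical, not taken from the package.

    # illustrative sketch of a round-trip test using the send_messages and
    # kafka_consumer fixtures defined in the conftest.py added by this patch
    import pytest

    from test.testutil import env_kafka_version


    @pytest.mark.skipif(not env_kafka_version(),
                        reason="integration test requires KAFKA_VERSION")
    def test_round_trip(send_messages, kafka_consumer):
        # send_messages() publishes to the per-test topic and asserts that
        # every produce future succeeded before returning the payloads
        sent = send_messages(range(0, 10))

        # poll until all sent payloads have been consumed, or give up
        received = set()
        for _ in range(30):
            for records in kafka_consumer.poll(timeout_ms=1000).values():
                received.update(record.value for record in records)
            if len(received) >= len(sent):
                break

        assert received == set(sent)

Without KAFKA_VERSION set, such tests are skipped, which matches how the spec's %check section can still pass in environments where no broker distribution is available.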