Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Minor modifications following Ewen's code review
  • Loading branch information
edenhill committed May 9, 2016
commit e6d38775b0ac0c418a408ac0e05e34ef67661025
3 changes: 0 additions & 3 deletions confluent_kafka/kafkatest/README
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
This directory contains clients implementing the official Kafka tests/kafkatest
Verifiable client test semantics.

FIXME: Instructions on how to use this.
1 change: 1 addition & 0 deletions confluent_kafka/kafkatest/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
""" This directory contains clients implementing the official Kafka tests/kafkatest."""
4 changes: 2 additions & 2 deletions confluent_kafka/kafkatest/kafkatest_verifiable_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from confluent_kafka import Consumer, KafkaError, KafkaException
from verifiable_client import VerifiableClient

class VerifiableConsumer (VerifiableClient):
class VerifiableConsumer(VerifiableClient):
"""
confluent-kafka-python backed VerifiableConsumer class for use with
Kafka's kafkatests client tests.
Expand Down Expand Up @@ -202,7 +202,7 @@ def msg_consume (self, msg):
self.do_commit(immediate=False)


class AssignedPartition (object):
class AssignedPartition(object):
""" Local state container for assigned partition. """
def __init__ (self, topic, partition):
super(AssignedPartition, self).__init__()
Expand Down
2 changes: 1 addition & 1 deletion confluent_kafka/kafkatest/kafkatest_verifiable_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from confluent_kafka import Producer, KafkaError, KafkaException
from verifiable_client import VerifiableClient

class VerifiableProducer (VerifiableClient):
class VerifiableProducer(VerifiableClient):
"""
confluent-kafka-python backed VerifiableProducer class for use with
Kafka's kafkatests client tests.
Expand Down
14 changes: 7 additions & 7 deletions confluent_kafka/kafkatest/verifiable_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import signal, socket, os, sys, time, json, re, datetime


class VerifiableClient (object):
class VerifiableClient(object):
"""
Generic base class for a kafkatest verifiable client.
Implements the common kafkatest protocol and semantics.
Expand Down Expand Up @@ -61,20 +61,20 @@ def send (self, d):
@staticmethod
def set_config (conf, args):
""" Set client config properties using args dict. """
for n in args:
if args[n] is None:
for n,v in args.iteritems():
if v is None:
continue
# Things to ignore
if '.' not in n:
# App config, skip
continue
if n[:6] == 'topic.':
if n.startswith('topic.'):
# Set "topic.<...>" properties on default topic conf dict
conf['default.topic.config'][n[6:]] = args[n]
conf['default.topic.config'][n[6:]] = v
elif n == 'partition.assignment.strategy':
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These seem like things we should try to make a bit more generic in kafkatest and make the Java client also handle the necessary mapping. I'd like to avoid java-specific details like this leaking out into a bunch of other clients.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that'd be nice, will postpone until kafkatest alt client integration.

# Convert Java class name to config value.
# "org.apache.kafka.clients.consumer.RangeAssignor" -> "range"
conf[n] = re.sub(r'org.apache.kafka.clients.consumer.(\w+)Assignor',
lambda x: x.group(1).lower(), args[n])
lambda x: x.group(1).lower(), v)
else:
conf[n] = args[n]
conf[n] = v