|
| 1 | +from __future__ import absolute_import |
| 2 | + |
1 | 3 | import pytest |
2 | 4 |
|
3 | | -from kafka import KafkaConsumer |
4 | | -from kafka.errors import KafkaConfigurationError |
| 5 | +from kafka import KafkaConsumer, TopicPartition |
| 6 | +from kafka.errors import KafkaConfigurationError, IllegalStateError |
| 7 | + |
| 8 | + |
| 9 | +def test_session_timeout_larger_than_request_timeout_raises(): |
| 10 | + with pytest.raises(KafkaConfigurationError): |
| 11 | + KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), group_id='foo', session_timeout_ms=50000, request_timeout_ms=40000) |
| 12 | + |
| 13 | + |
| 14 | +def test_fetch_max_wait_larger_than_request_timeout_raises(): |
| 15 | + with pytest.raises(KafkaConfigurationError): |
| 16 | + KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=50000, request_timeout_ms=40000) |
| 17 | + |
| 18 | + |
| 19 | +def test_request_timeout_larger_than_connections_max_idle_ms_raises(): |
| 20 | + with pytest.raises(KafkaConfigurationError): |
| 21 | + KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), request_timeout_ms=50000, connections_max_idle_ms=40000) |
| 22 | + |
5 | 23 |
|
| 24 | +def test_subscription_copy(): |
| 25 | + consumer = KafkaConsumer('foo', api_version=(0, 10, 0)) |
| 26 | + sub = consumer.subscription() |
| 27 | + assert sub is not consumer.subscription() |
| 28 | + assert sub == set(['foo']) |
| 29 | + sub.add('fizz') |
| 30 | + assert consumer.subscription() == set(['foo']) |
6 | 31 |
|
7 | | -class TestKafkaConsumer: |
8 | | - def test_session_timeout_larger_than_request_timeout_raises(self): |
9 | | - with pytest.raises(KafkaConfigurationError): |
10 | | - KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), group_id='foo', session_timeout_ms=50000, request_timeout_ms=40000) |
11 | 32 |
|
12 | | - def test_fetch_max_wait_larger_than_request_timeout_raises(self): |
13 | | - with pytest.raises(KafkaConfigurationError): |
14 | | - KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=50000, request_timeout_ms=40000) |
| 33 | +def test_assign(): |
| 34 | + # Consumer w/ subscription to topic 'foo' |
| 35 | + consumer = KafkaConsumer('foo', api_version=(0, 10, 0)) |
| 36 | + assert consumer.assignment() == set() |
| 37 | + # Cannot assign manually |
| 38 | + with pytest.raises(IllegalStateError): |
| 39 | + consumer.assign([TopicPartition('foo', 0)]) |
15 | 40 |
|
16 | | - def test_request_timeout_larger_than_connections_max_idle_ms_raises(self): |
17 | | - with pytest.raises(KafkaConfigurationError): |
18 | | - KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), request_timeout_ms=50000, connections_max_idle_ms=40000) |
| 41 | + assert 'foo' in consumer._client._topics |
19 | 42 |
|
20 | | - def test_subscription_copy(self): |
21 | | - consumer = KafkaConsumer('foo', api_version=(0, 10, 0)) |
22 | | - sub = consumer.subscription() |
23 | | - assert sub is not consumer.subscription() |
24 | | - assert sub == set(['foo']) |
25 | | - sub.add('fizz') |
26 | | - assert consumer.subscription() == set(['foo']) |
| 43 | + consumer = KafkaConsumer(api_version=(0, 10, 0)) |
| 44 | + assert consumer.assignment() == set() |
| 45 | + consumer.assign([TopicPartition('foo', 0)]) |
| 46 | + assert consumer.assignment() == set([TopicPartition('foo', 0)]) |
| 47 | + assert 'foo' in consumer._client._topics |
| 48 | + # Cannot subscribe |
| 49 | + with pytest.raises(IllegalStateError): |
| 50 | + consumer.subscribe(topics=['foo']) |
| 51 | + consumer.assign([]) |
| 52 | + assert consumer.assignment() == set() |
0 commit comments