@@ -770,6 +770,113 @@ def test_large_messages(self):
770770 self .assertEquals (all_messages [i ], message .message )
771771 self .assertEquals (i , 19 )
772772
773+ class TestFailover (unittest .TestCase ):
774+
775+ @classmethod
776+ def setUpClass (cls ):
777+
778+ zk_chroot = random_string (10 )
779+ replicas = 2
780+ partitions = 2
781+
782+ # mini zookeeper, 2 kafka brokers
783+ cls .zk = ZookeeperFixture .instance ()
784+ kk_args = [cls .zk .host , cls .zk .port , zk_chroot , replicas , partitions ]
785+ cls .brokers = [KafkaFixture .instance (i , * kk_args ) for i in range (replicas )]
786+ cls .client = KafkaClient (cls .brokers [0 ].host , cls .brokers [0 ].port )
787+
788+ @classmethod
789+ def tearDownClass (cls ):
790+ cls .client .close ()
791+ for broker in cls .brokers :
792+ broker .close ()
793+ cls .zk .close ()
794+
795+ def test_switch_leader (self ):
796+
797+ key , topic , partition = random_string (5 ), 'test_switch_leader' , 0
798+ producer = SimpleProducer (self .client , topic )
799+
800+ for i in range (1 , 4 ):
801+
802+ # XXX unfortunately, the conns dict needs to be warmed for this to work
803+ # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
804+ self ._send_random_messages (producer , 10 )
805+
806+ # kil leader for partition 0
807+ broker = self ._kill_leader (topic , partition )
808+
809+ # expect failure, reload meta data
810+ with self .assertRaises (FailedPayloadsException ):
811+ producer .send_messages ('part 1' )
812+ producer .send_messages ('part 2' )
813+ time .sleep (1 )
814+
815+ # send to new leader
816+ self ._send_random_messages (producer , 10 )
817+
818+ broker .open ()
819+ time .sleep (3 )
820+
821+ # count number of messages
822+ count = self ._count_messages ('test_switch_leader group %s' % i , topic )
823+ self .assertIn (count , range (20 * i , 22 * i + 1 ))
824+
825+ producer .stop ()
826+
827+ def test_switch_leader_async (self ):
828+
829+ key , topic , partition = random_string (5 ), 'test_switch_leader_async' , 0
830+ producer = SimpleProducer (self .client , topic , async = True )
831+
832+ for i in range (1 , 4 ):
833+
834+ self ._send_random_messages (producer , 10 )
835+
836+ # kil leader for partition 0
837+ broker = self ._kill_leader (topic , partition )
838+
839+ # expect failure, reload meta data
840+ producer .send_messages ('part 1' )
841+ producer .send_messages ('part 2' )
842+ time .sleep (1 )
843+
844+ # send to new leader
845+ self ._send_random_messages (producer , 10 )
846+
847+ broker .open ()
848+ time .sleep (3 )
849+
850+ # count number of messages
851+ count = self ._count_messages ('test_switch_leader_async group %s' % i , topic )
852+ self .assertIn (count , range (20 * i , 22 * i + 1 ))
853+
854+ producer .stop ()
855+
856+ def _send_random_messages (self , producer , n ):
857+ for j in range (n ):
858+ resp = producer .send_messages (random_string (10 ))
859+ if len (resp ) > 0 :
860+ self .assertEquals (resp [0 ].error , 0 )
861+ time .sleep (1 ) # give it some time
862+
863+ def _kill_leader (self , topic , partition ):
864+ leader = self .client .topics_to_brokers [TopicAndPartition (topic , partition )]
865+ broker = self .brokers [leader .nodeId ]
866+ broker .close ()
867+ time .sleep (1 ) # give it some time
868+ return broker
869+
870+ def _count_messages (self , group , topic ):
871+ client = KafkaClient (self .brokers [0 ].host , self .brokers [0 ].port )
872+ consumer = SimpleConsumer (client , group , topic , auto_commit = False )
873+ all_messages = []
874+ for message in consumer :
875+ all_messages .append (message )
876+ consumer .stop ()
877+ client .close ()
878+ return len (all_messages )
879+
773880
774881def random_string (l ):
775882 s = "" .join (random .choice (string .letters ) for i in xrange (l ))
0 commit comments