1515
1616from os import environ
1717from os .path import basename
18- from time import sleep , time
18+ import threading
19+ import time
20+ import uuid
1921
2022from google .api_core .exceptions import AlreadyExists
2123from google .api_core .exceptions import InvalidArgument
3537TRY_LIMIT = 20
3638
3739
40+ class MessageReceiver :
41+ """Custom class to handle incoming Pub/Sub messages."""
42+ def __init__ (self , expected_msg_nums , done_event ):
43+ # initialize counter to 0 on initialization
44+ self .msg_count = 0
45+ self .expected_msg_nums = expected_msg_nums
46+ self .done_event = done_event
47+
48+ def pubsub_callback (self , message ):
49+ # every time a pubsub message comes in, print it and count it
50+ self .msg_count += 1
51+ print ('Message {}: {}' .format (self .msg_count , message .data ))
52+ message .ack ()
53+ if (self .msg_count == self .expected_msg_nums ):
54+ self .done_event .set ()
55+
56+
3857class TestContainerAnalysisSamples :
3958
4059 def setup_method (self , test_method ):
4160 print ('SETUP {}' .format (test_method .__name__ ))
42- timestamp = str (int (time ()))
43- self .note_id = 'note-{}-{}' .format (timestamp , test_method .__name__ )
44- self .image_url = '{}.{}' .format (timestamp , test_method .__name__ )
61+ self .note_id = 'note-{}' .format (uuid .uuid4 ())
62+ self .image_url = '{}.{}' .format (uuid .uuid4 (), test_method .__name__ )
4563 self .note_obj = samples .create_note (self .note_id , PROJECT_ID )
4664
4765 def teardown_method (self , test_method ):
@@ -102,7 +120,7 @@ def test_occurrences_for_image(self):
102120 tries += 1
103121 new_count = samples .get_occurrences_for_image (self .image_url ,
104122 PROJECT_ID )
105- sleep (SLEEP_TIME )
123+ time . sleep (SLEEP_TIME )
106124 assert new_count == 1
107125 assert orig_count == 0
108126 # clean up
@@ -121,7 +139,7 @@ def test_occurrences_for_note(self):
121139 tries += 1
122140 new_count = samples .get_occurrences_for_note (self .note_id ,
123141 PROJECT_ID )
124- sleep (SLEEP_TIME )
142+ time . sleep (SLEEP_TIME )
125143 assert new_count == 1
126144 assert orig_count == 0
127145 # clean up
@@ -138,33 +156,31 @@ def test_pubsub(self):
138156 except AlreadyExists :
139157 pass
140158
141- subscription_id = 'drydockOccurrences'
159+ subscription_id = 'container-analysis-test-{}' . format ( uuid . uuid4 ())
142160 subscription_name = client .subscription_path (PROJECT_ID ,
143161 subscription_id )
144162 samples .create_occurrence_subscription (subscription_id , PROJECT_ID )
145- tries = 0
146- success = False
147- while not success and tries < TRY_LIMIT :
148- print (tries )
149- tries += 1
150- receiver = samples .MessageReceiver ()
163+
164+ # I can not make it pass with multiple messages. My guess is
165+ # the server started to dedup?
166+ message_count = 1
167+ try :
168+ job_done = threading .Event ()
169+ receiver = MessageReceiver (message_count , job_done )
151170 client .subscribe (subscription_name , receiver .pubsub_callback )
152171
153- # test adding 3 more occurrences
154- total_created = 3
155- for _ in range (total_created ):
156- occ = samples .create_occurrence (self .image_url ,
157- self .note_id ,
158- PROJECT_ID ,
159- PROJECT_ID )
160- sleep (SLEEP_TIME )
172+ for i in range (message_count ):
173+ occ = samples .create_occurrence (
174+ self .image_url , self .note_id , PROJECT_ID , PROJECT_ID )
175+ time .sleep (SLEEP_TIME )
161176 samples .delete_occurrence (basename (occ .name ), PROJECT_ID )
162- sleep (SLEEP_TIME )
177+ time .sleep (SLEEP_TIME )
178+ job_done .wait (timeout = 60 )
163179 print ('done. msg_count = {}' .format (receiver .msg_count ))
164- success = receiver .msg_count == total_created
165- assert receiver . msg_count == total_created
166- # clean up
167- client .delete_subscription (subscription_name )
180+ assert message_count < = receiver .msg_count
181+ finally :
182+ # clean up
183+ client .delete_subscription (subscription_name )
168184
169185 def test_poll_discovery_occurrence (self ):
170186 # try with no discovery occurrence
@@ -177,7 +193,7 @@ def test_poll_discovery_occurrence(self):
177193 assert False
178194
179195 # create discovery occurrence
180- note_id = 'discovery-note-{}' .format (int ( time () ))
196+ note_id = 'discovery-note-{}' .format (uuid . uuid4 ( ))
181197 client = containeranalysis_v1 .ContainerAnalysisClient ()
182198 grafeas_client = client .get_grafeas_client ()
183199 note = {
@@ -225,7 +241,7 @@ def test_find_vulnerabilities_for_image(self):
225241 occ_list = samples .find_vulnerabilities_for_image (self .image_url ,
226242 PROJECT_ID )
227243 count = len (occ_list )
228- sleep (SLEEP_TIME )
244+ time . sleep (SLEEP_TIME )
229245 assert len (occ_list ) == 1
230246 samples .delete_occurrence (basename (created .name ), PROJECT_ID )
231247
@@ -236,7 +252,7 @@ def test_find_high_severity_vulnerabilities(self):
236252 assert len (occ_list ) == 0
237253
238254 # create new high severity vulnerability
239- note_id = 'discovery-note-{}' .format (int ( time () ))
255+ note_id = 'discovery-note-{}' .format (uuid . uuid4 ( ))
240256 client = containeranalysis_v1 .ContainerAnalysisClient ()
241257 grafeas_client = client .get_grafeas_client ()
242258 note = {
@@ -287,7 +303,7 @@ def test_find_high_severity_vulnerabilities(self):
287303 occ_list = samples .find_vulnerabilities_for_image (self .image_url ,
288304 PROJECT_ID )
289305 count = len (occ_list )
290- sleep (SLEEP_TIME )
306+ time . sleep (SLEEP_TIME )
291307 assert len (occ_list ) == 1
292308 # clean up
293309 samples .delete_occurrence (basename (created .name ), PROJECT_ID )
0 commit comments