@@ -109,6 +109,72 @@ def test_ordered_messages_one_key():
109109 assert moh .size == 0
110110
111111
112+ def test_ordered_messages_drop_duplicate_keys (caplog ):
113+ moh = messages_on_hold .MessagesOnHold ()
114+
115+ msg1 = make_message (ack_id = "ack1" , ordering_key = "key1" )
116+ moh .put (msg1 )
117+ assert moh .size == 1
118+
119+ msg2 = make_message (ack_id = "ack2" , ordering_key = "key1" )
120+ moh .put (msg2 )
121+ assert moh .size == 2
122+
123+ # Get first message for "key1"
124+ assert moh .get () == msg1
125+ assert moh .size == 1
126+
127+ # Still waiting on the previously-sent message for "key1", and there are no
128+ # other messages, so return None.
129+ assert moh .get () is None
130+ assert moh .size == 1
131+
132+ # Activate "key1".
133+ callback_tracker = ScheduleMessageCallbackTracker ()
134+ moh .activate_ordering_keys (["key1" , "key1" ], callback_tracker )
135+ assert callback_tracker .called
136+ assert callback_tracker .message == msg2
137+ assert moh .size == 0
138+ assert len (moh ._pending_ordered_messages ) == 0
139+
140+ # Activate "key1" again
141+ callback_tracker = ScheduleMessageCallbackTracker ()
142+ moh .activate_ordering_keys (["key1" ], callback_tracker )
143+ assert not callback_tracker .called
144+
145+ # Activate "key1" again. There are no other messages for that key, so clean
146+ # up state for that key.
147+ callback_tracker = ScheduleMessageCallbackTracker ()
148+ moh .activate_ordering_keys (["key1" ], callback_tracker )
149+ assert not callback_tracker .called
150+
151+ msg3 = make_message (ack_id = "ack3" , ordering_key = "key1" )
152+ moh .put (msg3 )
153+ assert moh .size == 1
154+
155+ # Get next message for "key1"
156+ assert moh .get () == msg3
157+ assert moh .size == 0
158+
159+ # Activate "key1".
160+ callback_tracker = ScheduleMessageCallbackTracker ()
161+ moh .activate_ordering_keys (["key1" ], callback_tracker )
162+ assert not callback_tracker .called
163+
164+ # Activate "key1" again. There are no other messages for that key, so clean
165+ # up state for that key.
166+ callback_tracker = ScheduleMessageCallbackTracker ()
167+ moh .activate_ordering_keys (["key1" ], callback_tracker )
168+ assert not callback_tracker .called
169+
170+ # Activate "key1" again after being cleaned up. There are no other messages for that key, so clean
171+ # up state for that key.
172+ callback_tracker = ScheduleMessageCallbackTracker ()
173+ moh .activate_ordering_keys (["key1" ], callback_tracker )
174+ assert not callback_tracker .called
175+ assert "No message queue exists for message ordering key: key1" in caplog .text
176+
177+
112178def test_ordered_messages_two_keys ():
113179 moh = messages_on_hold .MessagesOnHold ()
114180
@@ -278,3 +344,39 @@ def test_ordered_and_unordered_messages_interleaved():
278344 # No messages left.
279345 assert moh .get () is None
280346 assert moh .size == 0
347+
348+
349+ def test_cleanup_nonexistent_key (caplog ):
350+ moh = messages_on_hold .MessagesOnHold ()
351+ moh ._clean_up_ordering_key ("non-existent-key" )
352+ assert (
353+ "Tried to clean up ordering key that does not exist: non-existent-key"
354+ in caplog .text
355+ )
356+
357+
358+ def test_cleanup_key_with_messages (caplog ):
359+ moh = messages_on_hold .MessagesOnHold ()
360+
361+ # Put message with "key1".
362+ msg1 = make_message (ack_id = "ack1" , ordering_key = "key1" )
363+ moh .put (msg1 )
364+ assert moh .size == 1
365+
366+ # Put another message "key1"
367+ msg2 = make_message (ack_id = "ack2" , ordering_key = "key1" )
368+ moh .put (msg2 )
369+ assert moh .size == 2
370+
371+ # Get first message for "key1"
372+ assert moh .get () == msg1
373+ assert moh .size == 1
374+
375+ # Get first message for "key1"
376+ assert moh .get () is None
377+ assert moh .size == 1
378+
379+ moh ._clean_up_ordering_key ("key1" )
380+ assert (
381+ "Tried to clean up ordering key: key1 with 1 messages remaining." in caplog .text
382+ )
0 commit comments