@@ -107,16 +107,61 @@ def test_constructor_and_default_state():
107107 assert manager ._client_id is not None
108108
109109
110- def test_constructor_with_options ():
110+ def test_constructor_with_default_options ():
111+ flow_control_ = types .FlowControl ()
111112 manager = streaming_pull_manager .StreamingPullManager (
112113 mock .sentinel .client ,
113114 mock .sentinel .subscription ,
114- flow_control = mock . sentinel . flow_control ,
115+ flow_control = flow_control_ ,
115116 scheduler = mock .sentinel .scheduler ,
116117 )
117118
118- assert manager .flow_control == mock . sentinel . flow_control
119+ assert manager .flow_control == flow_control_
119120 assert manager ._scheduler == mock .sentinel .scheduler
121+ assert manager ._ack_deadline == 10
122+ assert manager ._stream_ack_deadline == 60
123+
124+
125+ def test_constructor_with_min_and_max_duration_per_lease_extension_ ():
126+ flow_control_ = types .FlowControl (
127+ min_duration_per_lease_extension = 15 , max_duration_per_lease_extension = 20
128+ )
129+ manager = streaming_pull_manager .StreamingPullManager (
130+ mock .sentinel .client ,
131+ mock .sentinel .subscription ,
132+ flow_control = flow_control_ ,
133+ scheduler = mock .sentinel .scheduler ,
134+ )
135+ assert manager ._ack_deadline == 15
136+ assert manager ._stream_ack_deadline == 20
137+
138+
139+ def test_constructor_with_min_duration_per_lease_extension_too_low ():
140+ flow_control_ = types .FlowControl (
141+ min_duration_per_lease_extension = 9 , max_duration_per_lease_extension = 9
142+ )
143+ manager = streaming_pull_manager .StreamingPullManager (
144+ mock .sentinel .client ,
145+ mock .sentinel .subscription ,
146+ flow_control = flow_control_ ,
147+ scheduler = mock .sentinel .scheduler ,
148+ )
149+ assert manager ._ack_deadline == 10
150+ assert manager ._stream_ack_deadline == 10
151+
152+
153+ def test_constructor_with_max_duration_per_lease_extension_too_high ():
154+ flow_control_ = types .FlowControl (
155+ max_duration_per_lease_extension = 601 , min_duration_per_lease_extension = 601
156+ )
157+ manager = streaming_pull_manager .StreamingPullManager (
158+ mock .sentinel .client ,
159+ mock .sentinel .subscription ,
160+ flow_control = flow_control_ ,
161+ scheduler = mock .sentinel .scheduler ,
162+ )
163+ assert manager ._ack_deadline == 600
164+ assert manager ._stream_ack_deadline == 600
120165
121166
122167def make_manager (** kwargs ):
@@ -164,9 +209,13 @@ def test__obtain_ack_deadline_no_custom_flow_control_setting():
164209 manager ._flow_control = types .FlowControl (
165210 min_duration_per_lease_extension = 0 , max_duration_per_lease_extension = 0
166211 )
212+ assert manager ._stream_ack_deadline == 60
213+ assert manager ._ack_deadline == 10
214+ assert manager ._obtain_ack_deadline (maybe_update = False ) == 10
167215
168216 deadline = manager ._obtain_ack_deadline (maybe_update = True )
169217 assert deadline == histogram .MIN_ACK_DEADLINE
218+ assert manager ._stream_ack_deadline == 60
170219
171220 # When we get some historical data, the deadline is adjusted.
172221 manager .ack_histogram .add (histogram .MIN_ACK_DEADLINE * 2 )
@@ -186,11 +235,14 @@ def test__obtain_ack_deadline_with_max_duration_per_lease_extension():
186235 manager ._flow_control = types .FlowControl (
187236 max_duration_per_lease_extension = histogram .MIN_ACK_DEADLINE + 1
188237 )
238+ assert manager ._ack_deadline == 10
239+
189240 manager .ack_histogram .add (histogram .MIN_ACK_DEADLINE * 3 ) # make p99 value large
190241
191242 # The deadline configured in flow control should prevail.
192243 deadline = manager ._obtain_ack_deadline (maybe_update = True )
193244 assert deadline == histogram .MIN_ACK_DEADLINE + 1
245+ assert manager ._stream_ack_deadline == 60
194246
195247
196248def test__obtain_ack_deadline_with_min_duration_per_lease_extension ():
@@ -292,12 +344,12 @@ def test__obtain_ack_deadline_no_value_update():
292344
293345def test_client_id ():
294346 manager1 = make_manager ()
295- request1 = manager1 ._get_initial_request (stream_ack_deadline_seconds = 10 )
347+ request1 = manager1 ._get_initial_request (stream_ack_deadline_seconds = 60 )
296348 client_id_1 = request1 .client_id
297349 assert client_id_1
298350
299351 manager2 = make_manager ()
300- request2 = manager2 ._get_initial_request (stream_ack_deadline_seconds = 10 )
352+ request2 = manager2 ._get_initial_request (stream_ack_deadline_seconds = 60 )
301353 client_id_2 = request2 .client_id
302354 assert client_id_2
303355
@@ -308,7 +360,7 @@ def test_streaming_flow_control():
308360 manager = make_manager (
309361 flow_control = types .FlowControl (max_messages = 10 , max_bytes = 1000 )
310362 )
311- request = manager ._get_initial_request (stream_ack_deadline_seconds = 10 )
363+ request = manager ._get_initial_request (stream_ack_deadline_seconds = 60 )
312364 assert request .max_outstanding_messages == 10
313365 assert request .max_outstanding_bytes == 1000
314366
@@ -318,7 +370,7 @@ def test_streaming_flow_control_use_legacy_flow_control():
318370 flow_control = types .FlowControl (max_messages = 10 , max_bytes = 1000 ),
319371 use_legacy_flow_control = True ,
320372 )
321- request = manager ._get_initial_request (stream_ack_deadline_seconds = 10 )
373+ request = manager ._get_initial_request (stream_ack_deadline_seconds = 60 )
322374 assert request .max_outstanding_messages == 0
323375 assert request .max_outstanding_bytes == 0
324376
@@ -1046,12 +1098,12 @@ def test_heartbeat_stream_ack_deadline_seconds(caplog):
10461098 result = manager .heartbeat ()
10471099
10481100 manager ._rpc .send .assert_called_once_with (
1049- gapic_types .StreamingPullRequest (stream_ack_deadline_seconds = 10 )
1101+ gapic_types .StreamingPullRequest (stream_ack_deadline_seconds = 60 )
10501102 )
10511103 assert result
10521104 # Set to false after a send is initiated.
10531105 assert not manager ._send_new_ack_deadline
1054- assert "Sending new ack_deadline of 10 seconds." in caplog .text
1106+ assert "Sending new ack_deadline of 60 seconds." in caplog .text
10551107
10561108
10571109@mock .patch ("google.api_core.bidi.ResumableBidiRpc" , autospec = True )
0 commit comments