|
21 | 21 | import com.google.api.core.SettableApiFuture; |
22 | 22 | import com.google.api.gax.grpc.GrpcCallContext; |
23 | 23 | import com.google.api.gax.rpc.BidiStreamingCallable; |
24 | | -import com.google.api.gax.rpc.ClientStream; |
25 | | -import com.google.api.gax.rpc.ResponseObserver; |
26 | | -import com.google.api.gax.rpc.StateCheckingResponseObserver; |
27 | | -import com.google.api.gax.rpc.StreamController; |
28 | 24 | import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef; |
29 | 25 | import com.google.common.annotations.VisibleForTesting; |
30 | 26 | import com.google.protobuf.ByteString; |
31 | 27 | import com.google.storage.v2.BidiReadObjectRequest; |
32 | 28 | import com.google.storage.v2.BidiReadObjectResponse; |
33 | | -import com.google.storage.v2.ObjectRangeData; |
34 | 29 | import com.google.storage.v2.ReadRange; |
35 | 30 | import java.io.IOException; |
36 | 31 | import java.util.ArrayList; |
37 | 32 | import java.util.Collections; |
38 | 33 | import java.util.List; |
39 | | -import java.util.concurrent.ExecutionException; |
40 | 34 | import java.util.concurrent.Executor; |
41 | | -import java.util.concurrent.TimeUnit; |
42 | | -import java.util.concurrent.TimeoutException; |
43 | 35 |
|
44 | 36 | final class BlobDescriptorImpl implements BlobDescriptor { |
45 | 37 |
|
@@ -79,219 +71,16 @@ static ApiFuture<BlobDescriptor> create( |
79 | 71 | Executor executor) { |
80 | 72 | BlobDescriptorState state = new BlobDescriptorState(openRequest); |
81 | 73 |
|
82 | | - BlobDescriptorResponseObserver responseObserver = |
83 | | - new BlobDescriptorResponseObserver(state, executor, bidiResponseContentLifecycleManager); |
84 | | - |
85 | | - BlobDescriptorStream stream = new BlobDescriptorStream(callable, context, responseObserver); |
| 74 | + BlobDescriptorStream stream = |
| 75 | + BlobDescriptorStream.create( |
| 76 | + executor, bidiResponseContentLifecycleManager, callable, context, state); |
86 | 77 |
|
87 | 78 | ApiFuture<BlobDescriptor> blobDescriptorFuture = |
88 | 79 | ApiFutures.transform(stream, nowOpen -> new BlobDescriptorImpl(stream, state), executor); |
89 | 80 | stream.send(openRequest); |
90 | 81 | return StorageException.coalesceAsync(blobDescriptorFuture); |
91 | 82 | } |
92 | 83 |
|
93 | | - private static final class BlobDescriptorStream |
94 | | - implements ClientStream<BidiReadObjectRequest>, ApiFuture<Void> { |
95 | | - private final SettableApiFuture<Void> openSignal; |
96 | | - |
97 | | - private final BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable; |
98 | | - private final GrpcCallContext context; |
99 | | - private final ResponseObserver<BidiReadObjectResponse> responseObserver; |
100 | | - private final OpenMonitorResponseObserver openMonitorResponseObserver; |
101 | | - |
102 | | - private volatile ClientStream<BidiReadObjectRequest> requestStream; |
103 | | - |
104 | | - public BlobDescriptorStream( |
105 | | - BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable, |
106 | | - GrpcCallContext context, |
107 | | - BlobDescriptorResponseObserver responseObserver) { |
108 | | - this.callable = callable; |
109 | | - this.context = context; |
110 | | - this.responseObserver = responseObserver; |
111 | | - this.openMonitorResponseObserver = new OpenMonitorResponseObserver(responseObserver); |
112 | | - this.openSignal = SettableApiFuture.create(); |
113 | | - } |
114 | | - |
115 | | - public ClientStream<BidiReadObjectRequest> getRequestStream() { |
116 | | - if (requestStream != null) { |
117 | | - return requestStream; |
118 | | - } else { |
119 | | - synchronized (this) { |
120 | | - if (requestStream == null) { |
121 | | - requestStream = callable.splitCall(openMonitorResponseObserver, context); |
122 | | - } |
123 | | - return requestStream; |
124 | | - } |
125 | | - } |
126 | | - } |
127 | | - |
128 | | - @Override |
129 | | - public void send(BidiReadObjectRequest request) { |
130 | | - getRequestStream().send(request); |
131 | | - } |
132 | | - |
133 | | - @Override |
134 | | - public void closeSendWithError(Throwable t) { |
135 | | - getRequestStream().closeSendWithError(t); |
136 | | - } |
137 | | - |
138 | | - @Override |
139 | | - public void closeSend() { |
140 | | - getRequestStream().closeSend(); |
141 | | - } |
142 | | - |
143 | | - @Override |
144 | | - public boolean isSendReady() { |
145 | | - return getRequestStream().isSendReady(); |
146 | | - } |
147 | | - |
148 | | - @Override |
149 | | - public void addListener(Runnable listener, Executor executor) { |
150 | | - openSignal.addListener(listener, executor); |
151 | | - } |
152 | | - |
153 | | - @Override |
154 | | - public boolean cancel(boolean mayInterruptIfRunning) { |
155 | | - return openSignal.cancel(mayInterruptIfRunning); |
156 | | - } |
157 | | - |
158 | | - @Override |
159 | | - public Void get() throws InterruptedException, ExecutionException { |
160 | | - return openSignal.get(); |
161 | | - } |
162 | | - |
163 | | - @Override |
164 | | - public Void get(long timeout, TimeUnit unit) |
165 | | - throws InterruptedException, ExecutionException, TimeoutException { |
166 | | - return openSignal.get(timeout, unit); |
167 | | - } |
168 | | - |
169 | | - @Override |
170 | | - public boolean isCancelled() { |
171 | | - return openSignal.isCancelled(); |
172 | | - } |
173 | | - |
174 | | - @Override |
175 | | - public boolean isDone() { |
176 | | - return openSignal.isDone(); |
177 | | - } |
178 | | - |
179 | | - private class OpenMonitorResponseObserver |
180 | | - extends StateCheckingResponseObserver<BidiReadObjectResponse> { |
181 | | - |
182 | | - private final BlobDescriptorResponseObserver responseObserver; |
183 | | - |
184 | | - private OpenMonitorResponseObserver(BlobDescriptorResponseObserver responseObserver) { |
185 | | - this.responseObserver = responseObserver; |
186 | | - } |
187 | | - |
188 | | - @Override |
189 | | - protected void onStartImpl(StreamController controller) { |
190 | | - responseObserver.onStartImpl(controller); |
191 | | - } |
192 | | - |
193 | | - @Override |
194 | | - protected void onResponseImpl(BidiReadObjectResponse response) { |
195 | | - responseObserver.onResponseImpl(response); |
196 | | - openSignal.set(null); |
197 | | - } |
198 | | - |
199 | | - @Override |
200 | | - protected void onErrorImpl(Throwable t) { |
201 | | - responseObserver.onErrorImpl(t); |
202 | | - openSignal.setException(t); |
203 | | - } |
204 | | - |
205 | | - @Override |
206 | | - protected void onCompleteImpl() { |
207 | | - responseObserver.onCompleteImpl(); |
208 | | - openSignal.set(null); |
209 | | - } |
210 | | - } |
211 | | - } |
212 | | - |
213 | | - private static final class BlobDescriptorResponseObserver |
214 | | - extends StateCheckingResponseObserver<BidiReadObjectResponse> { |
215 | | - |
216 | | - private StreamController controller; |
217 | | - private final BlobDescriptorState state; |
218 | | - private final Executor exec; |
219 | | - private final ResponseContentLifecycleManager<BidiReadObjectResponse> |
220 | | - bidiResponseContentLifecycleManager; |
221 | | - |
222 | | - private BlobDescriptorResponseObserver( |
223 | | - BlobDescriptorState state, |
224 | | - Executor exec, |
225 | | - ResponseContentLifecycleManager<BidiReadObjectResponse> |
226 | | - bidiResponseContentLifecycleManager) { |
227 | | - this.state = state; |
228 | | - this.exec = exec; |
229 | | - this.bidiResponseContentLifecycleManager = bidiResponseContentLifecycleManager; |
230 | | - } |
231 | | - |
232 | | - @Override |
233 | | - protected void onStartImpl(StreamController controller) { |
234 | | - this.controller = controller; |
235 | | - controller.disableAutoInboundFlowControl(); |
236 | | - controller.request(2); |
237 | | - } |
238 | | - |
239 | | - @Override |
240 | | - protected void onResponseImpl(BidiReadObjectResponse response) { |
241 | | - controller.request(1); |
242 | | - try (ResponseContentLifecycleHandle<BidiReadObjectResponse> handle = |
243 | | - bidiResponseContentLifecycleManager.get(response)) { |
244 | | - if (response.hasMetadata()) { |
245 | | - state.setMetadata(response.getMetadata()); |
246 | | - } |
247 | | - if (response.hasReadHandle()) { |
248 | | - state.setBidiReadHandle(response.getReadHandle()); |
249 | | - } |
250 | | - List<ObjectRangeData> rangeData = response.getObjectDataRangesList(); |
251 | | - if (rangeData.isEmpty()) { |
252 | | - return; |
253 | | - } |
254 | | - for (int i = 0; i < rangeData.size(); i++) { |
255 | | - ObjectRangeData d = rangeData.get(i); |
256 | | - long id = d.getReadRange().getReadId(); |
257 | | - OutstandingReadToArray read = state.getOutstandingRead(id); |
258 | | - if (read == null) { |
259 | | - continue; |
260 | | - } |
261 | | - final int idx = i; |
262 | | - //noinspection rawtypes |
263 | | - ChildRef childRef = |
264 | | - handle.borrow(r -> r.getObjectDataRanges(idx).getChecksummedData().getContent()); |
265 | | - read.accept(childRef); |
266 | | - if (d.getRangeEnd()) { |
267 | | - // invoke eof on exec, the resolving future could have a downstream callback |
268 | | - // that we don't want to block this grpc thread |
269 | | - exec.execute( |
270 | | - () -> { |
271 | | - try { |
272 | | - read.eof(); |
273 | | - // don't remove the outstanding read until the future has been resolved |
274 | | - state.removeOutstandingRead(id); |
275 | | - } catch (IOException e) { |
276 | | - // TODO: sync this up with stream restarts when the time comes |
277 | | - throw StorageException.coalesce(e); |
278 | | - } |
279 | | - }); |
280 | | - } |
281 | | - } |
282 | | - } catch (IOException e) { |
283 | | - // TODO: sync this up with stream restarts when the time comes |
284 | | - throw StorageException.coalesce(e); |
285 | | - } |
286 | | - } |
287 | | - |
288 | | - @Override |
289 | | - protected void onErrorImpl(Throwable t) {} |
290 | | - |
291 | | - @Override |
292 | | - protected void onCompleteImpl() {} |
293 | | - } |
294 | | - |
295 | 84 | @VisibleForTesting |
296 | 85 | static final class OutstandingReadToArray { |
297 | 86 | private final long readId; |
|
0 commit comments