Skip to content

Commit 51af43e

Browse files
committed
chore: further encapsulate BlobDescriptorStream
Move it to its own file, and make it implement StateCheckingResponseObserver<BidiReadObjectResponse> directly rather than delegating to a static class (this is necessary to allow response error handling to feedback into stream lifecycle and restarts)
1 parent 0bfb9ff commit 51af43e

File tree

2 files changed

+237
-214
lines changed

2 files changed

+237
-214
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java

Lines changed: 3 additions & 214 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,17 @@
2121
import com.google.api.core.SettableApiFuture;
2222
import com.google.api.gax.grpc.GrpcCallContext;
2323
import com.google.api.gax.rpc.BidiStreamingCallable;
24-
import com.google.api.gax.rpc.ClientStream;
25-
import com.google.api.gax.rpc.ResponseObserver;
26-
import com.google.api.gax.rpc.StateCheckingResponseObserver;
27-
import com.google.api.gax.rpc.StreamController;
2824
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
2925
import com.google.common.annotations.VisibleForTesting;
3026
import com.google.protobuf.ByteString;
3127
import com.google.storage.v2.BidiReadObjectRequest;
3228
import com.google.storage.v2.BidiReadObjectResponse;
33-
import com.google.storage.v2.ObjectRangeData;
3429
import com.google.storage.v2.ReadRange;
3530
import java.io.IOException;
3631
import java.util.ArrayList;
3732
import java.util.Collections;
3833
import java.util.List;
39-
import java.util.concurrent.ExecutionException;
4034
import java.util.concurrent.Executor;
41-
import java.util.concurrent.TimeUnit;
42-
import java.util.concurrent.TimeoutException;
4335

4436
final class BlobDescriptorImpl implements BlobDescriptor {
4537

@@ -79,219 +71,16 @@ static ApiFuture<BlobDescriptor> create(
7971
Executor executor) {
8072
BlobDescriptorState state = new BlobDescriptorState(openRequest);
8173

82-
BlobDescriptorResponseObserver responseObserver =
83-
new BlobDescriptorResponseObserver(state, executor, bidiResponseContentLifecycleManager);
84-
85-
BlobDescriptorStream stream = new BlobDescriptorStream(callable, context, responseObserver);
74+
BlobDescriptorStream stream =
75+
BlobDescriptorStream.create(
76+
executor, bidiResponseContentLifecycleManager, callable, context, state);
8677

8778
ApiFuture<BlobDescriptor> blobDescriptorFuture =
8879
ApiFutures.transform(stream, nowOpen -> new BlobDescriptorImpl(stream, state), executor);
8980
stream.send(openRequest);
9081
return StorageException.coalesceAsync(blobDescriptorFuture);
9182
}
9283

93-
private static final class BlobDescriptorStream
94-
implements ClientStream<BidiReadObjectRequest>, ApiFuture<Void> {
95-
private final SettableApiFuture<Void> openSignal;
96-
97-
private final BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable;
98-
private final GrpcCallContext context;
99-
private final ResponseObserver<BidiReadObjectResponse> responseObserver;
100-
private final OpenMonitorResponseObserver openMonitorResponseObserver;
101-
102-
private volatile ClientStream<BidiReadObjectRequest> requestStream;
103-
104-
public BlobDescriptorStream(
105-
BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
106-
GrpcCallContext context,
107-
BlobDescriptorResponseObserver responseObserver) {
108-
this.callable = callable;
109-
this.context = context;
110-
this.responseObserver = responseObserver;
111-
this.openMonitorResponseObserver = new OpenMonitorResponseObserver(responseObserver);
112-
this.openSignal = SettableApiFuture.create();
113-
}
114-
115-
public ClientStream<BidiReadObjectRequest> getRequestStream() {
116-
if (requestStream != null) {
117-
return requestStream;
118-
} else {
119-
synchronized (this) {
120-
if (requestStream == null) {
121-
requestStream = callable.splitCall(openMonitorResponseObserver, context);
122-
}
123-
return requestStream;
124-
}
125-
}
126-
}
127-
128-
@Override
129-
public void send(BidiReadObjectRequest request) {
130-
getRequestStream().send(request);
131-
}
132-
133-
@Override
134-
public void closeSendWithError(Throwable t) {
135-
getRequestStream().closeSendWithError(t);
136-
}
137-
138-
@Override
139-
public void closeSend() {
140-
getRequestStream().closeSend();
141-
}
142-
143-
@Override
144-
public boolean isSendReady() {
145-
return getRequestStream().isSendReady();
146-
}
147-
148-
@Override
149-
public void addListener(Runnable listener, Executor executor) {
150-
openSignal.addListener(listener, executor);
151-
}
152-
153-
@Override
154-
public boolean cancel(boolean mayInterruptIfRunning) {
155-
return openSignal.cancel(mayInterruptIfRunning);
156-
}
157-
158-
@Override
159-
public Void get() throws InterruptedException, ExecutionException {
160-
return openSignal.get();
161-
}
162-
163-
@Override
164-
public Void get(long timeout, TimeUnit unit)
165-
throws InterruptedException, ExecutionException, TimeoutException {
166-
return openSignal.get(timeout, unit);
167-
}
168-
169-
@Override
170-
public boolean isCancelled() {
171-
return openSignal.isCancelled();
172-
}
173-
174-
@Override
175-
public boolean isDone() {
176-
return openSignal.isDone();
177-
}
178-
179-
private class OpenMonitorResponseObserver
180-
extends StateCheckingResponseObserver<BidiReadObjectResponse> {
181-
182-
private final BlobDescriptorResponseObserver responseObserver;
183-
184-
private OpenMonitorResponseObserver(BlobDescriptorResponseObserver responseObserver) {
185-
this.responseObserver = responseObserver;
186-
}
187-
188-
@Override
189-
protected void onStartImpl(StreamController controller) {
190-
responseObserver.onStartImpl(controller);
191-
}
192-
193-
@Override
194-
protected void onResponseImpl(BidiReadObjectResponse response) {
195-
responseObserver.onResponseImpl(response);
196-
openSignal.set(null);
197-
}
198-
199-
@Override
200-
protected void onErrorImpl(Throwable t) {
201-
responseObserver.onErrorImpl(t);
202-
openSignal.setException(t);
203-
}
204-
205-
@Override
206-
protected void onCompleteImpl() {
207-
responseObserver.onCompleteImpl();
208-
openSignal.set(null);
209-
}
210-
}
211-
}
212-
213-
private static final class BlobDescriptorResponseObserver
214-
extends StateCheckingResponseObserver<BidiReadObjectResponse> {
215-
216-
private StreamController controller;
217-
private final BlobDescriptorState state;
218-
private final Executor exec;
219-
private final ResponseContentLifecycleManager<BidiReadObjectResponse>
220-
bidiResponseContentLifecycleManager;
221-
222-
private BlobDescriptorResponseObserver(
223-
BlobDescriptorState state,
224-
Executor exec,
225-
ResponseContentLifecycleManager<BidiReadObjectResponse>
226-
bidiResponseContentLifecycleManager) {
227-
this.state = state;
228-
this.exec = exec;
229-
this.bidiResponseContentLifecycleManager = bidiResponseContentLifecycleManager;
230-
}
231-
232-
@Override
233-
protected void onStartImpl(StreamController controller) {
234-
this.controller = controller;
235-
controller.disableAutoInboundFlowControl();
236-
controller.request(2);
237-
}
238-
239-
@Override
240-
protected void onResponseImpl(BidiReadObjectResponse response) {
241-
controller.request(1);
242-
try (ResponseContentLifecycleHandle<BidiReadObjectResponse> handle =
243-
bidiResponseContentLifecycleManager.get(response)) {
244-
if (response.hasMetadata()) {
245-
state.setMetadata(response.getMetadata());
246-
}
247-
if (response.hasReadHandle()) {
248-
state.setBidiReadHandle(response.getReadHandle());
249-
}
250-
List<ObjectRangeData> rangeData = response.getObjectDataRangesList();
251-
if (rangeData.isEmpty()) {
252-
return;
253-
}
254-
for (int i = 0; i < rangeData.size(); i++) {
255-
ObjectRangeData d = rangeData.get(i);
256-
long id = d.getReadRange().getReadId();
257-
OutstandingReadToArray read = state.getOutstandingRead(id);
258-
if (read == null) {
259-
continue;
260-
}
261-
final int idx = i;
262-
//noinspection rawtypes
263-
ChildRef childRef =
264-
handle.borrow(r -> r.getObjectDataRanges(idx).getChecksummedData().getContent());
265-
read.accept(childRef);
266-
if (d.getRangeEnd()) {
267-
// invoke eof on exec, the resolving future could have a downstream callback
268-
// that we don't want to block this grpc thread
269-
exec.execute(
270-
() -> {
271-
try {
272-
read.eof();
273-
// don't remove the outstanding read until the future has been resolved
274-
state.removeOutstandingRead(id);
275-
} catch (IOException e) {
276-
// TODO: sync this up with stream restarts when the time comes
277-
throw StorageException.coalesce(e);
278-
}
279-
});
280-
}
281-
}
282-
} catch (IOException e) {
283-
// TODO: sync this up with stream restarts when the time comes
284-
throw StorageException.coalesce(e);
285-
}
286-
}
287-
288-
@Override
289-
protected void onErrorImpl(Throwable t) {}
290-
291-
@Override
292-
protected void onCompleteImpl() {}
293-
}
294-
29584
@VisibleForTesting
29685
static final class OutstandingReadToArray {
29786
private final long readId;

0 commit comments

Comments
 (0)