Skip to content

Commit 9416a8f

Browse files
committed
chore: introduce BlobDescriptorStreamRead
Base abstraction for an outstanding read of a BlobDescriptor stream. Sealed hierarchy with factory methods providing instances. In the future streaming reads can fit in alongside the current accumulating implementations.
1 parent 411615d commit 9416a8f

File tree

6 files changed

+292
-111
lines changed

6 files changed

+292
-111
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java

Lines changed: 18 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,11 @@
2121
import com.google.api.core.SettableApiFuture;
2222
import com.google.api.gax.grpc.GrpcCallContext;
2323
import com.google.api.gax.rpc.BidiStreamingCallable;
24-
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
25-
import com.google.common.annotations.VisibleForTesting;
26-
import com.google.protobuf.ByteString;
24+
import com.google.cloud.storage.BlobDescriptor.ZeroCopySupport.DisposableByteString;
25+
import com.google.cloud.storage.BlobDescriptorStreamRead.AccumulatingRead;
2726
import com.google.storage.v2.BidiReadObjectRequest;
2827
import com.google.storage.v2.BidiReadObjectResponse;
29-
import com.google.storage.v2.ReadRange;
3028
import java.io.IOException;
31-
import java.util.ArrayList;
32-
import java.util.Collections;
33-
import java.util.List;
3429
import java.util.concurrent.Executor;
3530

3631
final class BlobDescriptorImpl implements BlobDescriptor {
@@ -49,11 +44,23 @@ private BlobDescriptorImpl(BlobDescriptorStream stream, BlobDescriptorState stat
4944
public ApiFuture<byte[]> readRangeAsBytes(ByteRangeSpec range) {
5045
long readId = state.newReadId();
5146
SettableApiFuture<byte[]> future = SettableApiFuture.create();
52-
OutstandingReadToArray value =
53-
new OutstandingReadToArray(readId, range.beginOffset(), range.length(), future);
47+
AccumulatingRead<byte[]> read =
48+
BlobDescriptorStreamRead.createByteArrayAccumulatingRead(readId, range, future);
5449
BidiReadObjectRequest request =
55-
BidiReadObjectRequest.newBuilder().addReadRanges(value.makeReadRange()).build();
56-
state.putOutstandingRead(readId, value);
50+
BidiReadObjectRequest.newBuilder().addReadRanges(read.makeReadRange()).build();
51+
state.putOutstandingRead(readId, read);
52+
stream.send(request);
53+
return future;
54+
}
55+
56+
public ApiFuture<DisposableByteString> readRangeAsByteString(ByteRangeSpec range) {
57+
long readId = state.newReadId();
58+
SettableApiFuture<DisposableByteString> future = SettableApiFuture.create();
59+
AccumulatingRead<DisposableByteString> read =
60+
BlobDescriptorStreamRead.createZeroCopyByteStringAccumulatingRead(readId, range, future);
61+
BidiReadObjectRequest request =
62+
BidiReadObjectRequest.newBuilder().addReadRanges(read.makeReadRange()).build();
63+
state.putOutstandingRead(readId, read);
5764
stream.send(request);
5865
return future;
5966
}
@@ -85,47 +92,4 @@ static ApiFuture<BlobDescriptor> create(
8592
stream.send(openRequest);
8693
return StorageException.coalesceAsync(blobDescriptorFuture);
8794
}
88-
89-
@VisibleForTesting
90-
static final class OutstandingReadToArray {
91-
private final long readId;
92-
private final ReadCursor readCursor;
93-
private final List<ChildRef> childRefs;
94-
private final SettableApiFuture<byte[]> complete;
95-
96-
@VisibleForTesting
97-
OutstandingReadToArray(
98-
long readId, long readOffset, long readLimit, SettableApiFuture<byte[]> complete) {
99-
this.readId = readId;
100-
this.readCursor = new ReadCursor(readOffset, readOffset + readLimit);
101-
this.childRefs = Collections.synchronizedList(new ArrayList<>());
102-
this.complete = complete;
103-
}
104-
105-
public void accept(ChildRef childRef) throws IOException {
106-
int size = childRef.byteString().size();
107-
childRefs.add(childRef);
108-
readCursor.advance(size);
109-
}
110-
111-
public void eof() throws IOException {
112-
try {
113-
ByteString base = ByteString.empty();
114-
for (ChildRef ref : childRefs) {
115-
base = base.concat(ref.byteString());
116-
}
117-
complete.set(base.toByteArray());
118-
} finally {
119-
GrpcUtils.closeAll(childRefs);
120-
}
121-
}
122-
123-
public ReadRange makeReadRange() {
124-
return ReadRange.newBuilder()
125-
.setReadId(readId)
126-
.setReadOffset(readCursor.position())
127-
.setReadLength(readCursor.remaining())
128-
.build();
129-
}
130-
}
13195
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorState.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.google.cloud.storage;
1818

19-
import com.google.cloud.storage.BlobDescriptorImpl.OutstandingReadToArray;
2019
import com.google.storage.v2.BidiReadHandle;
2120
import com.google.storage.v2.BidiReadObjectRequest;
2221
import com.google.storage.v2.Object;
@@ -31,7 +30,7 @@ final class BlobDescriptorState {
3130
private final AtomicReference<BidiReadHandle> bidiReadHandle;
3231
private final AtomicReference<Object> metadata;
3332
private final AtomicLong readIdSeq;
34-
private final Map<Long, OutstandingReadToArray> outstandingReads;
33+
private final Map<Long, BlobDescriptorStreamRead> outstandingReads;
3534

3635
BlobDescriptorState(BidiReadObjectRequest openRequest) {
3736
this.openRequest = openRequest;
@@ -61,11 +60,11 @@ long newReadId() {
6160
return readIdSeq.getAndIncrement();
6261
}
6362

64-
OutstandingReadToArray getOutstandingRead(long key) {
63+
BlobDescriptorStreamRead getOutstandingRead(long key) {
6564
return outstandingReads.get(key);
6665
}
6766

68-
void putOutstandingRead(long key, OutstandingReadToArray value) {
67+
void putOutstandingRead(long key, BlobDescriptorStreamRead value) {
6968
outstandingReads.put(key, value);
7069
}
7170

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.google.api.gax.rpc.ClientStream;
2424
import com.google.api.gax.rpc.StateCheckingResponseObserver;
2525
import com.google.api.gax.rpc.StreamController;
26-
import com.google.cloud.storage.BlobDescriptorImpl.OutstandingReadToArray;
2726
import com.google.common.base.Preconditions;
2827
import com.google.storage.v2.BidiReadObjectRequest;
2928
import com.google.storage.v2.BidiReadObjectResponse;
@@ -185,7 +184,7 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
185184
for (int i = 0; i < rangeData.size(); i++) {
186185
ObjectRangeData d = rangeData.get(i);
187186
long id = d.getReadRange().getReadId();
188-
OutstandingReadToArray read = state.getOutstandingRead(id);
187+
BlobDescriptorStreamRead read = state.getOutstandingRead(id);
189188
if (read == null) {
190189
continue;
191190
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import com.google.api.core.SettableApiFuture;
20+
import com.google.cloud.storage.BlobDescriptor.ZeroCopySupport.DisposableByteString;
21+
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
22+
import com.google.protobuf.ByteString;
23+
import com.google.storage.v2.ReadRange;
24+
import java.io.Closeable;
25+
import java.io.IOException;
26+
import java.util.ArrayList;
27+
import java.util.Collections;
28+
import java.util.List;
29+
30+
abstract class BlobDescriptorStreamRead implements AutoCloseable, Closeable {
31+
32+
protected final long readId;
33+
protected final ReadCursor readCursor;
34+
protected final List<ChildRef> childRefs;
35+
protected boolean closed;
36+
37+
private BlobDescriptorStreamRead(long readId, long readOffset, long readLimit) {
38+
this.readId = readId;
39+
this.readCursor = new ReadCursor(readOffset, readOffset + readLimit);
40+
this.childRefs = Collections.synchronizedList(new ArrayList<>());
41+
this.closed = false;
42+
}
43+
44+
abstract void accept(ChildRef childRef) throws IOException;
45+
46+
abstract void eof() throws IOException;
47+
48+
final ReadRange makeReadRange() {
49+
return ReadRange.newBuilder()
50+
.setReadId(readId)
51+
.setReadOffset(readCursor.position())
52+
.setReadLength(readCursor.remaining())
53+
.build();
54+
}
55+
56+
@Override
57+
public void close() throws IOException {
58+
if (!closed) {
59+
closed = true;
60+
GrpcUtils.closeAll(childRefs);
61+
}
62+
}
63+
64+
static AccumulatingRead<byte[]> createByteArrayAccumulatingRead(
65+
long readId, ByteRangeSpec range, SettableApiFuture<byte[]> complete) {
66+
return new ByteArrayAccumulatingRead(readId, range.beginOffset(), range.length(), complete);
67+
}
68+
69+
static AccumulatingRead<DisposableByteString> createZeroCopyByteStringAccumulatingRead(
70+
long readId, ByteRangeSpec range, SettableApiFuture<DisposableByteString> complete) {
71+
return new ZeroCopyByteStringAccumulatingRead(
72+
readId, range.beginOffset(), range.length(), complete);
73+
}
74+
75+
/** Base class of a read that will accumulate before completing by resolving a future */
76+
abstract static class AccumulatingRead<Result> extends BlobDescriptorStreamRead {
77+
protected final SettableApiFuture<Result> complete;
78+
79+
private AccumulatingRead(
80+
long readId, long readOffset, long readLimit, SettableApiFuture<Result> complete) {
81+
super(readId, readOffset, readLimit);
82+
this.complete = complete;
83+
}
84+
}
85+
86+
/**
87+
* Base class of a read that will be processed in a streaming manner (e.g. {@link
88+
* java.nio.channels.ReadableByteChannel})
89+
*/
90+
abstract static class StreamingRead extends BlobDescriptorStreamRead {
91+
private StreamingRead(long readId, long readOffset, long readLimit) {
92+
super(readId, readOffset, readLimit);
93+
}
94+
}
95+
96+
static final class ByteArrayAccumulatingRead extends AccumulatingRead<byte[]> {
97+
98+
private ByteArrayAccumulatingRead(
99+
long readId, long readOffset, long readLimit, SettableApiFuture<byte[]> complete) {
100+
super(readId, readOffset, readLimit, complete);
101+
}
102+
103+
@Override
104+
void accept(ChildRef childRef) throws IOException {
105+
int size = childRef.byteString().size();
106+
childRefs.add(childRef);
107+
readCursor.advance(size);
108+
}
109+
110+
@Override
111+
void eof() throws IOException {
112+
try {
113+
ByteString base = ByteString.empty();
114+
for (ChildRef ref : childRefs) {
115+
base = base.concat(ref.byteString());
116+
}
117+
complete.set(base.toByteArray());
118+
} finally {
119+
close();
120+
}
121+
}
122+
}
123+
124+
static final class ZeroCopyByteStringAccumulatingRead
125+
extends AccumulatingRead<DisposableByteString> implements DisposableByteString {
126+
127+
private volatile ByteString byteString;
128+
129+
private ZeroCopyByteStringAccumulatingRead(
130+
long readId,
131+
long readOffset,
132+
long readLimit,
133+
SettableApiFuture<DisposableByteString> complete) {
134+
super(readId, readOffset, readLimit, complete);
135+
}
136+
137+
@Override
138+
public ByteString byteString() {
139+
return byteString;
140+
}
141+
142+
@Override
143+
void accept(ChildRef childRef) throws IOException {
144+
int size = childRef.byteString().size();
145+
childRefs.add(childRef);
146+
readCursor.advance(size);
147+
}
148+
149+
@Override
150+
void eof() throws IOException {
151+
ByteString base = ByteString.empty();
152+
for (ChildRef ref : childRefs) {
153+
base = base.concat(ref.byteString());
154+
}
155+
byteString = base;
156+
complete.set(this);
157+
}
158+
}
159+
}

0 commit comments

Comments
 (0)