Skip to content

Commit 3eec2e3

Browse files
committed
chore: remove client side range limit validation
Appendable objects can change size without changing generation, meaning the size of an object being read from could change after the BlobDescriptor is opened. To allow reading from an offset we don't yet know about, remove all client side limitation as well as any client side optimization for ranges that are known to be zero bytes.
1 parent 1bf497e commit 3eec2e3

File tree

8 files changed

+72
-284
lines changed

8 files changed

+72
-284
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,15 @@
2222
import com.google.api.core.ApiFutures;
2323
import com.google.api.core.SettableApiFuture;
2424
import com.google.api.gax.grpc.GrpcCallContext;
25-
import com.google.api.gax.grpc.GrpcStatusCode;
26-
import com.google.api.gax.rpc.ApiExceptionFactory;
2725
import com.google.cloud.storage.BlobDescriptor.ZeroCopySupport.DisposableByteString;
2826
import com.google.cloud.storage.BlobDescriptorStreamRead.AccumulatingRead;
2927
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
3028
import com.google.cloud.storage.RetryContext.RetryContextProvider;
3129
import com.google.common.annotations.VisibleForTesting;
32-
import com.google.common.math.LongMath;
3330
import com.google.protobuf.ByteString;
3431
import com.google.storage.v2.BidiReadObjectRequest;
3532
import com.google.storage.v2.BidiReadObjectResponse;
36-
import io.grpc.Status.Code;
3733
import java.io.IOException;
38-
import java.util.OptionalLong;
3934
import java.util.concurrent.ScheduledExecutorService;
4035

4136
final class BlobDescriptorImpl implements BlobDescriptor {
@@ -59,14 +54,10 @@ private BlobDescriptorImpl(
5954
public ApiFuture<byte[]> readRangeAsBytes(RangeSpec range) {
6055
checkState(stream.isOpen(), "stream already closed");
6156
long readId = state.newReadId();
62-
ReadCursor readCursor = getReadCursor(range, state);
63-
if (!readCursor.hasRemaining()) {
64-
return ApiFutures.immediateFuture(new byte[0]);
65-
}
6657
SettableApiFuture<byte[]> future = SettableApiFuture.create();
6758
AccumulatingRead<byte[]> read =
6859
BlobDescriptorStreamRead.createByteArrayAccumulatingRead(
69-
readId, readCursor, retryContextProvider.create(), future);
60+
readId, range, retryContextProvider.create(), future);
7061
BidiReadObjectRequest request =
7162
BidiReadObjectRequest.newBuilder().addReadRanges(read.makeReadRange()).build();
7263
state.putOutstandingRead(readId, read);
@@ -77,14 +68,10 @@ public ApiFuture<byte[]> readRangeAsBytes(RangeSpec range) {
7768
public ApiFuture<DisposableByteString> readRangeAsByteString(RangeSpec range) {
7869
checkState(stream.isOpen(), "stream already closed");
7970
long readId = state.newReadId();
80-
ReadCursor readCursor = getReadCursor(range, state);
81-
if (!readCursor.hasRemaining()) {
82-
return ApiFutures.immediateFuture(EmptyDisposableByteString.INSTANCE);
83-
}
8471
SettableApiFuture<DisposableByteString> future = SettableApiFuture.create();
8572
AccumulatingRead<DisposableByteString> read =
8673
BlobDescriptorStreamRead.createZeroCopyByteStringAccumulatingRead(
87-
readId, readCursor, future, retryContextProvider.create());
74+
readId, range, future, retryContextProvider.create());
8875
BidiReadObjectRequest request =
8976
BidiReadObjectRequest.newBuilder().addReadRanges(read.makeReadRange()).build();
9077
state.putOutstandingRead(readId, read);
@@ -102,26 +89,6 @@ public void close() throws IOException {
10289
stream.close();
10390
}
10491

105-
@VisibleForTesting
106-
static ReadCursor getReadCursor(RangeSpec range, BlobDescriptorState state) {
107-
long begin = range.begin();
108-
long objectSize = state.getMetadata().getSize();
109-
if (begin > objectSize) {
110-
throw ApiExceptionFactory.createException(
111-
String.format(
112-
"range begin must be < objectSize (range begin = %d, object size = %d",
113-
begin, objectSize),
114-
null,
115-
GrpcStatusCode.of(Code.OUT_OF_RANGE),
116-
false);
117-
}
118-
final long end;
119-
OptionalLong limit = range.limit();
120-
long saturatedAdd = LongMath.saturatedAdd(begin, limit.orElse(0L));
121-
end = Math.min(saturatedAdd, objectSize);
122-
return new ReadCursor(begin, end);
123-
}
124-
12592
static ApiFuture<BlobDescriptor> create(
12693
BidiReadObjectRequest openRequest,
12794
GrpcCallContext context,

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -295,23 +295,11 @@ public void onResponse(BidiReadObjectResponse response) {
295295

296296
final int idx = i;
297297
long begin = readRange.getReadOffset();
298-
long position = read.getReadCursor().position();
298+
long position = read.readOffset();
299299
if (begin == position) {
300-
long remaining = read.getReadCursor().remaining();
301-
int contentSize = content.size();
302300
ChildRef childRef;
303-
if (remaining >= contentSize) {
304-
childRef =
305-
handle.borrow(r -> r.getObjectDataRanges(idx).getChecksummedData().getContent());
306-
} else {
307-
childRef =
308-
handle.borrow(
309-
r ->
310-
r.getObjectDataRanges(idx)
311-
.getChecksummedData()
312-
.getContent()
313-
.substring(0, Math.toIntExact(remaining)));
314-
}
301+
childRef =
302+
handle.borrow(r -> r.getObjectDataRanges(idx).getChecksummedData().getContent());
315303
read.accept(childRef);
316304
} else if (begin < position) {
317305
int skip = Math.toIntExact(position - begin);
@@ -348,7 +336,7 @@ public void onResponse(BidiReadObjectResponse response) {
348336
continue;
349337
}
350338

351-
if (d.getRangeEnd() && !read.getReadCursor().hasRemaining()) {
339+
if (d.getRangeEnd()) {
352340
// invoke eof on exec, the resolving future could have a downstream callback
353341
// that we don't want to block this grpc thread
354342
executor.execute(

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStreamRead.java

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -29,36 +29,46 @@
2929
import java.util.ArrayList;
3030
import java.util.Collections;
3131
import java.util.List;
32+
import java.util.concurrent.atomic.AtomicLong;
3233

3334
abstract class BlobDescriptorStreamRead implements AutoCloseable, Closeable {
3435

3536
protected final long readId;
36-
protected final ReadCursor readCursor;
37+
protected final RangeSpec rangeSpec;
3738
protected final List<ChildRef> childRefs;
3839
protected final RetryContext retryContext;
40+
protected final AtomicLong readOffset;
3941
protected boolean closed;
4042
protected boolean tombstoned;
4143

42-
BlobDescriptorStreamRead(long readId, ReadCursor readCursor, RetryContext retryContext) {
43-
this(readId, readCursor, Collections.synchronizedList(new ArrayList<>()), retryContext, false);
44+
BlobDescriptorStreamRead(long readId, RangeSpec rangeSpec, RetryContext retryContext) {
45+
this(
46+
readId,
47+
rangeSpec,
48+
Collections.synchronizedList(new ArrayList<>()),
49+
new AtomicLong(rangeSpec.begin()),
50+
retryContext,
51+
false);
4452
}
4553

4654
BlobDescriptorStreamRead(
4755
long readId,
48-
ReadCursor readCursor,
56+
RangeSpec rangeSpec,
4957
List<ChildRef> childRefs,
58+
AtomicLong readOffset,
5059
RetryContext retryContext,
5160
boolean closed) {
5261
this.readId = readId;
53-
this.readCursor = readCursor;
62+
this.rangeSpec = rangeSpec;
5463
this.childRefs = childRefs;
5564
this.retryContext = retryContext;
65+
this.readOffset = readOffset;
5666
this.closed = closed;
5767
this.tombstoned = false;
5868
}
5969

60-
ReadCursor getReadCursor() {
61-
return readCursor;
70+
long readOffset() {
71+
return readOffset.get();
6272
}
6373

6474
abstract boolean acceptingBytes();
@@ -76,11 +86,16 @@ final void preFail() {
7686
abstract BlobDescriptorStreamRead withNewReadId(long newReadId);
7787

7888
final ReadRange makeReadRange() {
79-
return ReadRange.newBuilder()
80-
.setReadId(readId)
81-
.setReadOffset(readCursor.position())
82-
.setReadLength(readCursor.remaining())
83-
.build();
89+
long currentOffset = readOffset.get();
90+
ReadRange.Builder b = ReadRange.newBuilder().setReadId(readId).setReadOffset(currentOffset);
91+
rangeSpec
92+
.limit()
93+
.ifPresent(
94+
limit -> {
95+
long readSoFar = currentOffset - rangeSpec.begin();
96+
b.setReadLength(limit - readSoFar);
97+
});
98+
return b.build();
8499
}
85100

86101
@Override
@@ -98,18 +113,18 @@ public void close() throws IOException {
98113

99114
static AccumulatingRead<byte[]> createByteArrayAccumulatingRead(
100115
long readId,
101-
ReadCursor readCursor,
116+
RangeSpec rangeSpec,
102117
RetryContext retryContext,
103118
SettableApiFuture<byte[]> complete) {
104-
return new ByteArrayAccumulatingRead(readId, readCursor, retryContext, complete);
119+
return new ByteArrayAccumulatingRead(readId, rangeSpec, retryContext, complete);
105120
}
106121

107122
static ZeroCopyByteStringAccumulatingRead createZeroCopyByteStringAccumulatingRead(
108123
long readId,
109-
ReadCursor readCursor,
124+
RangeSpec rangeSpec,
110125
SettableApiFuture<DisposableByteString> complete,
111126
RetryContext retryContext) {
112-
return new ZeroCopyByteStringAccumulatingRead(readId, readCursor, retryContext, complete);
127+
return new ZeroCopyByteStringAccumulatingRead(readId, rangeSpec, retryContext, complete);
113128
}
114129

115130
/** Base class of a read that will accumulate before completing by resolving a future */
@@ -118,27 +133,28 @@ abstract static class AccumulatingRead<Result> extends BlobDescriptorStreamRead
118133

119134
private AccumulatingRead(
120135
long readId,
121-
ReadCursor readCursor,
136+
RangeSpec rangeSpec,
122137
RetryContext retryContext,
123138
SettableApiFuture<Result> complete) {
124-
super(readId, readCursor, retryContext);
139+
super(readId, rangeSpec, retryContext);
125140
this.complete = complete;
126141
}
127142

128143
private AccumulatingRead(
129144
long readId,
130-
ReadCursor readCursor,
145+
RangeSpec rangeSpec,
131146
List<ChildRef> childRefs,
147+
AtomicLong readOffset,
132148
RetryContext retryContext,
133149
boolean closed,
134150
SettableApiFuture<Result> complete) {
135-
super(readId, readCursor, childRefs, retryContext, closed);
151+
super(readId, rangeSpec, childRefs, readOffset, retryContext, closed);
136152
this.complete = complete;
137153
}
138154

139155
@Override
140156
boolean acceptingBytes() {
141-
return !complete.isDone() && !tombstoned && readCursor.hasRemaining();
157+
return !complete.isDone() && !tombstoned;
142158
}
143159

144160
@Override
@@ -170,46 +186,48 @@ public boolean readyToSend() {
170186
* java.nio.channels.ReadableByteChannel})
171187
*/
172188
abstract static class StreamingRead extends BlobDescriptorStreamRead {
173-
private StreamingRead(long readId, long readOffset, long readLimit, RetryContext retryContext) {
174-
super(readId, new ReadCursor(readOffset, readOffset + readLimit), retryContext);
189+
private StreamingRead(long readId, RangeSpec range, RetryContext retryContext) {
190+
super(readId, range, retryContext);
175191
}
176192

177193
private StreamingRead(
178194
long readId,
179-
ReadCursor readCursor,
195+
RangeSpec rangeSpec,
180196
List<ChildRef> childRefs,
197+
AtomicLong readOffset,
181198
RetryContext retryContext,
182199
boolean closed) {
183-
super(readId, readCursor, childRefs, retryContext, closed);
200+
super(readId, rangeSpec, childRefs, readOffset, retryContext, closed);
184201
}
185202
}
186203

187204
static final class ByteArrayAccumulatingRead extends AccumulatingRead<byte[]> {
188205

189206
private ByteArrayAccumulatingRead(
190207
long readId,
191-
ReadCursor readCursor,
208+
RangeSpec rangeSpec,
192209
RetryContext retryContext,
193210
SettableApiFuture<byte[]> complete) {
194-
super(readId, readCursor, retryContext, complete);
211+
super(readId, rangeSpec, retryContext, complete);
195212
}
196213

197214
private ByteArrayAccumulatingRead(
198215
long readId,
199-
ReadCursor readCursor,
216+
RangeSpec rangeSpec,
200217
List<ChildRef> childRefs,
201218
RetryContext retryContext,
219+
AtomicLong readOffset,
202220
boolean closed,
203221
SettableApiFuture<byte[]> complete) {
204-
super(readId, readCursor, childRefs, retryContext, closed, complete);
222+
super(readId, rangeSpec, childRefs, readOffset, retryContext, closed, complete);
205223
}
206224

207225
@Override
208226
void accept(ChildRef childRef) throws IOException {
209227
retryContext.reset();
210228
int size = childRef.byteString().size();
211229
childRefs.add(childRef);
212-
readCursor.advance(size);
230+
readOffset.addAndGet(size);
213231
}
214232

215233
@Override
@@ -230,7 +248,7 @@ void eof() throws IOException {
230248
ByteArrayAccumulatingRead withNewReadId(long newReadId) {
231249
this.tombstoned = true;
232250
return new ByteArrayAccumulatingRead(
233-
newReadId, readCursor, childRefs, retryContext, closed, complete);
251+
newReadId, rangeSpec, childRefs, retryContext, readOffset, closed, complete);
234252
}
235253
}
236254

@@ -241,21 +259,22 @@ static final class ZeroCopyByteStringAccumulatingRead
241259

242260
private ZeroCopyByteStringAccumulatingRead(
243261
long readId,
244-
ReadCursor readCursor,
262+
RangeSpec rangeSpec,
245263
RetryContext retryContext,
246264
SettableApiFuture<DisposableByteString> complete) {
247-
super(readId, readCursor, retryContext, complete);
265+
super(readId, rangeSpec, retryContext, complete);
248266
}
249267

250268
public ZeroCopyByteStringAccumulatingRead(
251269
long readId,
252-
ReadCursor readCursor,
270+
RangeSpec rangeSpec,
253271
List<ChildRef> childRefs,
272+
AtomicLong readOffset,
254273
RetryContext retryContext,
255274
boolean closed,
256275
SettableApiFuture<DisposableByteString> complete,
257276
ByteString byteString) {
258-
super(readId, readCursor, childRefs, retryContext, closed, complete);
277+
super(readId, rangeSpec, childRefs, readOffset, retryContext, closed, complete);
259278
this.byteString = byteString;
260279
}
261280

@@ -269,7 +288,7 @@ void accept(ChildRef childRef) throws IOException {
269288
retryContext.reset();
270289
int size = childRef.byteString().size();
271290
childRefs.add(childRef);
272-
readCursor.advance(size);
291+
readOffset.addAndGet(size);
273292
}
274293

275294
@Override
@@ -287,7 +306,7 @@ void eof() throws IOException {
287306
ZeroCopyByteStringAccumulatingRead withNewReadId(long newReadId) {
288307
this.tombstoned = true;
289308
return new ZeroCopyByteStringAccumulatingRead(
290-
newReadId, readCursor, childRefs, retryContext, closed, complete, byteString);
309+
newReadId, rangeSpec, childRefs, readOffset, retryContext, closed, complete, byteString);
291310
}
292311
}
293312
}

0 commit comments

Comments
 (0)