Skip to content

Commit 8e516e6

Browse files
committed
chore: add checksum validation to received ObjectRangeData
When receiving an ObjectRangeData, the checksum in its ChecksummedData should be validated. If validation fails, we abandon the read_id associated with the failed ObjectRangeData and issue a new request starting from the offset of the pending client-side read. By abandoning the read_id, we ensure that our response observer will still receive and evaluate every BidiReadObjectResponse, but that any pending bytes for that read_id won't be surfaced to the application, thereby maintaining the integrity of the bytes. Add a test to ITBlobDescriptorFakeTest.java to induce a checksum failure and validate correct handling.
1 parent 0a2fac5 commit 8e516e6

File tree

7 files changed

+250
-30
lines changed

7 files changed

+250
-30
lines changed

google-cloud-storage/clirr-ignored-differences.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,11 @@
127127
<method>com.google.api.core.ApiFuture getBlobDescriptor(com.google.cloud.storage.BlobId, com.google.cloud.storage.Storage$BlobSourceOption[])</method>
128128
</difference>
129129

130+
<difference>
131+
<differenceType>7005</differenceType>
132+
<className>com/google/cloud/storage/Hasher$*</className>
133+
<method>* validate(*)</method>
134+
<to>* validate(*)</to>
135+
</difference>
136+
130137
</differences>

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorState.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import static com.google.common.base.Preconditions.checkState;
20+
1921
import com.google.common.collect.ImmutableList;
2022
import com.google.storage.v2.BidiReadHandle;
2123
import com.google.storage.v2.BidiReadObjectRequest;
2224
import com.google.storage.v2.Object;
2325
import com.google.storage.v2.ReadRange;
26+
import java.util.HashMap;
2427
import java.util.List;
2528
import java.util.Map;
26-
import java.util.concurrent.ConcurrentHashMap;
2729
import java.util.concurrent.atomic.AtomicLong;
2830
import java.util.concurrent.atomic.AtomicReference;
2931
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -44,7 +46,7 @@ final class BlobDescriptorState {
4446
this.routingToken = new AtomicReference<>();
4547
this.metadata = new AtomicReference<>();
4648
this.readIdSeq = new AtomicLong(1);
47-
this.outstandingReads = new ConcurrentHashMap<>();
49+
this.outstandingReads = new HashMap<>();
4850
}
4951

5052
BidiReadObjectRequest getOpenRequest() {
@@ -74,15 +76,21 @@ long newReadId() {
7476

7577
@Nullable
7678
BlobDescriptorStreamRead getOutstandingRead(long key) {
77-
return outstandingReads.get(key);
79+
synchronized (this) {
80+
return outstandingReads.get(key);
81+
}
7882
}
7983

8084
void putOutstandingRead(long key, BlobDescriptorStreamRead value) {
81-
outstandingReads.put(key, value);
85+
synchronized (this) {
86+
outstandingReads.put(key, value);
87+
}
8288
}
8389

8490
void removeOutstandingRead(long key) {
85-
outstandingReads.remove(key);
91+
synchronized (this) {
92+
outstandingReads.remove(key);
93+
}
8694
}
8795

8896
void setRoutingToken(String routingToken) {
@@ -94,9 +102,22 @@ String getRoutingToken() {
94102
return this.routingToken.get();
95103
}
96104

97-
public List<ReadRange> getOutstandingReads() {
98-
return outstandingReads.values().stream()
99-
.map(BlobDescriptorStreamRead::makeReadRange)
100-
.collect(ImmutableList.toImmutableList());
105+
BlobDescriptorStreamRead assignNewReadId(long oldReadId) {
106+
synchronized (this) {
107+
BlobDescriptorStreamRead remove = outstandingReads.remove(oldReadId);
108+
checkState(remove != null, "unable to locate old");
109+
long newReadId = newReadId();
110+
BlobDescriptorStreamRead withNewReadId = remove.withNewReadId(newReadId);
111+
outstandingReads.put(newReadId, withNewReadId);
112+
return withNewReadId;
113+
}
114+
}
115+
116+
List<ReadRange> getOutstandingReads() {
117+
synchronized (this) {
118+
return outstandingReads.values().stream()
119+
.map(BlobDescriptorStreamRead::makeReadRange)
120+
.collect(ImmutableList.toImmutableList());
121+
}
101122
}
102123
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,17 @@
2424
import com.google.api.gax.rpc.ResponseObserver;
2525
import com.google.api.gax.rpc.StreamController;
2626
import com.google.common.base.Preconditions;
27+
import com.google.protobuf.ByteString;
2728
import com.google.rpc.Status;
2829
import com.google.storage.v2.BidiReadHandle;
2930
import com.google.storage.v2.BidiReadObjectError;
3031
import com.google.storage.v2.BidiReadObjectRedirectedError;
3132
import com.google.storage.v2.BidiReadObjectRequest;
3233
import com.google.storage.v2.BidiReadObjectResponse;
34+
import com.google.storage.v2.ChecksummedData;
3335
import com.google.storage.v2.Object;
3436
import com.google.storage.v2.ObjectRangeData;
37+
import com.google.storage.v2.ReadRange;
3538
import com.google.storage.v2.ReadRangeError;
3639
import java.io.IOException;
3740
import java.util.List;
@@ -240,11 +243,33 @@ public void onResponse(BidiReadObjectResponse response) {
240243
}
241244
for (int i = 0; i < rangeData.size(); i++) {
242245
ObjectRangeData d = rangeData.get(i);
243-
long id = d.getReadRange().getReadId();
246+
ReadRange readRange = d.getReadRange();
247+
long id = readRange.getReadId();
244248
BlobDescriptorStreamRead read = state.getOutstandingRead(id);
245249
if (read == null) {
246250
continue;
247251
}
252+
// TODO: validate read is still open
253+
// todo: check that offsets match up
254+
ChecksummedData checksummedData = d.getChecksummedData();
255+
ByteString content = checksummedData.getContent();
256+
int crc32C = checksummedData.getCrc32C();
257+
258+
try {
259+
// todo: benchmark how long it takes to compute this checksum and whether it needs to
260+
// happen on a non-io thread
261+
Hasher.enabled().validate(Crc32cValue.of(crc32C), content);
262+
} catch (IOException e) {
263+
//noinspection resource
264+
BlobDescriptorStreamRead readWithNewId = state.assignNewReadId(id);
265+
// todo: record failure for read
266+
BidiReadObjectRequest requestWithNewReadId =
267+
BidiReadObjectRequest.newBuilder()
268+
.addReadRanges(readWithNewId.makeReadRange())
269+
.build();
270+
requestStream.send(requestWithNewReadId);
271+
continue;
272+
}
248273
final int idx = i;
249274
//noinspection rawtypes
250275
ResponseContentLifecycleHandle.ChildRef childRef =

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStreamRead.java

Lines changed: 59 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,16 @@ abstract class BlobDescriptorStreamRead implements AutoCloseable, Closeable {
3939
protected final List<ChildRef> childRefs;
4040
protected boolean closed;
4141

42-
private BlobDescriptorStreamRead(long readId, long readOffset, long readLimit) {
42+
private BlobDescriptorStreamRead(long readId, ReadCursor readCursor) {
43+
this(readId, readCursor, Collections.synchronizedList(new ArrayList<>()), false);
44+
}
45+
46+
private BlobDescriptorStreamRead(
47+
long readId, ReadCursor readCursor, List<ChildRef> childRefs, boolean closed) {
4348
this.readId = readId;
44-
this.readCursor = new ReadCursor(readOffset, readOffset + readLimit);
45-
this.childRefs = Collections.synchronizedList(new ArrayList<>());
46-
this.closed = false;
49+
this.readCursor = readCursor;
50+
this.childRefs = childRefs;
51+
this.closed = closed;
4752
}
4853

4954
abstract void accept(ChildRef childRef) throws IOException;
@@ -52,6 +57,8 @@ private BlobDescriptorStreamRead(long readId, long readOffset, long readLimit) {
5257

5358
abstract void fail(Status status) throws IOException;
5459

60+
abstract BlobDescriptorStreamRead withNewReadId(long newReadId);
61+
5562
final ReadRange makeReadRange() {
5663
return ReadRange.newBuilder()
5764
.setReadId(readId)
@@ -70,22 +77,37 @@ public void close() throws IOException {
7077

7178
static AccumulatingRead<byte[]> createByteArrayAccumulatingRead(
7279
long readId, ByteRangeSpec range, SettableApiFuture<byte[]> complete) {
73-
return new ByteArrayAccumulatingRead(readId, range.beginOffset(), range.length(), complete);
80+
return new ByteArrayAccumulatingRead(
81+
readId,
82+
new ReadCursor(range.beginOffset(), range.beginOffset() + range.length()),
83+
complete);
7484
}
7585

7686
static AccumulatingRead<DisposableByteString> createZeroCopyByteStringAccumulatingRead(
7787
long readId, ByteRangeSpec range, SettableApiFuture<DisposableByteString> complete) {
7888
return new ZeroCopyByteStringAccumulatingRead(
79-
readId, range.beginOffset(), range.length(), complete);
89+
readId,
90+
new ReadCursor(range.beginOffset(), range.beginOffset() + range.length()),
91+
complete);
8092
}
8193

8294
/** Base class of a read that will accumulate before completing by resolving a future */
8395
abstract static class AccumulatingRead<Result> extends BlobDescriptorStreamRead {
8496
protected final SettableApiFuture<Result> complete;
8597

8698
private AccumulatingRead(
87-
long readId, long readOffset, long readLimit, SettableApiFuture<Result> complete) {
88-
super(readId, readOffset, readLimit);
99+
long readId, ReadCursor readCursor, SettableApiFuture<Result> complete) {
100+
super(readId, readCursor);
101+
this.complete = complete;
102+
}
103+
104+
private AccumulatingRead(
105+
long readId,
106+
ReadCursor readCursor,
107+
List<ChildRef> childRefs,
108+
boolean closed,
109+
SettableApiFuture<Result> complete) {
110+
super(readId, readCursor, childRefs, closed);
89111
this.complete = complete;
90112
}
91113

@@ -106,15 +128,29 @@ void fail(Status status) throws IOException {
106128
*/
107129
abstract static class StreamingRead extends BlobDescriptorStreamRead {
108130
private StreamingRead(long readId, long readOffset, long readLimit) {
109-
super(readId, readOffset, readLimit);
131+
super(readId, new ReadCursor(readOffset, readOffset + readLimit));
132+
}
133+
134+
public StreamingRead(
135+
long readId, ReadCursor readCursor, List<ChildRef> childRefs, boolean closed) {
136+
super(readId, readCursor, childRefs, closed);
110137
}
111138
}
112139

113140
static final class ByteArrayAccumulatingRead extends AccumulatingRead<byte[]> {
114141

115142
private ByteArrayAccumulatingRead(
116-
long readId, long readOffset, long readLimit, SettableApiFuture<byte[]> complete) {
117-
super(readId, readOffset, readLimit, complete);
143+
long readId, ReadCursor readCursor, SettableApiFuture<byte[]> complete) {
144+
super(readId, readCursor, complete);
145+
}
146+
147+
private ByteArrayAccumulatingRead(
148+
long readId,
149+
ReadCursor readCursor,
150+
List<ChildRef> childRefs,
151+
boolean closed,
152+
SettableApiFuture<byte[]> complete) {
153+
super(readId, readCursor, childRefs, closed, complete);
118154
}
119155

120156
@Override
@@ -136,6 +172,11 @@ void eof() throws IOException {
136172
close();
137173
}
138174
}
175+
176+
@Override
177+
ByteArrayAccumulatingRead withNewReadId(long newReadId) {
178+
return new ByteArrayAccumulatingRead(newReadId, readCursor, childRefs, closed, complete);
179+
}
139180
}
140181

141182
static final class ZeroCopyByteStringAccumulatingRead
@@ -144,11 +185,8 @@ static final class ZeroCopyByteStringAccumulatingRead
144185
private volatile ByteString byteString;
145186

146187
private ZeroCopyByteStringAccumulatingRead(
147-
long readId,
148-
long readOffset,
149-
long readLimit,
150-
SettableApiFuture<DisposableByteString> complete) {
151-
super(readId, readOffset, readLimit, complete);
188+
long readId, ReadCursor readCursor, SettableApiFuture<DisposableByteString> complete) {
189+
super(readId, readCursor, complete);
152190
}
153191

154192
@Override
@@ -172,5 +210,10 @@ void eof() throws IOException {
172210
byteString = base;
173211
complete.set(this);
174212
}
213+
214+
@Override
215+
ZeroCopyByteStringAccumulatingRead withNewReadId(long newReadId) {
216+
return new ZeroCopyByteStringAccumulatingRead(newReadId, readCursor, complete);
217+
}
175218
}
176219
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
181181
if (checksummedData.hasCrc32C()) {
182182
Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize);
183183
try {
184-
hasher.validate(expected, content.asReadOnlyByteBufferList());
184+
hasher.validate(expected, content);
185185
} catch (IOException e) {
186186
close();
187187
throw e;

google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
2020
import com.google.common.hash.Hashing;
21+
import com.google.protobuf.ByteString;
2122
import java.io.IOException;
2223
import java.nio.ByteBuffer;
2324
import java.util.List;
@@ -38,7 +39,7 @@ default Crc32cLengthKnown hash(Supplier<ByteBuffer> b) {
3839

3940
void validate(Crc32cValue<?> expected, Supplier<ByteBuffer> b) throws IOException;
4041

41-
void validate(Crc32cValue<?> expected, List<ByteBuffer> buffers) throws IOException;
42+
void validate(Crc32cValue<?> expected, ByteString byteString) throws IOException;
4243

4344
@Nullable
4445
Crc32cLengthKnown nullSafeConcat(Crc32cLengthKnown r1, Crc32cLengthKnown r2);
@@ -66,7 +67,7 @@ public Crc32cLengthKnown hash(ByteBuffer b) {
6667
public void validate(Crc32cValue<?> expected, Supplier<ByteBuffer> b) {}
6768

6869
@Override
69-
public void validate(Crc32cValue<?> expected, List<ByteBuffer> b) {}
70+
public void validate(Crc32cValue<?> expected, ByteString b) {}
7071

7172
@Override
7273
public @Nullable Crc32cLengthKnown nullSafeConcat(Crc32cLengthKnown r1, Crc32cLengthKnown r2) {
@@ -88,7 +89,8 @@ public Crc32cLengthKnown hash(ByteBuffer b) {
8889

8990
@SuppressWarnings({"ConstantConditions", "UnstableApiUsage"})
9091
@Override
91-
public void validate(Crc32cValue<?> expected, List<ByteBuffer> b) throws IOException {
92+
public void validate(Crc32cValue<?> expected, ByteString byteString) throws IOException {
93+
List<ByteBuffer> b = byteString.asReadOnlyByteBufferList();
9294
long remaining = 0;
9395
com.google.common.hash.Hasher crc32c = Hashing.crc32c().newHasher();
9496
for (ByteBuffer tmp : b) {

0 commit comments

Comments
 (0)