Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,54 +76,61 @@ public UnbufferedReadableByteChannelSessionBuilder unbuffered() {
return new UnbufferedReadableByteChannelSessionBuilder(getF(read, hasher));
}

private static BiFunction<Object, SettableApiFuture<Object>, UnbufferedReadableByteChannel>
private static BiFunction<
ReadObjectRequest, SettableApiFuture<Object>, UnbufferedReadableByteChannel>
getF(ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read, Hasher hasher) {
return (object, resultFuture) ->
new GapicUnbufferedReadableByteChannel(resultFuture, read, object, hasher);
}

public static final class BufferedReadableByteChannelSessionBuilder {

private BiFunction<Object, SettableApiFuture<Object>, BufferedReadableByteChannel> f;
private Object obj;
private BiFunction<ReadObjectRequest, SettableApiFuture<Object>, BufferedReadableByteChannel>
f;
private ReadObjectRequest request;

private BufferedReadableByteChannelSessionBuilder(
BufferHandle buffer,
BiFunction<Object, SettableApiFuture<Object>, UnbufferedReadableByteChannel> f) {
BiFunction<ReadObjectRequest, SettableApiFuture<Object>, UnbufferedReadableByteChannel>
f) {
this.f = f.andThen(c -> new DefaultBufferedReadableByteChannel(buffer, c));
}

public BufferedReadableByteChannelSessionBuilder setObject(Object obj) {
this.obj = requireNonNull(obj, "obj must be non null");
public BufferedReadableByteChannelSessionBuilder setReadObjectRequest(
ReadObjectRequest request) {
this.request = requireNonNull(request, "request must be non null");
return this;
}

public BufferedReadableByteChannelSession<Object> build() {
return new ChannelSession.BufferedReadSession<>(
ApiFutures.immediateFuture(obj),
ApiFutures.immediateFuture(request),
f.andThen(StorageByteChannels.readable()::createSynchronized));
}
}

public static final class UnbufferedReadableByteChannelSessionBuilder {

// TODO: object -> ReadObjectRequest
private BiFunction<Object, SettableApiFuture<Object>, UnbufferedReadableByteChannel> f;
private Object obj;
private BiFunction<
ReadObjectRequest, SettableApiFuture<Object>, UnbufferedReadableByteChannel>
f;
private ReadObjectRequest request;

private UnbufferedReadableByteChannelSessionBuilder(
BiFunction<Object, SettableApiFuture<Object>, UnbufferedReadableByteChannel> f) {
BiFunction<ReadObjectRequest, SettableApiFuture<Object>, UnbufferedReadableByteChannel>
f) {
this.f = f;
}

public UnbufferedReadableByteChannelSessionBuilder setObject(Object obj) {
this.obj = requireNonNull(obj, "obj must be non null");
public UnbufferedReadableByteChannelSessionBuilder setReadObjectRequest(
ReadObjectRequest request) {
this.request = requireNonNull(request, "request must be non null");
return this;
}

public UnbufferedReadableByteChannelSession<Object> build() {
return new ChannelSession.UnbufferedReadSession<>(
ApiFutures.immediateFuture(obj),
ApiFutures.immediateFuture(request),
f.andThen(StorageByteChannels.readable()::createSynchronized));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.storage;

import static com.google.cloud.storage.StorageV2ProtoUtils.seekReadObjectRequest;
import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.client.http.HttpStatusCodes;
Expand Down Expand Up @@ -59,17 +60,11 @@ final class GapicUnbufferedReadableByteChannel
GapicUnbufferedReadableByteChannel(
SettableApiFuture<Object> result,
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
Object obj,
ReadObjectRequest req,
Hasher hasher) {
this.result = result;
this.read = read;
this.req =
ReadObjectRequest.newBuilder()
.setBucket(obj.getBucket())
.setObject(obj.getName())
.setGeneration(obj.getGeneration())
.setReadOffset(blobOffset)
.build();
this.req = req;
this.hasher = hasher;
this.iter = new LazyServerStreamIterator();
}
Expand Down Expand Up @@ -130,7 +125,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
ByteBuffer content = checksummedData.getContent().asReadOnlyByteBuffer();
Crc32cLengthUnknown expected = Crc32cValue.of(checksummedData.getCrc32C());
try {
hasher.validate(expected, content.duplicate());
hasher.validate(expected, content::duplicate);
} catch (IOException e) {
close();
throw e;
Expand Down Expand Up @@ -240,21 +235,7 @@ private void ensureOpen() {
synchronized (this) {
if (!streamInitialized) {
if (serverStream == null) {
ReadObjectRequest request = req;

boolean setOffset = blobOffset > 0 && blobOffset != req.getReadOffset();
boolean setLimit = blobLimit < Long.MAX_VALUE && blobLimit != req.getReadLimit();
if (setOffset || setLimit) {
ReadObjectRequest.Builder b = request.toBuilder();
if (setOffset) {
b.setReadOffset(blobOffset);
}
if (setLimit) {
b.setReadLimit(blobLimit);
}
request = b.build();
}

ReadObjectRequest request = seekReadObjectRequest(req, blobOffset, blobLimit);
serverStream = read.call(request);
}
responseIterator = serverStream.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package com.google.cloud.storage;

import static com.google.cloud.storage.StorageV2ProtoUtils.seekReadObjectRequest;
import static com.google.cloud.storage.Utils.todo;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.cloud.ReadChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import com.google.storage.v2.Object;
import com.google.storage.v2.ReadObjectRequest;
Expand All @@ -36,26 +38,31 @@ final class GrpcBlobReadChannel implements ReadChannel {

private final LazyReadChannel lazyReadChannel;

private Long position;
private Long limit;
private int chunkSize = 16 * 1024 * 1024;

GrpcBlobReadChannel(
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read, Supplier<Object> start) {
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
ReadObjectRequest request) {
this.lazyReadChannel =
new LazyReadChannel(
Suppliers.memoize(
() ->
ResumableMedia.gapic()
.read()
.byteChannel(read)
.setHasher(Hasher.noop())
.buffered(ByteBuffer.allocate(chunkSize))
.setObject(start.get())
.build()));
() -> {
ReadObjectRequest req = seekReadObjectRequest(request, position, limit);
return ResumableMedia.gapic()
.read()
.byteChannel(read)
.setHasher(Hasher.noop())
.buffered(ByteBuffer.allocate(chunkSize))
.setReadObjectRequest(req)
.build();
}));
}

@Override
public void setChunkSize(int chunkSize) {
Preconditions.checkState(!isOpen(), "Unable to change chunkSize after write");
checkState(!isOpen(), "Unable to change chunkSize after read");
this.chunkSize = chunkSize;
}

Expand All @@ -78,7 +85,9 @@ public void close() {

@Override
public void seek(long position) throws IOException {
todo();
checkArgument(position >= 0, "position must be >= 0");
checkState(!isOpen(), "Unable to change position after read");
this.position = position;
}

@Override
Expand All @@ -88,12 +97,15 @@ public RestorableState<ReadChannel> capture() {

@Override
public ReadChannel limit(long limit) {
return todo();
checkArgument(limit >= 0, "limit must be >= 0");
checkState(!isOpen(), "Unable to change limit after read");
this.limit = limit;
return this;
}

@Override
public long limit() {
return todo();
return limit != null ? limit : Long.MAX_VALUE;
}

@Override
Expand Down
Loading