Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,11 @@ static BaseServiceException coalesce(Throwable t) {
* @returns {@code StorageException}
*/
public static StorageException translate(IOException exception) {
if (exception.getMessage().contains("Connection closed prematurely")) {
return new StorageException(
0, exception.getMessage(), CONNECTION_CLOSED_PREMATURELY, exception);
String message = exception.getMessage();
if (message != null
&& (message.contains("Connection closed prematurely")
|| message.contains("Premature EOF"))) {
return new StorageException(0, message, CONNECTION_CLOSED_PREMATURELY, exception);
} else {
// default
return new StorageException(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.api.client.googleapis.batch.BatchRequest;
import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.media.MediaHttpDownloader;
import com.google.api.client.http.ByteArrayContent;
import com.google.api.client.http.EmptyContent;
import com.google.api.client.http.GenericUrl;
Expand Down Expand Up @@ -283,7 +284,7 @@ public void onFailure(GoogleJsonError googleJsonError, HttpHeaders httpHeaders)
}

private static StorageException translate(IOException exception) {
return new StorageException(exception);
return StorageException.translate(exception);
}

private static StorageException translate(GoogleJsonError exception) {
Expand Down Expand Up @@ -750,10 +751,14 @@ public long read(
} else {
req.setReturnRawInputStream(false);
}
req.getMediaHttpDownloader().setBytesDownloaded(position);
req.getMediaHttpDownloader().setDirectDownloadEnabled(true);

if (position > 0) {
req.getRequestHeaders().setRange(String.format("bytes=%d-", position));
}
MediaHttpDownloader mediaHttpDownloader = req.getMediaHttpDownloader();
mediaHttpDownloader.setDirectDownloadEnabled(true);
req.executeMedia().download(outputStream);
return req.getMediaHttpDownloader().getNumBytesDownloaded();
return mediaHttpDownloader.getNumBytesDownloaded();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a refactor or is there magical Apiary functionality hidden in this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be magical apiary behavior behind this. The previous code wouldn't set the Range header ever, but the new code does.

I don't fully understand what's the subtle difference between req.executeMedia().download(os) and req.executeMediaAndDownloadTo(os) but the latter results in the Range header being set if position > 0.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have since moved away from the previous executeAndDownload method change. This was not properly respecting returnRawInputStream which would break gzip automatic transcoding if you wanted to actually download the zipped bytes. Instead, we are now setting the range header ourselves.

} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
StorageException serviceException = translate(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.truth.Truth.assertWithMessage;
import static org.junit.Assert.fail;

import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.io.JsonEOFException;
Expand Down Expand Up @@ -96,6 +97,7 @@ public void validateBehavior() {
} else if (shouldRetry && !defaultShouldRetryResult && !legacyShouldRetryResult) {
actualBehavior = Behavior.SAME;
message = "both are rejecting when we want a retry";
fail(message);
} else if (shouldRetry && defaultShouldRetryResult && legacyShouldRetryResult) {
actualBehavior = Behavior.SAME;
message = "both are allowing";
Expand All @@ -111,6 +113,7 @@ public void validateBehavior() {
} else if (!shouldRetry && defaultShouldRetryResult && legacyShouldRetryResult) {
actualBehavior = Behavior.SAME;
message = "both are too permissive";
fail(message);
} else if (!shouldRetry && defaultShouldRetryResult && !legacyShouldRetryResult) {
actualBehavior = Behavior.DEFAULT_MORE_PERMISSIBLE;
message = "default is too permissive";
Expand Down Expand Up @@ -298,7 +301,7 @@ enum ThrowableCategory {
STORAGE_EXCEPTION_GOOGLE_JSON_ERROR_503(new StorageException(C.JSON_503)),
STORAGE_EXCEPTION_GOOGLE_JSON_ERROR_504(new StorageException(C.JSON_504)),
STORAGE_EXCEPTION_SOCKET_TIMEOUT_EXCEPTION(new StorageException(C.SOCKET_TIMEOUT_EXCEPTION)),
STORAGE_EXCEPTION_SOCKET_EXCEPTION(new StorageException(C.SOCKET_EXCEPTION)),
STORAGE_EXCEPTION_SOCKET_EXCEPTION(StorageException.translate(C.SOCKET_EXCEPTION)),
STORAGE_EXCEPTION_SSL_EXCEPTION(new StorageException(C.SSL_EXCEPTION)),
STORAGE_EXCEPTION_SSL_EXCEPTION_CONNECTION_SHUTDOWN(
new StorageException(C.SSL_EXCEPTION_CONNECTION_SHUTDOWN)),
Expand All @@ -322,6 +325,9 @@ enum ThrowableCategory {
"connectionClosedPrematurely",
"connectionClosedPrematurely",
C.CONNECTION_CLOSED_PREMATURELY)),
STORAGE_EXCEPTION_0_CONNECTION_CLOSED_PREMATURELY_IO_CAUSE_NO_REASON(
StorageException.translate(C.CONNECTION_CLOSED_PREMATURELY)),
STORAGE_EXCEPTION_0_IO_PREMATURE_EOF(StorageException.translate(C.IO_PREMATURE_EOF)),
EMPTY_JSON_PARSE_ERROR(new IllegalArgumentException("no JSON input found")),
JACKSON_EOF_EXCEPTION(C.JACKSON_EOF_EXCEPTION),
STORAGE_EXCEPTION_0_JACKSON_EOF_EXCEPTION(
Expand Down Expand Up @@ -400,6 +406,7 @@ private static final class C {
new JsonEOFException(null, JsonToken.VALUE_STRING, "parse-exception");
private static final MalformedJsonException GSON_MALFORMED_EXCEPTION =
new MalformedJsonException("parse-exception");
private static final IOException IO_PREMATURE_EOF = new IOException("Premature EOF");

private static HttpResponseException newHttpResponseException(
int httpStatusCode, String name) {
Expand Down Expand Up @@ -919,6 +926,28 @@ private static ImmutableList<Case> getAllCases() {
HandlerCategory.NONIDEMPOTENT,
ExpectRetry.NO,
Behavior.DEFAULT_MORE_STRICT),
new Case(
ThrowableCategory
.STORAGE_EXCEPTION_0_CONNECTION_CLOSED_PREMATURELY_IO_CAUSE_NO_REASON,
HandlerCategory.IDEMPOTENT,
ExpectRetry.YES,
Behavior.SAME),
new Case(
ThrowableCategory
.STORAGE_EXCEPTION_0_CONNECTION_CLOSED_PREMATURELY_IO_CAUSE_NO_REASON,
HandlerCategory.NONIDEMPOTENT,
ExpectRetry.NO,
Behavior.DEFAULT_MORE_STRICT),
new Case(
ThrowableCategory.STORAGE_EXCEPTION_0_IO_PREMATURE_EOF,
HandlerCategory.IDEMPOTENT,
ExpectRetry.YES,
Behavior.SAME),
new Case(
ThrowableCategory.STORAGE_EXCEPTION_0_IO_PREMATURE_EOF,
HandlerCategory.NONIDEMPOTENT,
ExpectRetry.NO,
Behavior.DEFAULT_MORE_STRICT),
new Case(
ThrowableCategory.STORAGE_EXCEPTION_0_INTERNAL_ERROR,
HandlerCategory.IDEMPOTENT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public static Collection<Object[]> testCases() throws IOException {
.and(
(m, trc) ->
trc.getScenarioId()
< 7) // Temporarily exclude resumable media scenarios
!= 7) // Temporarily exclude resumable upload scenarios
)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup.defaultSetup;
import static com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup.serviceAccount;
import static com.google.common.base.Predicates.and;
import static com.google.common.base.Predicates.not;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -66,6 +67,7 @@
import com.google.cloud.storage.conformance.retry.RpcMethodMappings.Mappings.ObjectAcl;
import com.google.cloud.storage.conformance.retry.RpcMethodMappings.Mappings.Objects;
import com.google.cloud.storage.conformance.retry.RpcMethodMappings.Mappings.ServiceAccount;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
Expand Down Expand Up @@ -108,6 +110,11 @@
final class RpcMethodMappings {
private static final Logger LOGGER = Logger.getLogger(RpcMethodMappings.class.getName());

private static final Predicate<TestRetryConformance> groupIsDownload =
methodGroupIs("storage.objects.download");
private static final Predicate<TestRetryConformance> groupIsResumableUpload =
methodGroupIs("storage.resumable.upload");

static final int _2MiB = 2 * 1024 * 1024;
final Multimap<RpcMethod, RpcMethodMapping> funcMap;

Expand Down Expand Up @@ -1079,7 +1086,8 @@ private static void delete(ArrayList<RpcMethodMapping> a) {
private static void get(ArrayList<RpcMethodMapping> a) {
a.add(
RpcMethodMapping.newBuilder(39, objects.get)
.withApplicable(not(TestRetryConformance::isPreconditionsProvided))
.withApplicable(
and(not(TestRetryConformance::isPreconditionsProvided), not(groupIsDownload)))
.withSetup(defaultSetup.andThen(Local.blobInfoWithoutGeneration))
.withTest(
(ctx, c) ->
Expand All @@ -1088,13 +1096,15 @@ private static void get(ArrayList<RpcMethodMapping> a) {
.build());
a.add(
RpcMethodMapping.newBuilder(239, objects.get)
.withApplicable(TestRetryConformance::isPreconditionsProvided)
.withApplicable(
and(TestRetryConformance::isPreconditionsProvided, not(groupIsDownload)))
.withTest(
(ctx, c) ->
ctx.peek(state -> ctx.getStorage().get(state.getBlob().getBlobId())))
.build());
a.add(
RpcMethodMapping.newBuilder(40, objects.get)
.withApplicable(not(groupIsDownload))
.withTest(
(ctx, c) ->
ctx.map(
Expand All @@ -1108,6 +1118,7 @@ private static void get(ArrayList<RpcMethodMapping> a) {
.build());
a.add(
RpcMethodMapping.newBuilder(41, objects.get)
.withApplicable(not(groupIsDownload))
.withTest(
(ctx, c) ->
ctx.map(
Expand Down Expand Up @@ -1196,7 +1207,8 @@ private static void get(ArrayList<RpcMethodMapping> a) {
.build());
a.add(
RpcMethodMapping.newBuilder(60, objects.get)
.withApplicable(not(TestRetryConformance::isPreconditionsProvided))
.withApplicable(
and(not(TestRetryConformance::isPreconditionsProvided), not(groupIsDownload)))
.withTest((ctx, c) -> ctx.peek(state -> assertTrue(state.getBlob().exists())))
.build());
a.add(
Expand Down Expand Up @@ -1297,10 +1309,12 @@ private static void get(ArrayList<RpcMethodMapping> a) {
.build());
a.add(
RpcMethodMapping.newBuilder(75, objects.get)
.withApplicable(not(groupIsDownload))
.withTest((ctx, c) -> ctx.peek(state -> state.getBlob().reload()))
.build());
a.add(
RpcMethodMapping.newBuilder(76, objects.get)
.withApplicable(not(groupIsDownload))
.withTest(
(ctx, c) ->
ctx.peek(
Expand All @@ -1311,7 +1325,8 @@ private static void get(ArrayList<RpcMethodMapping> a) {
.build());
a.add(
RpcMethodMapping.newBuilder(107, objects.get)
.withApplicable(not(TestRetryConformance::isPreconditionsProvided))
.withApplicable(
and(not(TestRetryConformance::isPreconditionsProvided), not(groupIsDownload)))
.withTest(
(ctx, c) ->
ctx.map(state -> state.with(state.getBucket().get(c.getObjectName()))))
Expand All @@ -1321,7 +1336,8 @@ private static void get(ArrayList<RpcMethodMapping> a) {
private static void insert(ArrayList<RpcMethodMapping> a) {
a.add(
RpcMethodMapping.newBuilder(46, objects.insert)
.withApplicable(TestRetryConformance::isPreconditionsProvided)
.withApplicable(
and(TestRetryConformance::isPreconditionsProvided, not(groupIsResumableUpload)))
.withSetup(defaultSetup.andThen(Local.blobInfoWithGenerationZero))
.withTest(
(ctx, c) ->
Expand All @@ -1336,7 +1352,8 @@ private static void insert(ArrayList<RpcMethodMapping> a) {
.build());
a.add(
RpcMethodMapping.newBuilder(47, objects.insert)
.withApplicable(TestRetryConformance::isPreconditionsProvided)
.withApplicable(
and(TestRetryConformance::isPreconditionsProvided, not(groupIsResumableUpload)))
.withSetup(defaultSetup.andThen(Local.blobInfoWithGenerationZero))
.withTest(
(ctx, c) ->
Expand Down Expand Up @@ -1932,4 +1949,8 @@ private static void get(ArrayList<RpcMethodMapping> a) {
private static void put(ArrayList<RpcMethodMapping> a) {}
}
}

private static Predicate<TestRetryConformance> methodGroupIs(String s) {
return (c) -> s.equals(c.getMethod().getGroup());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.conformance.storage.v1.InstructionList;
import com.google.cloud.conformance.storage.v1.Method;
import com.google.common.base.Joiner;
import com.google.common.base.Suppliers;
import com.google.errorprone.annotations.Immutable;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -37,7 +38,9 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* An individual resolved test case correlating config from {@link
Expand All @@ -58,13 +61,16 @@ final class TestRetryConformance {
BASE_ID = formatter.format(now).replaceAll("[:]", "").substring(0, 6);
}

private static final int _512KiB = 512 * 1024;
private static final int _8MiB = 8 * 1024 * 1024;

private final String projectId;
private final String bucketName;
private final String bucketName2;
private final String userProject;
private final String objectName;

private final byte[] helloWorldUtf8Bytes = "Hello, World!!!".getBytes(StandardCharsets.UTF_8);
private final Supplier<byte[]> lazyHelloWorldUtf8Bytes;
private final Path helloWorldFilePath = resolvePathForResource();
private final ServiceAccountCredentials serviceAccountCredentials =
resolveServiceAccountCredentials();
Expand Down Expand Up @@ -126,6 +132,33 @@ final class TestRetryConformance {
String.format(
"%s_s%03d-%s-m%03d_obj1",
BASE_ID, scenarioId, instructionsString.toLowerCase(), mappingId);
lazyHelloWorldUtf8Bytes =
Suppliers.memoize(
() -> {
// define a lazy supplier for bytes.
// Not all tests need data for an object, though some tests - resumable upload - needs
// more than 8MiB.
// We want to avoid allocating 8.1MiB for each test unnecessarily, especially since we
// instantiate all permuted test cases. ~1000 * 8.1MiB ~~ > 8GiB.
String helloWorld = "Hello, World!";
int baseDataSize;
switch (method.getName()) {
case "storage.objects.insert":
baseDataSize = _8MiB + 1;
break;
case "storage.objects.get":
baseDataSize = _512KiB;
break;
default:
baseDataSize = helloWorld.length();
break;
}
int endInclusive = (baseDataSize / helloWorld.length());
return IntStream.rangeClosed(1, endInclusive)
.mapToObj(i -> helloWorld)
.collect(Collectors.joining())
.getBytes(StandardCharsets.UTF_8);
});
}

public String getProjectId() {
Expand Down Expand Up @@ -153,7 +186,7 @@ public String getObjectName() {
}

public byte[] getHelloWorldUtf8Bytes() {
return helloWorldUtf8Bytes;
return lazyHelloWorldUtf8Bytes.get();
}

public Path getHelloWorldFilePath() {
Expand Down
Loading