result,
LongConsumer committedBytesCallback) {
- this.session = ResumableSession.json(httpClientContext, deps, alg, resumableWrite);
+ this.session = ResumableSession.json(httpClientContext, retrier, resumableWrite);
this.result = result;
this.committedBytesCallback = committedBytesCallback;
this.open = true;
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncSessionClosedException.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncSessionClosedException.java
new file mode 100644
index 0000000000..982796f1c4
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncSessionClosedException.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import com.google.api.core.BetaApi;
+
+/**
+ * Root exception for async tasks which fail due to a session being closed.
+ *
+ * @see BlobReadSession
+ */
+@BetaApi
+public final class AsyncSessionClosedException extends RuntimeException {
+ AsyncSessionClosedException(String msg) {
+ super(msg);
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Backoff.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Backoff.java
new file mode 100644
index 0000000000..f127995772
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Backoff.java
@@ -0,0 +1,316 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.time.Duration.ZERO;
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Encapsulated class to track a timeout and calculate a backoff.
+ *
+ * <p>Error tracking is explicitly not tracked here. This class only tracks elapsed duration and
+ * timeout and whether there is budget for backoff.
+ *
+ *
+ * <p>This class does not use a clock, instead everything is tracked as durations provided by the
+ * user. This has the advantage that tests of it and anything that depends on it are able to be 100%
+ * reproducible.
+ *
+ *
+ * <p>This class also allows for a jittering algorithm to be provided to it, rather than being hard
+ * coded against a random number generator like {@link ThreadLocalRandom}.
+ *
+ *
+ * <p>This class is not thread safe.
+ */
+final class Backoff {
+
+ private final Duration initialBackoff;
+ private final Duration maxBackoff;
+ private final Duration timeout;
+ private final double retryDelayMultiplier;
+ private final Jitterer jitterer;
+
+ private Duration cumulativeBackoff;
+ private Duration previousBackoff;
+
+ private Backoff(
+ Duration initialBackoff,
+ double backoffDelayMultiplier,
+ Duration maxBackoff,
+ Duration timeout,
+ Jitterer jitterer) {
+ this.initialBackoff = initialBackoff;
+ this.maxBackoff = maxBackoff;
+ this.timeout = timeout;
+ this.jitterer = jitterer;
+ this.retryDelayMultiplier = backoffDelayMultiplier;
+ this.cumulativeBackoff = ZERO;
+ this.previousBackoff = ZERO;
+ }
+
+ /**
+ * Compute the next backoff given the provided {@code elapsed} duration between any previous
+ * invocation and this one.
+ *
+ *
+ * <p>If there is remaining backoff budget, a backoff will be computed and returned as a {@link
+ * BackoffDuration}. If the backoff budget doesn't have enough to allow for another backoff an
+ * {@link BackoffResults#EXHAUSTED} will be returned.
+ *
+ *
+ * <p>{@code EXHAUSTED} can happen in the following circumstances
+ *
+ * <ul>
+ *   <li>If the existing {@link #cumulativeBackoff} + {@code elapsed} is >= {@link #timeout}
+ * </ul>
+ */
+ BackoffResult nextBackoff(Duration elapsed) {
+ checkArgument(
+ Durations.gtEq(elapsed, ZERO), "elapsed must be >= PT0S (%s >= %s)", elapsed, ZERO);
+ Duration cumulativeAndElapsed = cumulativeBackoff.plus(elapsed);
+ cumulativeBackoff = cumulativeAndElapsed;
+ if (Durations.gtEq(cumulativeAndElapsed, timeout)) {
+ return BackoffResults.EXHAUSTED;
+ }
+ Duration nextDelay =
+ Duration.ofNanos(Math.round(previousBackoff.toNanos() * retryDelayMultiplier));
+ if (Durations.eq(nextDelay, ZERO)) {
+ nextDelay = initialBackoff;
+ }
+ Duration nextBackoffWithJitter = jitterer.jitter(nextDelay);
+ Duration remainingUtilTimeout = timeout.minus(cumulativeAndElapsed);
+ Duration cappedBackoff = Durations.min(nextBackoffWithJitter, maxBackoff, remainingUtilTimeout);
+ previousBackoff = cappedBackoff;
+
+ return BackoffDuration.of(cappedBackoff);
+ }
+
+ /**
+ * Reset all state.
+ *
+ * <p>After calling this method, backoff durations will reset to their initial values.
+ */
+ void reset() {
+ cumulativeBackoff = ZERO;
+ previousBackoff = ZERO;
+ }
+
+ Duration getCumulativeBackoff() {
+ return cumulativeBackoff;
+ }
+
+ Duration getTimeout() {
+ return timeout;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Backoff)) {
+ return false;
+ }
+ Backoff backoff = (Backoff) o;
+ return Double.compare(retryDelayMultiplier, backoff.retryDelayMultiplier) == 0
+ && Objects.equals(initialBackoff, backoff.initialBackoff)
+ && Objects.equals(maxBackoff, backoff.maxBackoff)
+ && Objects.equals(timeout, backoff.timeout)
+ && Objects.equals(jitterer, backoff.jitterer)
+ && Objects.equals(cumulativeBackoff, backoff.cumulativeBackoff);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ initialBackoff, maxBackoff, timeout, retryDelayMultiplier, jitterer, cumulativeBackoff);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("previousBackoff", previousBackoff)
+ .add("cumulativeBackoff", cumulativeBackoff)
+ .add("initialBackoff", initialBackoff)
+ .add("maxBackoff", maxBackoff)
+ .add("timeout", timeout)
+ .add("retryDelayMultiplier", retryDelayMultiplier)
+ .add("jitterer", jitterer)
+ .toString();
+ }
+
+ /** Convenience method to create a Backoff from RetrySettings. */
+ static Backoff.Builder from(RetrySettings retrySettings) {
+ return newBuilder()
+ .setInitialBackoff(retrySettings.getInitialRetryDelayDuration())
+ .setRetryDelayMultiplier(retrySettings.getRetryDelayMultiplier())
+ .setMaxBackoff(retrySettings.getMaxRetryDelayDuration())
+ .setTimeout(retrySettings.getTotalTimeoutDuration());
+ }
+
+ static Builder newBuilder() {
+ return new Builder();
+ }
+
+ static final class Builder {
+ private Duration initialBackoff;
+ private Duration maxBackoff;
+ private Duration timeout;
+ private double retryDelayMultiplier;
+ private Jitterer jitterer;
+
+ private Builder() {}
+
+ Builder setInitialBackoff(Duration initialBackoff) {
+ this.initialBackoff = initialBackoff;
+ return this;
+ }
+
+ Builder setMaxBackoff(Duration maxBackoff) {
+ this.maxBackoff = maxBackoff;
+ return this;
+ }
+
+ Builder setTimeout(Duration timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ Builder setRetryDelayMultiplier(double retryDelayMultiplier) {
+ this.retryDelayMultiplier = retryDelayMultiplier;
+ return this;
+ }
+
+ Builder setJitterer(Jitterer jitterer) {
+ this.jitterer = jitterer;
+ return this;
+ }
+
+ Backoff build() {
+ checkState(retryDelayMultiplier >= 1.0, "retryDelayMultiplier must be >= 1.0");
+ Duration effectiveTimeout = requireNonNull(timeout, "timeout must be non null");
+ if (Durations.ltEq(effectiveTimeout, ZERO)) {
+ effectiveTimeout = Durations.EFFECTIVE_INFINITY;
+ }
+ return new Backoff(
+ requireNonNull(initialBackoff, "initialBackoff must be non null"),
+ retryDelayMultiplier,
+ requireNonNull(maxBackoff, "maxBackoff must be non null"),
+ effectiveTimeout,
+ requireNonNull(jitterer, "jitterer must be non null"));
+ }
+ }
+
+ interface BackoffResult {
+ String errorString();
+ }
+
+ enum BackoffResults implements BackoffResult {
+ EXHAUSTED;
+
+ @Override
+ public String errorString() {
+ return name();
+ }
+ }
+
+ static final class BackoffDuration implements BackoffResult {
+ private final Duration duration;
+
+ private BackoffDuration(Duration duration) {
+ this.duration = duration;
+ }
+
+ Duration getDuration() {
+ return duration;
+ }
+
+ @Override
+ public String errorString() {
+ return duration.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof BackoffDuration)) {
+ return false;
+ }
+ BackoffDuration that = (BackoffDuration) o;
+ return Objects.equals(duration, that.duration);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(duration);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("duration", duration).toString();
+ }
+
+ static BackoffDuration of(Duration duration) {
+ return new BackoffDuration(duration);
+ }
+ }
+
+ /** Simple API to allow for the definition of a Jittering algorithm. */
+ @FunctionalInterface
+ interface Jitterer {
+ Duration jitter(Duration baseline);
+
+ static Jitterer threadLocalRandom() {
+ return ThreadLocalRandomJitterer.INSTANCE;
+ }
+
+ @VisibleForTesting
+ static Jitterer noJitter() {
+ return NoJitter.INSTANCE;
+ }
+ }
+
+ private static final class ThreadLocalRandomJitterer implements Jitterer {
+ private static final ThreadLocalRandomJitterer INSTANCE = new ThreadLocalRandomJitterer();
+
+ @Override
+ public Duration jitter(Duration baseline) {
+ if (Durations.gt(baseline, ZERO)) {
+ long nanos = baseline.toNanos();
+ long randNanos = ThreadLocalRandom.current().nextLong(nanos);
+ return baseline.plusNanos(randNanos);
+ }
+ return baseline;
+ }
+ }
+
+ private static final class NoJitter implements Jitterer {
+ private static final NoJitter INSTANCE = new NoJitter();
+
+ @Override
+ public Duration jitter(Duration baseline) {
+ return baseline;
+ }
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BackwardCompatibilityUtils.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BackwardCompatibilityUtils.java
index 4b63b95a2b..f996b252dd 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BackwardCompatibilityUtils.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BackwardCompatibilityUtils.java
@@ -82,8 +82,9 @@ private static LifecycleRule deleteRuleEncode(DeleteRule from) {
String msg =
"The lifecycle condition "
+ resolveRuleActionType(from)
- + " is not currently supported. Please update to the latest version of google-cloud-java."
- + " Also, use LifecycleRule rather than the deprecated DeleteRule.";
+ + " is not currently supported. Please update to the latest version of"
+ + " google-cloud-java. Also, use LifecycleRule rather than the deprecated"
+ + " DeleteRule.";
// manually construct a log record, so we maintain class name and method name
// from the old implicit values.
LogRecord record = new LogRecord(Level.WARNING, msg);
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java
new file mode 100644
index 0000000000..c934a21b98
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java
@@ -0,0 +1,640 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.InternalApi;
+import com.google.api.core.InternalExtensionOnly;
+import com.google.api.core.SettableApiFuture;
+import com.google.cloud.BaseServiceException;
+import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
+import com.google.cloud.storage.RetryContext.OnFailure;
+import com.google.cloud.storage.RetryContext.OnSuccess;
+import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
+import com.google.cloud.storage.ZeroCopySupport.DisposableByteString;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.storage.v2.ReadRange;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@InternalApi
+@InternalExtensionOnly
+abstract class BaseObjectReadSessionStreamRead<Projection>
+ implements ObjectReadSessionStreamRead<Projection> {
+
+ protected final RangeSpec rangeSpec;
+ protected final RetryContext retryContext;
+ protected final AtomicLong readOffset;
+ protected boolean closed;
+ protected boolean tombstoned;
+ protected IOAutoCloseable onCloseCallback;
+
+ BaseObjectReadSessionStreamRead(
+ RangeSpec rangeSpec, RetryContext retryContext, IOAutoCloseable onCloseCallback) {
+ this(rangeSpec, new AtomicLong(rangeSpec.begin()), retryContext, onCloseCallback, false);
+ }
+
+ BaseObjectReadSessionStreamRead(
+ RangeSpec rangeSpec,
+ AtomicLong readOffset,
+ RetryContext retryContext,
+ IOAutoCloseable onCloseCallback,
+ boolean closed) {
+ this.rangeSpec = rangeSpec;
+ this.retryContext = retryContext;
+ this.readOffset = readOffset;
+ this.closed = closed;
+ this.tombstoned = false;
+ this.onCloseCallback = onCloseCallback;
+ }
+
+ abstract long readId();
+
+ @Override
+ public long readOffset() {
+ return readOffset.get();
+ }
+
+ @Override
+ public final void preFail() {
+ tombstoned = true;
+ }
+
+ @Override
+ public final ReadRange makeReadRange() {
+ long currentOffset = readOffset.get();
+ ReadRange.Builder b = ReadRange.newBuilder().setReadId(readId()).setReadOffset(currentOffset);
+ rangeSpec
+ .maxLength()
+ .ifPresent(
+ length -> {
+ long readSoFar = currentOffset - rangeSpec.begin();
+ b.setReadLength(length - readSoFar);
+ });
+ return b.build();
+ }
+
+ @Override
+ public <T extends Throwable> void recordError(T t, OnSuccess onSuccess, OnFailure onFailure) {
+ retryContext.recordError(t, onSuccess, onFailure);
+ }
+
+ @Override
+ public boolean readyToSend() {
+ return !tombstoned && !retryContext.inBackoff();
+ }
+
+ @Override
+ public boolean canShareStreamWith(ObjectReadSessionStreamRead<?> other) {
+ return this.getClass() == other.getClass();
+ }
+
+ @Override
+ public final void close() throws IOException {
+ try {
+ internalClose();
+ } finally {
+ onCloseCallback.close();
+ }
+ }
+
+ @Override
+ public void setOnCloseCallback(IOAutoCloseable onCloseCallback) {
+ this.onCloseCallback = this.onCloseCallback.andThen(onCloseCallback);
+ }
+
+ /** Base class of a read that will accumulate before completing by resolving a future */
+ abstract static class AccumulatingRead<Result>
+ extends BaseObjectReadSessionStreamRead<ApiFuture<Result>> implements ApiFuture<Result> {
+ protected final List<ChildRef> childRefs;
+ protected final SettableApiFuture<Result> complete;
+ protected final long readId;
+ protected final Hasher hasher;
+
+ private AccumulatingRead(
+ long readId,
+ RangeSpec rangeSpec,
+ Hasher hasher,
+ RetryContext retryContext,
+ IOAutoCloseable onCloseCallback) {
+ super(rangeSpec, retryContext, onCloseCallback);
+ this.readId = readId;
+ this.hasher = hasher;
+ this.complete = SettableApiFuture.create();
+ this.childRefs = Collections.synchronizedList(new ArrayList<>());
+ }
+
+ private AccumulatingRead(
+ long readId,
+ RangeSpec rangeSpec,
+ Hasher hasher,
+ List<ChildRef> childRefs,
+ AtomicLong readOffset,
+ RetryContext retryContext,
+ boolean closed,
+ SettableApiFuture complete,
+ IOAutoCloseable onCloseCallback) {
+ super(rangeSpec, readOffset, retryContext, onCloseCallback, closed);
+ this.readId = readId;
+ this.childRefs = childRefs;
+ this.complete = complete;
+ this.hasher = hasher;
+ }
+
+ @Override
+ long readId() {
+ return readId;
+ }
+
+ @Override
+ public boolean acceptingBytes() {
+ return !complete.isDone() && !tombstoned;
+ }
+
+ @Override
+ public void accept(ChildRef childRef) throws IOException {
+ retryContext.reset();
+ int size = childRef.byteString().size();
+ childRefs.add(childRef);
+ readOffset.addAndGet(size);
+ }
+
+ @Override
+ public ApiFuture<?> fail(Throwable t) {
+ try {
+ tombstoned = true;
+ close();
+ } catch (IOException e) {
+ t.addSuppressed(e);
+ } finally {
+ complete.setException(t);
+ }
+ return complete;
+ }
+
+ @Override
+ public Hasher hasher() {
+ return hasher;
+ }
+
+ @Override
+ public void internalClose() throws IOException {
+ if (!closed) {
+ retryContext.reset();
+ closed = true;
+ GrpcUtils.closeAll(childRefs);
+ }
+ }
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ complete.addListener(listener, executor);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (!complete.isCancelled()) {
+ fail(new CancellationException());
+ }
+ return complete.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public Result get() throws InterruptedException, ExecutionException {
+ return complete.get();
+ }
+
+ @Override
+ public Result get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return complete.get(timeout, unit);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return complete.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return complete.isDone();
+ }
+
+ @Override
+ public boolean canShareStreamWith(ObjectReadSessionStreamRead<?> other) {
+ return other instanceof AccumulatingRead;
+ }
+ }
+
+ /**
+ * Base class of a read that will be processed in a streaming manner (e.g. {@link
+ * ReadableByteChannel})
+ */
+ static class StreamingRead extends BaseObjectReadSessionStreamRead<UnbufferedReadableByteChannel>
+ implements UnbufferedReadableByteChannel {
+
+ private final Hasher hasher;
+ private final SettableApiFuture<Void> failFuture;
+ private final BlockingQueue<Closeable> queue;
+
+ private AtomicLong readId;
+ private boolean complete;
+ @Nullable private ChildRefHelper leftovers;
+
+ StreamingRead(
+ long readId,
+ RangeSpec rangeSpec,
+ Hasher hasher,
+ RetryContext retryContext,
+ IOAutoCloseable onCloseCallback) {
+ super(rangeSpec, retryContext, onCloseCallback);
+ this.readId = new AtomicLong(readId);
+ this.hasher = hasher;
+ this.closed = false;
+ this.failFuture = SettableApiFuture.create();
+ this.queue = new ArrayBlockingQueue<>(2);
+ this.complete = false;
+ this.leftovers = null;
+ }
+
+ @Override
+ long readId() {
+ return readId.get();
+ }
+
+ @Override
+ public Hasher hasher() {
+ return hasher;
+ }
+
+ @Override
+ public boolean acceptingBytes() {
+ return !closed && !tombstoned;
+ }
+
+ @Override
+ public void accept(ChildRef childRef) throws IOException {
+ retryContext.reset();
+ int size = childRef.byteString().size();
+ offer(childRef);
+ readOffset.addAndGet(size);
+ }
+
+ @Override
+ public void eof() throws IOException {
+ retryContext.reset();
+ offer(EofMarker.INSTANCE);
+ }
+
+ @Override
+ public ApiFuture<?> fail(Throwable t) {
+ try {
+ offer(new SmuggledFailure(t));
+ failFuture.set(null);
+ } catch (InterruptedIOException e) {
+ Thread.currentThread().interrupt();
+ failFuture.setException(e);
+ }
+ return failFuture;
+ }
+
+ @Override
+ public StreamingRead withNewReadId(long newReadId) {
+ readId.set(newReadId);
+ return this;
+ }
+
+ @Override
+ public boolean canShareStreamWith(ObjectReadSessionStreamRead<?> other) {
+ return false;
+ }
+
+ @Override
+ public void internalClose() throws IOException {
+ if (!closed) {
+ retryContext.reset();
+ closed = true;
+ if (leftovers != null) {
+ leftovers.ref.close();
+ }
+ GrpcUtils.closeAll(queue);
+ }
+ }
+
+ @Override
+ public boolean isOpen() {
+ return !closed;
+ }
+
+ @Override
+ public UnbufferedReadableByteChannel project() {
+ return this;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ return Math.toIntExact(read(new ByteBuffer[] {dst}, 0, 1));
+ }
+
+ @Override
+ public long read(ByteBuffer[] dsts) throws IOException {
+ return read(dsts, 0, dsts.length);
+ }
+
+ @Override
+ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+ if (closed) {
+ throw new ClosedChannelException();
+ }
+ if (complete) {
+ close();
+ return -1;
+ }
+
+ long read = 0;
+ long dstsRemaining = Buffers.totalRemaining(dsts, offset, length);
+ if (leftovers != null) {
+ read += leftovers.copy(dsts, offset, length);
+ if (!leftovers.hasRemaining()) {
+ leftovers.ref.close();
+ leftovers = null;
+ }
+ }
+
+ Object poll;
+ while (read < dstsRemaining && (poll = queue.poll()) != null) {
+ if (poll instanceof ChildRef) {
+ ChildRefHelper ref = new ChildRefHelper((ChildRef) poll);
+ read += ref.copy(dsts, offset, length);
+ if (ref.hasRemaining()) {
+ leftovers = ref;
+ break;
+ } else {
+ ref.ref.close();
+ }
+ } else if (poll == EofMarker.INSTANCE) {
+ complete = true;
+ if (read == 0) {
+ close();
+ return -1;
+ }
+ break;
+ } else if (poll instanceof SmuggledFailure) {
+ SmuggledFailure throwable = (SmuggledFailure) poll;
+ close();
+ BaseServiceException coalesce = StorageException.coalesce(throwable.getSmuggled());
+ throw new IOException(coalesce);
+ } else {
+ //noinspection DataFlowIssue
+ Preconditions.checkState(
+ false, "unhandled queue element type %s", poll.getClass().getName());
+ }
+ }
+
+ return read;
+ }
+
+ private void offer(Closeable offer) throws InterruptedIOException {
+ try {
+ queue.put(offer);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException();
+ }
+ }
+
+ /**
+ * The queue that items are added to is a queue of {@link Closeable}. This class smuggles a Throwable
+ * in a no-op Closeable, such that the throwable can be in the queue.
+ *
+ * <p>Refer to {@link #fail(Throwable)} to see where this class is instantiated.
+ */
+ static final class SmuggledFailure implements Closeable {
+ private final Throwable smuggled;
+
+ private SmuggledFailure(Throwable smuggled) {
+ this.smuggled = smuggled;
+ }
+
+ Throwable getSmuggled() {
+ return smuggled;
+ }
+
+ @Override
+ public void close() throws IOException {}
+ }
+
+ static final class ChildRefHelper {
+ private final ChildRef ref;
+
+ private final List buffers;
+
+ private ChildRefHelper(ChildRef ref) {
+ this.ref = ref;
+ this.buffers = ref.byteString().asReadOnlyByteBufferList();
+ }
+
+ long copy(ByteBuffer[] dsts, int offset, int length) {
+ long copied = 0;
+ for (ByteBuffer b : buffers) {
+ long copiedBytes = Buffers.copy(b, dsts, offset, length);
+ copied += copiedBytes;
+ if (b.hasRemaining()) break;
+ }
+ return copied;
+ }
+
+ boolean hasRemaining() {
+ for (ByteBuffer b : buffers) {
+ if (b.hasRemaining()) return true;
+ }
+ return false;
+ }
+ }
+
+ private static final class EofMarker implements Closeable {
+ private static final EofMarker INSTANCE = new EofMarker();
+
+ private EofMarker() {}
+
+ @Override
+ public void close() {}
+ }
+ }
+
+ static final class ByteArrayAccumulatingRead extends AccumulatingRead<byte[]> {
+
+ ByteArrayAccumulatingRead(
+ long readId,
+ RangeSpec rangeSpec,
+ Hasher hasher,
+ RetryContext retryContext,
+ IOAutoCloseable onCloseCallback) {
+ super(readId, rangeSpec, hasher, retryContext, onCloseCallback);
+ }
+
+ private ByteArrayAccumulatingRead(
+ long readId,
+ RangeSpec rangeSpec,
+ Hasher hasher,
+ List<ChildRef> childRefs,
+ RetryContext retryContext,
+ AtomicLong readOffset,
+ boolean closed,
+ SettableApiFuture<byte[]> complete,
+ IOAutoCloseable onCloseCallback) {
+ super(
+ readId,
+ rangeSpec,
+ hasher,
+ childRefs,
+ readOffset,
+ retryContext,
+ closed,
+ complete,
+ onCloseCallback);
+ }
+
+ @Override
+ public ApiFuture<byte[]> project() {
+ return this;
+ }
+
+ @Override
+ public void eof() throws IOException {
+ retryContext.reset();
+ try {
+ ByteString base = ByteString.empty();
+ for (ChildRef ref : childRefs) {
+ base = base.concat(ref.byteString());
+ }
+ complete.set(base.toByteArray());
+ } finally {
+ close();
+ }
+ }
+
+ @Override
+ public ByteArrayAccumulatingRead withNewReadId(long newReadId) {
+ this.tombstoned = true;
+ return new ByteArrayAccumulatingRead(
+ newReadId,
+ rangeSpec,
+ hasher,
+ childRefs,
+ retryContext,
+ readOffset,
+ closed,
+ complete,
+ onCloseCallback);
+ }
+ }
+
+ static final class ZeroCopyByteStringAccumulatingRead
+ extends AccumulatingRead<DisposableByteString> implements DisposableByteString {
+
+ private volatile ByteString byteString;
+
+ ZeroCopyByteStringAccumulatingRead(
+ long readId,
+ RangeSpec rangeSpec,
+ Hasher hasher,
+ RetryContext retryContext,
+ IOAutoCloseable onCloseCallback) {
+ super(readId, rangeSpec, hasher, retryContext, onCloseCallback);
+ }
+
+ public ZeroCopyByteStringAccumulatingRead(
+ long readId,
+ RangeSpec rangeSpec,
+ Hasher hasher,
+ List<ChildRef> childRefs,
+ AtomicLong readOffset,
+ RetryContext retryContext,
+ boolean closed,
+ SettableApiFuture<DisposableByteString> complete,
+ ByteString byteString,
+ IOAutoCloseable onCloseCallback) {
+ super(
+ readId,
+ rangeSpec,
+ hasher,
+ childRefs,
+ readOffset,
+ retryContext,
+ closed,
+ complete,
+ onCloseCallback);
+ this.byteString = byteString;
+ }
+
+ @Override
+ public ApiFuture<DisposableByteString> project() {
+ return this;
+ }
+
+ @Override
+ public ByteString byteString() {
+ return byteString;
+ }
+
+ @Override
+ public void eof() throws IOException {
+ retryContext.reset();
+ ByteString base = ByteString.empty();
+ for (ChildRef ref : childRefs) {
+ base = base.concat(ref.byteString());
+ }
+ byteString = base;
+ complete.set(this);
+ }
+
+ @Override
+ public ZeroCopyByteStringAccumulatingRead withNewReadId(long newReadId) {
+ this.tombstoned = true;
+ return new ZeroCopyByteStringAccumulatingRead(
+ newReadId,
+ rangeSpec,
+ hasher,
+ childRefs,
+ readOffset,
+ retryContext,
+ closed,
+ complete,
+ byteString,
+ onCloseCallback);
+ }
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java
index e8ba2f3c61..b0e5ce639d 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java
@@ -88,8 +88,8 @@ WriterFactory createFactory(Clock clock) throws IOException {
}
@InternalApi
- private static final class Factory implements WriterFactory {
- private static final Conversions.Decoder
+ static final class Factory implements WriterFactory {
+ static final Conversions.Decoder<BidiWriteObjectResponse, BlobInfo>
WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER =
Conversions.grpc().blobInfo().compose(BidiWriteObjectResponse::getResource);
@@ -122,7 +122,7 @@ public WritableByteChannelSession, BlobInfo> writeSession(
.setByteStringStrategy(ByteStringStrategy.copy())
.resumable()
.withRetryConfig(
- grpc.getOptions(), grpc.retryAlgorithmManager.idempotent())
+ grpc.retrier.withAlg(grpc.retryAlgorithmManager.idempotent()))
.buffered(BufferHandle.allocate(bufferSize))
.setStartAsync(startResumableWrite)
.build();
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java
index 0f5a378f80..18e7cfff96 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java
@@ -94,3 +94,73 @@ static BidiResumableWrite identity(BidiResumableWrite w) {
return w;
}
}
+
+final class BidiAppendableWrite implements BidiWriteObjectRequestBuilderFactory {
+
+ private final BidiWriteObjectRequest req;
+
+ public BidiAppendableWrite(BidiWriteObjectRequest req) {
+ this(req, false);
+ }
+
+ public BidiAppendableWrite(BidiWriteObjectRequest req, boolean takeOver) {
+ if (takeOver) {
+ this.req = req;
+ } else {
+ req =
+ req.toBuilder()
+ .setWriteObjectSpec(req.getWriteObjectSpec().toBuilder().setAppendable(true).build())
+ .build();
+ this.req = req;
+ }
+ }
+
+ public BidiWriteObjectRequest getReq() {
+ return req;
+ }
+
+ @Override
+ public BidiWriteObjectRequest.Builder newBuilder() {
+ return req.toBuilder();
+ }
+
+ @Override
+ public @Nullable String bucketName() {
+ if (req.hasWriteObjectSpec() && req.getWriteObjectSpec().hasResource()) {
+ return req.getWriteObjectSpec().getResource().getBucket();
+ } else if (req.hasAppendObjectSpec()) {
+ return req.getAppendObjectSpec().getBucket();
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "BidiAppendableWrite{" + "req=" + fmtProto(req) + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof BidiAppendableWrite)) {
+ return false;
+ }
+ BidiAppendableWrite BidiAppendableWrite = (BidiAppendableWrite) o;
+ return Objects.equals(req, BidiAppendableWrite.getReq());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(req);
+ }
+
+ /**
+ * Helper function which is more specific than {@link Function#identity()}. Constraining the input
+ * and output to be exactly {@link BidiAppendableWrite}.
+ */
+ static BidiAppendableWrite identity(BidiAppendableWrite w) {
+ return w;
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java
index a458c079c2..7a11e0c5a7 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java
@@ -80,7 +80,6 @@ public String toString() {
interface BidiWriteObjectRequestBuilderFactory {
BidiWriteObjectRequest.Builder newBuilder();
- @Nullable
- String bucketName();
+ @Nullable String bucketName();
}
}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java
index b84fcef6a5..8a6c3d1b7f 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java
@@ -402,7 +402,9 @@ public Builder setStorageClass(StorageClass storageClass) {
return this;
}
- /** @deprecated Use {@link #setTimeStorageClassUpdatedOffsetDateTime(OffsetDateTime)} */
+ /**
+ * @deprecated Use {@link #setTimeStorageClassUpdatedOffsetDateTime(OffsetDateTime)}
+ */
@Override
@Deprecated
public Builder setTimeStorageClassUpdated(Long timeStorageClassUpdated) {
@@ -423,7 +425,9 @@ Builder setMetageneration(Long metageneration) {
return this;
}
- /** @deprecated Use {@link #setDeleteTimeOffsetDateTime(OffsetDateTime)} */
+ /**
+ * @deprecated Use {@link #setDeleteTimeOffsetDateTime(OffsetDateTime)}
+ */
@Override
@Deprecated
Builder setDeleteTime(Long deleteTime) {
@@ -437,7 +441,9 @@ BlobInfo.Builder setDeleteTimeOffsetDateTime(OffsetDateTime deleteTime) {
return this;
}
- /** @deprecated Use {@link #setUpdateTimeOffsetDateTime(OffsetDateTime)} */
+ /**
+ * @deprecated Use {@link #setUpdateTimeOffsetDateTime(OffsetDateTime)}
+ */
@Override
@Deprecated
Builder setUpdateTime(Long updateTime) {
@@ -464,7 +470,9 @@ BlobInfo.Builder setCreateTimeOffsetDateTime(OffsetDateTime createTime) {
return this;
}
- /** @deprecated Use {@link #setCustomTimeOffsetDateTime(OffsetDateTime)} */
+ /**
+ * @deprecated Use {@link #setCustomTimeOffsetDateTime(OffsetDateTime)}
+ */
@Override
@Deprecated
public Builder setCustomTime(Long customTime) {
@@ -508,7 +516,9 @@ public Builder setTemporaryHold(Boolean temporaryHold) {
return this;
}
- /** @deprecated Use {@link #setRetentionExpirationTimeOffsetDateTime(OffsetDateTime)} */
+ /**
+ * @deprecated Use {@link #setRetentionExpirationTimeOffsetDateTime(OffsetDateTime)}
+ */
@Override
@Deprecated
Builder setRetentionExpirationTime(Long retentionExpirationTime) {
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java
new file mode 100644
index 0000000000..b79a290969
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.BetaApi;
+import com.google.api.core.InternalExtensionOnly;
+import com.google.cloud.storage.BlobAppendableUploadConfig.CloseAction;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Interface representing those methods which can be used to write to and interact with an
+ * appendable upload.
+ *
+ * @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...)
+ */
+@BetaApi
+@InternalExtensionOnly
+public interface BlobAppendableUpload extends BlobWriteSession {
+
+ /**
+ * Open the {@link AppendableUploadWriteableByteChannel AppendableUploadWriteableByteChannel} for
+ * this session.
+ *
+ * <p>A session may only be {@code open}ed once. If multiple calls to open are made, an illegal
+ * state exception will be thrown
+ *
+ * <p>The returned {@code AppendableUploadWriteableByteChannel} can throw IOExceptions from any of
+ * its usual methods. Any {@link IOException} thrown can have a cause of a {@link
+ * StorageException}. However, not all {@code IOExceptions} will have {@code StorageException}s.
+ *
+ * @throws IOException When creating the {@link AppendableUploadWriteableByteChannel} if an
+ * unrecoverable underlying IOException occurs it can be rethrown
+ * @throws IllegalStateException if open is called more than once
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+ @Override
+ AppendableUploadWriteableByteChannel open() throws IOException;
+
+ /**
+ * Return an {@link ApiFuture}{@code <BlobInfo>} which will represent the state of the object in
+ * Google Cloud Storage.
+ *
+ * <p>This future will not resolve until:
+ *
+ * <ul>
+ *   <li>The object is successfully finalized in Google Cloud Storage by calling {@link
+ *       AppendableUploadWriteableByteChannel#finalizeAndClose()
+ *       AppendableUploadWriteableByteChannel#finalizeAndClose()}
+ *   <li>This session is detached from the upload without finalizing by calling {@link
+ *       AppendableUploadWriteableByteChannel#closeWithoutFinalizing()
+ *       AppendableUploadWriteableByteChannel#closeWithoutFinalizing()}
+ *   <li>The session is closed by calling {@link AppendableUploadWriteableByteChannel#close()
+ *       AppendableUploadWriteableByteChannel#close()}
+ *   <li>A terminal failure occurs, the terminal failure will become the exception result
+ * </ul>
+ *
+ * <p>NOTICE: Some fields may not be populated unless finalization has completed.
+ *
+ * <p>If a terminal failure is encountered, calling either {@link ApiFuture#get()} or {@link
+ * ApiFuture#get(long, TimeUnit)} will result in an {@link
+ * java.util.concurrent.ExecutionException} with the cause.
+ *
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+ @Override
+ ApiFuture<BlobInfo> getResult();
+
+ /**
+ * The {@link WritableByteChannel} returned from {@link BlobAppendableUpload#open()}.
+ *
+ * <p>This interface allows writing bytes to an Appendable Upload, and provides methods to close
+ * this channel -- optionally finalizing the upload.
+ *
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ @InternalExtensionOnly
+ interface AppendableUploadWriteableByteChannel extends WritableByteChannel {
+
+ /**
+ * Finalize the upload and close this instance to further {@link #write(ByteBuffer)}ing. This
+ * will close any underlying stream and release any releasable resources once out of scope.
+ *
+ *
+ * <p>Once this method is called, and returns no more writes to the object will be allowed by
+ * GCS.
+ *
+ *
+ * <p>This method and {@link #close()} are mutually exclusive. If one of the other methods are
+ * called before this method, this method will be a no-op.
+ *
+ * @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...)
+ * @see BlobAppendableUploadConfig.CloseAction#FINALIZE_WHEN_CLOSING
+ * @see BlobAppendableUploadConfig#getCloseAction()
+ * @see BlobAppendableUploadConfig#withCloseAction(CloseAction)
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ void finalizeAndClose() throws IOException;
+
+ /**
+ * Close this instance to further {@link #write(ByteBuffer)}ing without finalizing the upload.
+ * This will close any underlying stream and release any releasable resources once out of scope.
+ *
+ *
+ * <p>This method, {@link AppendableUploadWriteableByteChannel#finalizeAndClose()} and {@link
+ * AppendableUploadWriteableByteChannel#close()} are mutually exclusive. If one of the other
+ * methods are called before this method, this method will be a no-op.
+ *
+ * @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...)
+ * @see BlobAppendableUploadConfig.CloseAction#CLOSE_WITHOUT_FINALIZING
+ * @see BlobAppendableUploadConfig#getCloseAction()
+ * @see BlobAppendableUploadConfig#withCloseAction(CloseAction)
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ void closeWithoutFinalizing() throws IOException;
+
+ /**
+ * Close this instance to further {@link #write(ByteBuffer)}ing.
+ *
+ *
+ * <p>Whether the upload is finalized during this depends on the {@link
+ * BlobAppendableUploadConfig#getCloseAction()} provided to create the {@link
+ * BlobAppendableUpload}. If {@link BlobAppendableUploadConfig#getCloseAction()}{@code ==
+ * }{@link CloseAction#FINALIZE_WHEN_CLOSING}, {@link #finalizeAndClose()} will be called. If
+ * {@link BlobAppendableUploadConfig#getCloseAction()}{@code == }{@link
+ * CloseAction#CLOSE_WITHOUT_FINALIZING}, {@link #closeWithoutFinalizing()} will be called.
+ *
+ * @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...)
+ * @see BlobAppendableUploadConfig#getCloseAction()
+ * @see BlobAppendableUploadConfig#withCloseAction(CloseAction)
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ void close() throws IOException;
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java
new file mode 100644
index 0000000000..ae95356d74
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static com.google.cloud.storage.ByteSizeConstants._256KiB;
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.core.ApiFutures;
+import com.google.api.core.BetaApi;
+import com.google.api.core.InternalApi;
+import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
+import com.google.api.gax.rpc.AbortedException;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
+import com.google.cloud.storage.BlobAppendableUploadImpl.AppendableObjectBufferedWritableByteChannel;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import com.google.cloud.storage.TransportCompatibility.Transport;
+import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
+import com.google.cloud.storage.UnifiedOpts.Opts;
+import com.google.storage.v2.BidiWriteObjectRequest;
+import com.google.storage.v2.BidiWriteObjectResponse;
+import com.google.storage.v2.Object;
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Configuration parameters for an appendable uploads channel.
+ *
+ *
+ * <p>Instances of this class are immutable and thread safe.
+ *
+ * @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...)
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+@Immutable
+@BetaApi
+@TransportCompatibility({Transport.GRPC})
+public final class BlobAppendableUploadConfig {
+
+ private static final BlobAppendableUploadConfig INSTANCE =
+ new BlobAppendableUploadConfig(
+ FlushPolicy.minFlushSize(_256KiB),
+ Hasher.enabled(),
+ CloseAction.CLOSE_WITHOUT_FINALIZING);
+
+ private final FlushPolicy flushPolicy;
+ private final Hasher hasher;
+ private final CloseAction closeAction;
+
+ private BlobAppendableUploadConfig(
+ FlushPolicy flushPolicy, Hasher hasher, CloseAction closeAction) {
+ this.flushPolicy = flushPolicy;
+ this.hasher = hasher;
+ this.closeAction = closeAction;
+ }
+
+ /**
+ * The {@link FlushPolicy} which will be used to determine when and how many bytes to flush to
+ * GCS.
+ *
+ *
+ * <p>Default: {@link FlushPolicy#minFlushSize(int) FlushPolicy.minFlushSize(256 * 1024)}
+ *
+ * @see #withFlushPolicy(FlushPolicy)
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ public FlushPolicy getFlushPolicy() {
+ return flushPolicy;
+ }
+
+ /**
+ * Return an instance with the {@code FlushPolicy} set to be the specified value.
+ *
+ *
+ * <p>Default: {@link FlushPolicy#minFlushSize(int) FlushPolicy.minFlushSize(256 * 1024)}
+ *
+ * @see #getFlushPolicy()
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ public BlobAppendableUploadConfig withFlushPolicy(FlushPolicy flushPolicy) {
+ requireNonNull(flushPolicy, "flushPolicy must be non null");
+ if (this.flushPolicy.equals(flushPolicy)) {
+ return this;
+ }
+ return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction);
+ }
+
+ /**
+ * The {@link CloseAction} which will dictate the behavior of {@link
+ * AppendableUploadWriteableByteChannel#close()}.
+ *
+ *
+ * <p>Default: {@link CloseAction#CLOSE_WITHOUT_FINALIZING}
+ *
+ * @see #withCloseAction(CloseAction)
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ public CloseAction getCloseAction() {
+ return closeAction;
+ }
+
+ /**
+ * Return an instance with the {@code CloseAction} set to be the specified value. Default:
+ * {@link CloseAction#CLOSE_WITHOUT_FINALIZING}
+ *
+ * @see #getCloseAction()
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ public BlobAppendableUploadConfig withCloseAction(CloseAction closeAction) {
+ requireNonNull(closeAction, "closeAction must be non null");
+ if (this.closeAction == closeAction) {
+ return this;
+ }
+ return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction);
+ }
+
+ /**
+ * Whether crc32c validation will be performed for bytes returned by Google Cloud Storage
+ *
+ *
+ * <p>Default: {@code true}
+ *
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ boolean getCrc32cValidationEnabled() {
+ return Hasher.enabled().equals(hasher);
+ }
+
+ /**
+ * Return an instance with crc32c validation enabled based on {@code enabled}.
+ *
+ *
+ * <p>Default: {@code true}
+ *
+ * @param enabled Whether crc32c validation will be performed for bytes returned by Google Cloud
+ * Storage
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ BlobAppendableUploadConfig withCrc32cValidationEnabled(boolean enabled) {
+ if (enabled && Hasher.enabled().equals(hasher)) {
+ return this;
+ } else if (!enabled && Hasher.noop().equals(hasher)) {
+ return this;
+ }
+ return new BlobAppendableUploadConfig(
+ flushPolicy, enabled ? Hasher.enabled() : Hasher.noop(), closeAction);
+ }
+
+ /** Never to be made public until {@link Hasher} is public */
+ @InternalApi
+ Hasher getHasher() {
+ return hasher;
+ }
+
+ /**
+ * Default instance factory method.
+ *
+ *
+ * <p>The {@link FlushPolicy} of this instance is equivalent to the following:
+ *
+ * <pre>{@code
+ * BlobAppendableUploadConfig.of()
+ * .withFlushPolicy(FlushPolicy.minFlushSize(256 * 1024))
+ * .withCloseAction(CloseAction.CLOSE_WITHOUT_FINALIZING)
+ * }</pre>
+ *
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ * @see FlushPolicy#minFlushSize(int)
+ */
+ @BetaApi
+ public static BlobAppendableUploadConfig of() {
+ return INSTANCE;
+ }
+
+ /**
+ * Enum providing the possible actions which can be taken during the {@link
+ * AppendableUploadWriteableByteChannel#close()} call.
+ *
+ * @see AppendableUploadWriteableByteChannel#close()
+ * @see BlobAppendableUploadConfig#withCloseAction(CloseAction)
+ * @see BlobAppendableUploadConfig#getCloseAction()
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ public enum CloseAction {
+ /**
+ * Designate that when {@link AppendableUploadWriteableByteChannel#close()} is called, the
+ * appendable upload should be finalized.
+ *
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ * @see AppendableUploadWriteableByteChannel#finalizeAndClose()
+ */
+ @BetaApi
+ FINALIZE_WHEN_CLOSING,
+ /**
+ * Designate that when {@link AppendableUploadWriteableByteChannel#close()} is called, the
+ * appendable upload should NOT be finalized, allowing for takeover by another session or
+ * client.
+ *
+ * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ * @see AppendableUploadWriteableByteChannel#closeWithoutFinalizing()
+ */
+ @BetaApi
+ CLOSE_WITHOUT_FINALIZING
+ }
+
+ BlobAppendableUpload create(GrpcStorageImpl storage, BlobInfo info, Opts<ObjectTargetOpt> opts) {
+ boolean takeOver = info.getGeneration() != null;
+ BidiWriteObjectRequest req =
+ takeOver
+ ? storage.getBidiWriteObjectRequestForTakeover(info, opts)
+ : storage.getBidiWriteObjectRequest(info, opts);
+
+ BidiAppendableWrite baw = new BidiAppendableWrite(req, takeOver);
+
+ WritableByteChannelSession<AppendableObjectBufferedWritableByteChannel, BidiWriteObjectResponse>
+ build =
+ ResumableMedia.gapic()
+ .write()
+ .bidiByteChannel(storage.storageClient.bidiWriteObjectCallable())
+ .setHasher(this.getHasher())
+ .setByteStringStrategy(ByteStringStrategy.copy())
+ .appendable()
+ .withRetryConfig(
+ storage.retrier.withAlg(
+ new BasicResultRetryAlgorithm