Skip to content

Commit b426e7d

Browse files
committed
chore: add MinFlushSizeFlushPolicy
Update appendable uploads to use min flush size instead of max flush size.
1 parent c601cca commit b426e7d

10 files changed

+986
-99
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/AppendableBlobUploadConfig.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.google.cloud.storage;
1818

19-
import static com.google.cloud.storage.ByteSizeConstants._2MiB;
19+
import static com.google.cloud.storage.ByteSizeConstants._256KiB;
2020
import static java.util.Objects.requireNonNull;
2121

2222
import com.google.api.core.BetaApi;
@@ -38,7 +38,7 @@
3838
public final class AppendableBlobUploadConfig {
3939

4040
private static final AppendableBlobUploadConfig INSTANCE =
41-
new AppendableBlobUploadConfig(FlushPolicy.maxFlushSize(_2MiB));
41+
new AppendableBlobUploadConfig(FlushPolicy.minFlushSize(_256KiB));
4242

4343
private final FlushPolicy flushPolicy;
4444

@@ -50,8 +50,7 @@ private AppendableBlobUploadConfig(FlushPolicy flushPolicy) {
5050
* The {@link FlushPolicy} which will be used to determine when and how many bytes to flush to
5151
* GCS.
5252
*
53-
* <p><i>Default:</i> {@link FlushPolicy#maxFlushSize(int) FlushPolicy.maxFlushSize(2 * 1024 *
54-
* 1024)}
53+
* <p><i>Default:</i> {@link FlushPolicy#minFlushSize(int) FlushPolicy.minFlushSize(256 * 1024)}
5554
*
5655
* @see #withFlushPolicy(FlushPolicy)
5756
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
@@ -64,6 +63,8 @@ public FlushPolicy getFlushPolicy() {
6463
/**
6564
* Return an instance with the {@code FlushPolicy} set to be the specified value.
6665
*
66+
* <p><i>Default:</i> {@link FlushPolicy#minFlushSize(int) FlushPolicy.minFlushSize(256 * 1024)}
67+
*
6768
* @see #getFlushPolicy()
6869
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
6970
*/
@@ -83,10 +84,11 @@ public AppendableBlobUploadConfig withFlushPolicy(FlushPolicy flushPolicy) {
8384
*
8485
* <pre>{@code
8586
* AppendableBlobUploadConfig.of()
86-
* .withFlushPolicy(FlushPolicy.maxFlushSize(2 * 1024 * 1024))
87+
* .withFlushPolicy(FlushPolicy.minFlushSize(256 * 1024))
8788
* }</pre>
8889
*
8990
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
91+
* @see FlushPolicy#minFlushSize(int)
9092
*/
9193
@BetaApi
9294
public static AppendableBlobUploadConfig of() {

google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,18 @@ static int position(Buffer b) {
6161
return b.position();
6262
}
6363

64+
static int remaining(Buffer b) {
65+
return b.remaining();
66+
}
67+
68+
static boolean hasRemaining(Buffer b) {
69+
return b.hasRemaining();
70+
}
71+
72+
static void compact(ByteBuffer b) {
73+
b.compact();
74+
}
75+
6476
/** attempt to drain all of {@code content} into {@code dst} */
6577
static long copy(ByteBuffer content, ByteBuffer dst) {
6678
return copy(content, new ByteBuffer[] {dst}, 0, 1);

google-cloud-storage/src/main/java/com/google/cloud/storage/FlushPolicy.java

Lines changed: 109 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
import com.google.api.core.BetaApi;
2222
import com.google.api.core.InternalExtensionOnly;
23-
import com.google.cloud.storage.GapicBidiWritableByteChannelSessionBuilder.AppendableUploadBuilder;
24-
import com.google.cloud.storage.GapicBidiWritableByteChannelSessionBuilder.AppendableUploadBuilder.BufferedAppendableUploadBuilder;
23+
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
24+
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
2525
import com.google.common.base.MoreObjects;
2626
import com.google.common.base.Preconditions;
2727
import java.util.Objects;
@@ -63,7 +63,29 @@ public static MaxFlushSizeFlushPolicy maxFlushSize(int maxFlushSize) {
6363
return maxFlushSize().withMaxFlushSize(maxFlushSize);
6464
}
6565

66-
abstract BufferedAppendableUploadBuilder apply(AppendableUploadBuilder builder);
66+
/**
67+
* Default instance factory method for {@link MinFlushSizeFlushPolicy}.
68+
*
69+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
70+
*/
71+
@BetaApi
72+
public static MinFlushSizeFlushPolicy minFlushSize() {
73+
return MinFlushSizeFlushPolicy.INSTANCE;
74+
}
75+
76+
/**
77+
* Alias for {@link FlushPolicy#minFlushSize() FlushPolicy.minFlushSize()}{@code .}{@link
78+
* MinFlushSizeFlushPolicy#withMinFlushSize(int) withMinFlushSize(int)}
79+
*
80+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
81+
*/
82+
@BetaApi
83+
public static MinFlushSizeFlushPolicy minFlushSize(int minFlushSize) {
84+
return minFlushSize().withMinFlushSize(minFlushSize);
85+
}
86+
87+
abstract BufferedWritableByteChannel createBufferedChannel(
88+
UnbufferedWritableByteChannel unbuffered);
6789

6890
@Override
6991
public abstract boolean equals(Object obj);
@@ -127,8 +149,9 @@ public MaxFlushSizeFlushPolicy withMaxFlushSize(int maxFlushSize) {
127149
}
128150

129151
@Override
130-
BufferedAppendableUploadBuilder apply(AppendableUploadBuilder builder) {
131-
return builder.buffered(BufferHandle.allocate(maxFlushSize));
152+
BufferedWritableByteChannel createBufferedChannel(UnbufferedWritableByteChannel unbuffered) {
153+
return new DefaultBufferedWritableByteChannel(
154+
BufferHandle.allocate(maxFlushSize), unbuffered);
132155
}
133156

134157
@Override
@@ -153,4 +176,85 @@ public String toString() {
153176
return MoreObjects.toStringHelper(this).add("maxFlushSize", maxFlushSize).toString();
154177
}
155178
}
179+
180+
/**
181+
* Define a {@link FlushPolicy} where a min number of bytes will be required before a flush GCS
182+
* happens.
183+
*
184+
* <p>If there are not enough bytes to trigger a flush, they will be held in memory until there
185+
* are enough bytes, or an explicit flush is performed by closing the channel.
186+
*
187+
* <p>Instances of this class are immutable and thread safe.
188+
*
189+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
190+
*/
191+
@Immutable
192+
@BetaApi
193+
public static final class MinFlushSizeFlushPolicy extends FlushPolicy {
194+
private static final MinFlushSizeFlushPolicy INSTANCE = new MinFlushSizeFlushPolicy(_2MiB);
195+
196+
private final int minFlushSize;
197+
198+
public MinFlushSizeFlushPolicy(int minFlushSize) {
199+
this.minFlushSize = minFlushSize;
200+
}
201+
202+
/**
203+
* The minimum number of bytes to include in each automatic flush
204+
*
205+
* <p><i>Default:</i> {@code 2097152 (2 MiB)}
206+
*
207+
* @see #withMinFlushSize(int)
208+
*/
209+
@BetaApi
210+
public int getMinFlushSize() {
211+
return minFlushSize;
212+
}
213+
214+
/**
215+
* Return an instance with the {@code minFlushSize} set to the specified value.
216+
*
217+
* <p><i>Default:</i> {@code 2097152 (2 MiB)}
218+
*
219+
* @param minFlushSize The number of bytes to buffer before flushing.
220+
* @return The new instance
221+
* @see #getMinFlushSize()
222+
*/
223+
@BetaApi
224+
public MinFlushSizeFlushPolicy withMinFlushSize(int minFlushSize) {
225+
Preconditions.checkArgument(minFlushSize >= 0, "minFlushSize >= 0 (%s >= 0)", minFlushSize);
226+
if (this.minFlushSize == minFlushSize) {
227+
return this;
228+
}
229+
return new MinFlushSizeFlushPolicy(minFlushSize);
230+
}
231+
232+
@Override
233+
BufferedWritableByteChannel createBufferedChannel(UnbufferedWritableByteChannel unbuffered) {
234+
return new MinFlushBufferedWritableByteChannel(
235+
BufferHandle.allocate(minFlushSize), unbuffered);
236+
}
237+
238+
@Override
239+
public boolean equals(Object o) {
240+
if (this == o) {
241+
return true;
242+
}
243+
if (!(o instanceof MinFlushSizeFlushPolicy)) {
244+
return false;
245+
}
246+
MinFlushSizeFlushPolicy that = (MinFlushSizeFlushPolicy) o;
247+
return minFlushSize == that.minFlushSize;
248+
}
249+
250+
@Override
251+
public int hashCode() {
252+
return Objects.hashCode(minFlushSize);
253+
}
254+
255+
@Override
256+
public String toString() {
257+
return MoreObjects.toStringHelper(this).add("minFlushSize", minFlushSize).toString();
258+
}
259+
}
156260
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -178,29 +178,17 @@ AppendableUploadBuilder withRetryConfig(RetrierWithAlg retrier) {
178178
return this;
179179
}
180180

181-
/**
182-
* Buffer using {@code byteBuffer} worth of space before attempting to flush.
183-
*
184-
* <p>The provided {@link ByteBuffer} <i>should</i> be aligned with GCSs block size of <a
185-
* target="_blank"
186-
* href="https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload">256
187-
* KiB</a>.
188-
*/
189-
BufferedAppendableUploadBuilder buffered(ByteBuffer byteBuffer) {
190-
return buffered(BufferHandle.handleOf(byteBuffer));
191-
}
192-
193-
BufferedAppendableUploadBuilder buffered(BufferHandle bufferHandle) {
194-
return new BufferedAppendableUploadBuilder(bufferHandle);
181+
BufferedAppendableUploadBuilder buffered(FlushPolicy flushPolicy) {
182+
return new BufferedAppendableUploadBuilder(flushPolicy);
195183
}
196184

197185
final class BufferedAppendableUploadBuilder {
198-
private final BufferHandle bufferHandle;
186+
private final FlushPolicy flushPolicy;
199187
private ApiFuture<BidiAppendableWrite> start;
200188
private UnaryCallable<GetObjectRequest, Object> get;
201189

202-
BufferedAppendableUploadBuilder(BufferHandle bufferHandle) {
203-
this.bufferHandle = bufferHandle;
190+
BufferedAppendableUploadBuilder(FlushPolicy flushPolicy) {
191+
this.flushPolicy = flushPolicy;
204192
}
205193

206194
/**
@@ -249,7 +237,7 @@ BufferedWritableByteChannelSession<BidiWriteObjectResponse> build() {
249237
.andThen(
250238
c ->
251239
new AppendableBlobUploadImpl.AppendableObjectBufferedWritableByteChannel(
252-
new DefaultBufferedWritableByteChannel(bufferHandle, c), c)));
240+
flushPolicy.createBufferedChannel(c), c)));
253241
}
254242
}
255243
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import com.google.cloud.storage.BlobWriteSessionConfig.WriterFactory;
5252
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
5353
import com.google.cloud.storage.Conversions.Decoder;
54-
import com.google.cloud.storage.GapicBidiWritableByteChannelSessionBuilder.AppendableUploadBuilder;
5554
import com.google.cloud.storage.GrpcUtils.ZeroCopyServerStreamingCallable;
5655
import com.google.cloud.storage.HmacKey.HmacKeyMetadata;
5756
import com.google.cloud.storage.HmacKey.HmacKeyState;
@@ -1435,7 +1434,7 @@ public AppendableBlobUpload appendableBlobUpload(
14351434
: getBidiWriteObjectRequest(blob, opts);
14361435
BidiAppendableWrite baw = new BidiAppendableWrite(req, takeOver);
14371436
ApiFuture<BidiAppendableWrite> startAppendableWrite = ApiFutures.immediateFuture(baw);
1438-
AppendableUploadBuilder appendableUploadBuilder =
1437+
WritableByteChannelSession<BufferedWritableByteChannel, BidiWriteObjectResponse> build =
14391438
ResumableMedia.gapic()
14401439
.write()
14411440
.bidiByteChannel(storageClient.bidiWriteObjectCallable())
@@ -1465,11 +1464,8 @@ public boolean shouldRetry(
14651464
.idempotent()
14661465
.shouldRetry(previousThrowable, null);
14671466
}
1468-
}));
1469-
WritableByteChannelSession<BufferedWritableByteChannel, BidiWriteObjectResponse> build =
1470-
uploadConfig
1471-
.getFlushPolicy()
1472-
.apply(appendableUploadBuilder)
1467+
}))
1468+
.buffered(uploadConfig.getFlushPolicy())
14731469
.setStartAsync(startAppendableWrite)
14741470
.setGetCallable(storageClient.getObjectCallable())
14751471
.build();

0 commit comments

Comments
 (0)