Example usage: * *
{@code
- * TransactionContextFuture txnFuture = manager.beginAsync();
* final String column = "FirstName";
- * txnFuture.then(
- * new AsyncTransactionFunction() {
- * @Override
- * public ApiFuture apply(TransactionContext txn, Void input)
- * throws Exception {
- * return txn.readRowAsync(
- * "Singers", Key.of(singerId), Collections.singleton(column));
- * }
- * })
- * .then(
- * new AsyncTransactionFunction() {
- * @Override
- * public ApiFuture apply(TransactionContext txn, Struct input)
- * throws Exception {
- * String name = input.getString(column);
- * txn.buffer(
- * Mutation.newUpdateBuilder("Singers")
- * .set(column)
- * .to(name.toUpperCase())
- * .build());
- * return ApiFutures.immediateFuture(null);
- * }
- * })
+ * final long singerId = 1L;
+ * AsyncTransactionManager manager = client.transactionManagerAsync();
+ * TransactionContextFuture txnFuture = manager.beginAsync();
+ * txnFuture
+ * .then((transaction, ignored) ->
+ * transaction.readRowAsync("Singers", Key.of(singerId), Collections.singleton(column)),
+ * executor)
+ * .then((transaction, row) ->
+ * transaction.bufferAsync(
+ * Mutation.newUpdateBuilder("Singers")
+ * .set(column).to(row.getString(column).toUpperCase())
+ * .build()),
+ * executor)
+ * .commitAsync();
* }
*/
interface AsyncTransactionStep extends ApiFutureExample of using {@link AsyncTransactionManager} with lambda expressions (Java 8 and - * higher). + *
Example of using {@link AsyncTransactionManager}. * *
{@code
* long singerId = 1L;
@@ -449,56 +448,11 @@ CommitResponse writeAtLeastOnceWithOptions(
* .then(
* (transaction, row) -> {
* String name = row.getString(column);
- * transaction.buffer(
+ * return transaction.bufferAsync(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
* .build());
- * return ApiFutures.immediateFuture(null);
- * })
- * .commitAsync();
- * try {
- * commitTimestamp.get();
- * break;
- * } catch (AbortedException e) {
- * Thread.sleep(e.getRetryDelayInMillis());
- * transactionFuture = manager.resetForRetryAsync();
- * }
- * }
- * }
- * }
- *
- * Example of using {@link AsyncTransactionManager} (Java 7). - * - *
{@code
- * final long singerId = 1L;
- * try (AsyncTransactionManager manager = client().transactionManagerAsync()) {
- * TransactionContextFuture transactionFuture = manager.beginAsync();
- * while (true) {
- * final String column = "FirstName";
- * CommitTimestampFuture commitTimestamp =
- * transactionFuture.then(
- * new AsyncTransactionFunction() {
- * @Override
- * public ApiFuture apply(TransactionContext transaction, Void input)
- * throws Exception {
- * return transaction.readRowAsync(
- * "Singers", Key.of(singerId), Collections.singleton(column));
- * }
- * })
- * .then(
- * new AsyncTransactionFunction() {
- * @Override
- * public ApiFuture apply(TransactionContext transaction, Struct input)
- * throws Exception {
- * String name = input.getString(column);
- * transaction.buffer(
- * Mutation.newUpdateBuilder("Singers")
- * .set(column)
- * .to(name.toUpperCase())
- * .build());
- * return ApiFutures.immediateFuture(null);
- * }
* })
* .commitAsync();
* try {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
index 47f2c338994..fbfc472bf53 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
@@ -675,6 +675,11 @@ public void buffer(Mutation mutation) {
delegate.buffer(mutation);
}
+ @Override
+ public ApiFuture bufferAsync(Mutation mutation) {
+ return delegate.bufferAsync(mutation);
+ }
+
@Override
public Struct readRowUsingIndex(String table, String index, Key key, Iterable columns) {
try {
@@ -703,6 +708,11 @@ public void buffer(Iterable mutations) {
delegate.buffer(mutations);
}
+ @Override
+ public ApiFuture bufferAsync(Iterable mutations) {
+ return delegate.bufferAsync(mutations);
+ }
+
@Override
public long executeUpdate(Statement statement, UpdateOption... options) {
try {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java
index 64c45b12c02..2590d5b309d 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java
@@ -91,6 +91,11 @@ public interface TransactionContext extends ReadContext {
*/
void buffer(Mutation mutation);
+ /** Same as {@link #buffer(Mutation)}, but is guaranteed to be non-blocking. */
+ default ApiFuture bufferAsync(Mutation mutation) {
+ throw new UnsupportedOperationException("method should be overwritten");
+ }
+
/**
* Buffers mutations to be applied if the transaction commits successfully. The effects of the
* mutations will not be visible to subsequent operations in the transaction. All buffered
@@ -98,6 +103,11 @@ public interface TransactionContext extends ReadContext {
*/
void buffer(Iterable mutations);
+ /** Same as {@link #buffer(Iterable)}, but is guaranteed to be non-blocking. */
+ default ApiFuture bufferAsync(Iterable mutations) {
+ throw new UnsupportedOperationException("method should be overwritten");
+ }
+
/**
* Executes the DML statement(s) and returns the number of rows modified. For non-DML statements,
* it will result in an {@code IllegalArgumentException}. The effects of the DML statement will be
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
index 2484b9d3c6d..e04dace003b 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
@@ -54,7 +54,9 @@
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -75,6 +77,9 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
*/
private static final String TRANSACTION_CANCELLED_MESSAGE = "invalidated by a later transaction";
+ private static final String TRANSACTION_ALREADY_COMMITTED_MESSAGE =
+ "Transaction has already committed";
+
@VisibleForTesting
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {
static class Builder extends AbstractReadContext.Builder {
@@ -146,7 +151,9 @@ public void removeListener(Runnable listener) {
}
}
- @GuardedBy("lock")
+ private final Object committingLock = new Object();
+
+ @GuardedBy("committingLock")
private volatile boolean committing;
@GuardedBy("lock")
@@ -155,8 +162,7 @@ public void removeListener(Runnable listener) {
@GuardedBy("lock")
private volatile int runningAsyncOperations;
- @GuardedBy("lock")
- private List mutations = new ArrayList<>();
+ private final Queue mutations = new ConcurrentLinkedQueue<>();
@GuardedBy("lock")
private boolean aborted;
@@ -280,6 +286,16 @@ void commit() {
volatile ApiFuture commitFuture;
ApiFuture commitAsync() {
+ List mutationsProto = new ArrayList<>();
+ synchronized (committingLock) {
+ if (committing) {
+ throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
+ }
+ committing = true;
+ if (!mutations.isEmpty()) {
+ Mutation.toProto(mutations, mutationsProto);
+ }
+ }
final SettableApiFuture res = SettableApiFuture.create();
final SettableApiFuture finishOps;
CommitRequest.Builder builder =
@@ -303,14 +319,8 @@ ApiFuture commitAsync() {
} else {
finishOps = finishedAsyncOperations;
}
- if (!mutations.isEmpty()) {
- List mutationsProto = new ArrayList<>();
- Mutation.toProto(mutations, mutationsProto);
- builder.addAllMutations(mutationsProto);
- }
- // Ensure that no call to buffer mutations that would be lost can succeed.
- mutations = null;
}
+ builder.addAllMutations(mutationsProto);
finishOps.addListener(
new CommitRunnable(res, finishOps, builder), MoreExecutors.directExecutor());
return res;
@@ -603,22 +613,44 @@ public void onDone(boolean withBeginTransaction) {
@Override
public void buffer(Mutation mutation) {
- synchronized (lock) {
- checkNotNull(mutations, "Context is closed");
+ synchronized (committingLock) {
+ if (committing) {
+ throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
+ }
mutations.add(checkNotNull(mutation));
}
}
+ @Override
+ public ApiFuture bufferAsync(Mutation mutation) {
+ // Normally, we would call the async method from the sync method, but this is also safe as
+ // both are non-blocking anyways, and this prevents the creation of an ApiFuture that is not
+ // really used when the sync method is called.
+ buffer(mutation);
+ return ApiFutures.immediateFuture(null);
+ }
+
@Override
public void buffer(Iterable mutations) {
- synchronized (lock) {
- checkNotNull(this.mutations, "Context is closed");
+ synchronized (committingLock) {
+ if (committing) {
+ throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
+ }
for (Mutation mutation : mutations) {
this.mutations.add(checkNotNull(mutation));
}
}
}
+ @Override
+ public ApiFuture bufferAsync(Iterable mutations) {
+ // Normally, we would call the async method from the sync method, but this is also safe as
+ // both are non-blocking anyways, and this prevents the creation of an ApiFuture that is not
+ // really used when the sync method is called.
+ buffer(mutations);
+ return ApiFutures.immediateFuture(null);
+ }
+
@Override
public long executeUpdate(Statement statement, UpdateOption... options) {
beforeReadOrQuery();
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplStressTest.java
index ea01fecc269..1a63c538653 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplStressTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplStressTest.java
@@ -56,7 +56,7 @@ public class AsyncResultSetImplStressTest {
private static final int TEST_RUNS = 25;
/** Timeout is applied to each test case individually. */
- @Rule public Timeout timeout = new Timeout(120, TimeUnit.SECONDS);
+ @Rule public Timeout timeout = new Timeout(240, TimeUnit.SECONDS);
@Parameter(0)
public int resultSetSize;
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java
index e9f5ac73728..49c698b06a3 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java
@@ -16,7 +16,11 @@
package com.google.cloud.spanner;
+import static com.google.cloud.spanner.SpannerApiFutures.get;
import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -44,6 +48,8 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
@RunWith(JUnit4.class)
public class AsyncResultSetImplTest {
@@ -301,6 +307,99 @@ public void pauseResume() throws InterruptedException {
}
}
+ @Test
+ public void testCallbackIsNotCalledWhilePaused() throws InterruptedException, ExecutionException {
+ Executor executor = Executors.newSingleThreadExecutor();
+ final int simulatedRows = 100;
+ ResultSet delegate = mock(ResultSet.class);
+ when(delegate.next())
+ .thenAnswer(
+ new Answer() {
+ int row = 0;
+
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ row++;
+ if (row > simulatedRows) {
+ return false;
+ }
+ return true;
+ }
+ });
+ when(delegate.getCurrentRowAsStruct()).thenReturn(mock(Struct.class));
+ final AtomicInteger callbackCounter = new AtomicInteger();
+ final BlockingDeque