Skip to content

Commit 147b2bc

Browse files
authored
chore(spanner): add experimental location-aware foundation classes (#13596)
- Adds experimental location routing base classes (recipe parsing, key-to-target encoding, range cache, channel finder, router). - Integrates routing into request lifecycle for reads/queries/transactions under GOOGLE_SPANNER_EXPERIMENTAL_LOCATION_API. - Adds golden coverage using textprotos: - finder_test.textproto - range_cache_test.textproto - recipe_test.textproto - final RPC integration with RPC calls will be tracked as follow-up.
1 parent d3eb851 commit 147b2bc

30 files changed

+21889
-35
lines changed

spanner/batch.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
298298
// Read or query partition.
299299
if p.rreq != nil {
300300
rpc = func(ctx context.Context, resumeToken []byte, opts ...gax.CallOption) (streamingReceiver, error) {
301-
client, err := client.StreamingRead(ctx, &sppb.ReadRequest{
301+
req := &sppb.ReadRequest{
302302
Session: p.rreq.Session,
303303
Transaction: p.rreq.Transaction,
304304
Table: p.rreq.Table,
@@ -310,7 +310,11 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
310310
ResumeToken: resumeToken,
311311
DataBoostEnabled: p.rreq.DataBoostEnabled,
312312
DirectedReadOptions: p.rreq.DirectedReadOptions,
313-
}, opts...)
313+
}
314+
if t.locationRouter != nil {
315+
t.locationRouter.prepareReadRequest(req)
316+
}
317+
client, err := client.StreamingRead(ctx, req, opts...)
314318
if err != nil {
315319
return client, err
316320
}
@@ -327,7 +331,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
327331
}
328332
} else {
329333
rpc = func(ctx context.Context, resumeToken []byte, opts ...gax.CallOption) (streamingReceiver, error) {
330-
client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
334+
req := &sppb.ExecuteSqlRequest{
331335
Session: p.qreq.Session,
332336
Transaction: p.qreq.Transaction,
333337
Sql: p.qreq.Sql,
@@ -339,7 +343,11 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
339343
ResumeToken: resumeToken,
340344
DataBoostEnabled: p.qreq.DataBoostEnabled,
341345
DirectedReadOptions: p.qreq.DirectedReadOptions,
342-
}, opts...)
346+
}
347+
if t.locationRouter != nil {
348+
t.locationRouter.prepareExecuteSQLRequest(req)
349+
}
350+
client, err := client.ExecuteStreamingSql(ctx, req, opts...)
343351
if err != nil {
344352
return client, err
345353
}
@@ -356,13 +364,25 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
356364
return client, err
357365
}
358366
}
359-
return stream(
367+
return streamWithTransactionCallbacks(
360368
contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader),
361369
sh.session.logger,
362370
t.sm.sc.metricsTracerFactory,
363371
rpc,
372+
nil,
373+
func(err error) error {
374+
return err
375+
},
376+
nil,
364377
t.setTimestamp,
365-
t.release, client.(*grpcSpannerClient))
378+
t.release,
379+
client.(*grpcSpannerClient),
380+
func(prs *sppb.PartialResultSet) {
381+
if t.locationRouter != nil {
382+
t.locationRouter.observePartialResultSet(prs)
383+
}
384+
},
385+
)
366386
}
367387

368388
// MarshalBinary implements BinaryMarshaler.

spanner/batch_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,3 +370,119 @@ func TestPartitionRead_Multiplexed(t *testing.T) {
370370
t.Errorf("TestPartitionQuery_Multiplexed: expected 2 requests to be handled, got: %d", handled)
371371
}
372372
}
373+
374+
func TestBatchExecute_Query_PreparesRoutingHint(t *testing.T) {
375+
t.Setenv(experimentalLocationAPIEnvVar, "true")
376+
377+
ctx := context.Background()
378+
server, client, teardown := setupMockedTestServer(t)
379+
defer teardown()
380+
381+
txn, err := client.BatchReadOnlyTransaction(ctx, StrongRead())
382+
if err != nil {
383+
t.Fatal(err)
384+
}
385+
defer txn.Cleanup(ctx)
386+
if txn.locationRouter == nil {
387+
t.Fatal("expected location router to be enabled")
388+
}
389+
txn.locationRouter.observePartialResultSet(&sppb.PartialResultSet{
390+
CacheUpdate: &sppb.CacheUpdate{DatabaseId: 7},
391+
})
392+
393+
partitions, err := txn.PartitionQuery(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), PartitionOptions{MaxPartitions: 1})
394+
if err != nil {
395+
t.Fatal(err)
396+
}
397+
if len(partitions) == 0 {
398+
t.Fatal("expected at least one partition")
399+
}
400+
if err := server.TestSpanner.PutPartitionResult(partitions[0].pt, server.CreateSingleRowSingersResult(0)); err != nil {
401+
t.Fatal(err)
402+
}
403+
404+
iter := txn.Execute(ctx, partitions[0])
405+
defer iter.Stop()
406+
if err := iter.Do(func(*Row) error { return nil }); err != nil {
407+
t.Fatal(err)
408+
}
409+
410+
requests := drainRequestsFromServer(server.TestSpanner)
411+
var executeReq *sppb.ExecuteSqlRequest
412+
for _, req := range requests {
413+
if r, ok := req.(*sppb.ExecuteSqlRequest); ok && len(r.GetPartitionToken()) > 0 {
414+
executeReq = r
415+
break
416+
}
417+
}
418+
if executeReq == nil {
419+
t.Fatal("expected an ExecuteSqlRequest with partition token")
420+
}
421+
if executeReq.GetRoutingHint() == nil {
422+
t.Fatal("expected routing hint on ExecuteSqlRequest")
423+
}
424+
if got := executeReq.GetRoutingHint().GetDatabaseId(); got != 7 {
425+
t.Fatalf("unexpected routing hint database id: got %d, want 7", got)
426+
}
427+
if executeReq.GetRoutingHint().GetOperationUid() == 0 {
428+
t.Fatal("expected operation uid to be set on routing hint")
429+
}
430+
}
431+
432+
func TestBatchExecute_Read_PreparesRoutingHint(t *testing.T) {
433+
t.Setenv(experimentalLocationAPIEnvVar, "true")
434+
435+
ctx := context.Background()
436+
server, client, teardown := setupMockedTestServer(t)
437+
defer teardown()
438+
439+
txn, err := client.BatchReadOnlyTransaction(ctx, StrongRead())
440+
if err != nil {
441+
t.Fatal(err)
442+
}
443+
defer txn.Cleanup(ctx)
444+
if txn.locationRouter == nil {
445+
t.Fatal("expected location router to be enabled")
446+
}
447+
txn.locationRouter.observePartialResultSet(&sppb.PartialResultSet{
448+
CacheUpdate: &sppb.CacheUpdate{DatabaseId: 9},
449+
})
450+
451+
partitions, err := txn.PartitionRead(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}, PartitionOptions{MaxPartitions: 1})
452+
if err != nil {
453+
t.Fatal(err)
454+
}
455+
if len(partitions) == 0 {
456+
t.Fatal("expected at least one partition")
457+
}
458+
if err := server.TestSpanner.PutPartitionResult(partitions[0].pt, server.CreateSingleRowSingersResult(0)); err != nil {
459+
t.Fatal(err)
460+
}
461+
462+
iter := txn.Execute(ctx, partitions[0])
463+
defer iter.Stop()
464+
if err := iter.Do(func(*Row) error { return nil }); err != nil {
465+
t.Fatal(err)
466+
}
467+
468+
requests := drainRequestsFromServer(server.TestSpanner)
469+
var readReq *sppb.ReadRequest
470+
for _, req := range requests {
471+
if r, ok := req.(*sppb.ReadRequest); ok && len(r.GetPartitionToken()) > 0 {
472+
readReq = r
473+
break
474+
}
475+
}
476+
if readReq == nil {
477+
t.Fatal("expected a ReadRequest with partition token")
478+
}
479+
if readReq.GetRoutingHint() == nil {
480+
t.Fatal("expected routing hint on ReadRequest")
481+
}
482+
if got := readReq.GetRoutingHint().GetDatabaseId(); got != 9 {
483+
t.Fatalf("unexpected routing hint database id: got %d, want 9", got)
484+
}
485+
if readReq.GetRoutingHint().GetOperationUid() == 0 {
486+
t.Fatal("expected operation uid to be set on routing hint")
487+
}
488+
}

spanner/channel_finder.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
Copyright 2026 Google LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package spanner
18+
19+
import (
20+
"sync"
21+
"sync/atomic"
22+
23+
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
24+
)
25+
26+
type channelFinder struct {
27+
updateMu sync.Mutex
28+
29+
databaseID atomic.Uint64
30+
recipeCache *keyRecipeCache
31+
rangeCache *keyRangeCache
32+
}
33+
34+
func newChannelFinder(endpointCache channelEndpointCache) *channelFinder {
35+
return &channelFinder{
36+
recipeCache: newKeyRecipeCache(),
37+
rangeCache: newKeyRangeCache(endpointCache),
38+
}
39+
}
40+
41+
func (f *channelFinder) useDeterministicRandom() {
42+
f.rangeCache.useDeterministicRandom()
43+
}
44+
45+
func (f *channelFinder) update(update *sppb.CacheUpdate) {
46+
if update == nil {
47+
return
48+
}
49+
f.updateMu.Lock()
50+
defer f.updateMu.Unlock()
51+
52+
currentID := f.databaseID.Load()
53+
if currentID != update.GetDatabaseId() {
54+
if currentID != 0 {
55+
f.recipeCache.clear()
56+
f.rangeCache.clear()
57+
}
58+
f.databaseID.Store(update.GetDatabaseId())
59+
}
60+
if update.GetKeyRecipes() != nil {
61+
f.recipeCache.addRecipes(update.GetKeyRecipes())
62+
}
63+
f.rangeCache.addRanges(update)
64+
}
65+
66+
func (f *channelFinder) findServerRead(req *sppb.ReadRequest, preferLeader bool) channelEndpoint {
67+
if req == nil {
68+
return nil
69+
}
70+
f.recipeCache.computeReadKeys(req)
71+
hint := ensureReadRoutingHint(req)
72+
return f.fillRoutingHint(preferLeader, rangeModeCoveringSplit, req.GetDirectedReadOptions(), hint)
73+
}
74+
75+
func (f *channelFinder) findServerReadWithTransaction(req *sppb.ReadRequest) channelEndpoint {
76+
if req == nil {
77+
return nil
78+
}
79+
return f.findServerRead(req, preferLeaderFromSelector(req.GetTransaction()))
80+
}
81+
82+
func (f *channelFinder) findServerExecuteSQL(req *sppb.ExecuteSqlRequest, preferLeader bool) channelEndpoint {
83+
if req == nil {
84+
return nil
85+
}
86+
f.recipeCache.computeQueryKeys(req)
87+
hint := ensureExecuteSQLRoutingHint(req)
88+
return f.fillRoutingHint(preferLeader, rangeModePickRandom, req.GetDirectedReadOptions(), hint)
89+
}
90+
91+
func (f *channelFinder) findServerExecuteSQLWithTransaction(req *sppb.ExecuteSqlRequest) channelEndpoint {
92+
if req == nil {
93+
return nil
94+
}
95+
return f.findServerExecuteSQL(req, preferLeaderFromSelector(req.GetTransaction()))
96+
}
97+
98+
func (f *channelFinder) findServerBeginTransaction(req *sppb.BeginTransactionRequest) channelEndpoint {
99+
if req == nil || req.GetMutationKey() == nil {
100+
return nil
101+
}
102+
target := f.recipeCache.mutationToTargetRange(req.GetMutationKey())
103+
if target == nil {
104+
return nil
105+
}
106+
hint := &sppb.RoutingHint{Key: append([]byte(nil), target.start...)}
107+
if len(target.limit) > 0 {
108+
hint.LimitKey = append([]byte(nil), target.limit...)
109+
}
110+
return f.fillRoutingHint(preferLeaderFromTransactionOptions(req.GetOptions()), rangeModeCoveringSplit, &sppb.DirectedReadOptions{}, hint)
111+
}
112+
113+
func (f *channelFinder) fillRoutingHint(preferLeader bool, mode rangeMode, directedReadOptions *sppb.DirectedReadOptions, hint *sppb.RoutingHint) channelEndpoint {
114+
if hint == nil {
115+
return nil
116+
}
117+
databaseID := f.databaseID.Load()
118+
if databaseID == 0 {
119+
return nil
120+
}
121+
hint.DatabaseId = databaseID
122+
return f.rangeCache.fillRoutingHint(preferLeader, mode, directedReadOptions, hint)
123+
}
124+
125+
func preferLeaderFromSelector(selector *sppb.TransactionSelector) bool {
126+
if selector == nil {
127+
return true
128+
}
129+
switch s := selector.GetSelector().(type) {
130+
case *sppb.TransactionSelector_Begin:
131+
if s.Begin == nil || s.Begin.GetReadOnly() == nil {
132+
return true
133+
}
134+
return s.Begin.GetReadOnly().GetStrong()
135+
case *sppb.TransactionSelector_SingleUse:
136+
if s.SingleUse == nil || s.SingleUse.GetReadOnly() == nil {
137+
return true
138+
}
139+
return s.SingleUse.GetReadOnly().GetStrong()
140+
default:
141+
return true
142+
}
143+
}
144+
145+
func preferLeaderFromTransactionOptions(options *sppb.TransactionOptions) bool {
146+
if options == nil || options.GetReadOnly() == nil {
147+
return true
148+
}
149+
return options.GetReadOnly().GetStrong()
150+
}

0 commit comments

Comments
 (0)