@@ -28,6 +28,8 @@ import (
2828
2929 "cloud.google.com/go/internal/trace"
3030 sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
31+ "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp"
32+ grpcgcppb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
3133 "github.com/googleapis/gax-go/v2"
3234 "go.opentelemetry.io/otel/attribute"
3335 "go.opentelemetry.io/otel/metric"
@@ -37,6 +39,7 @@ import (
3739 gtransport "google.golang.org/api/transport/grpc"
3840 "google.golang.org/grpc"
3941 "google.golang.org/grpc/codes"
42+ "google.golang.org/grpc/credentials/insecure"
4043 "google.golang.org/grpc/encoding/gzip"
4144 "google.golang.org/grpc/metadata"
4245
@@ -121,6 +124,131 @@ func (c *Client) ClientID() string {
121124 return c .sc .id
122125}
123126
127+ func createGCPMultiEndpoint (cfg * grpcgcp.GCPMultiEndpointOptions , config ClientConfig , opts ... option.ClientOption ) (* grpcgcp.GCPMultiEndpoint , error ) {
128+ if cfg .GRPCgcpConfig == nil {
129+ cfg .GRPCgcpConfig = & grpcgcppb.ApiConfig {}
130+ }
131+ if cfg .GRPCgcpConfig .Method == nil || len (cfg .GRPCgcpConfig .Method ) == 0 {
132+ cfg .GRPCgcpConfig .Method = []* grpcgcppb.MethodConfig {
133+ {
134+ Name : []string {"/google.spanner.v1.Spanner/CreateSession" },
135+ Affinity : & grpcgcppb.AffinityConfig {
136+ Command : grpcgcppb .AffinityConfig_BIND ,
137+ AffinityKey : "name" ,
138+ },
139+ },
140+ {
141+ Name : []string {"/google.spanner.v1.Spanner/BatchCreateSessions" },
142+ Affinity : & grpcgcppb.AffinityConfig {
143+ Command : grpcgcppb .AffinityConfig_BIND ,
144+ AffinityKey : "session.name" ,
145+ },
146+ },
147+ {
148+ Name : []string {"/google.spanner.v1.Spanner/DeleteSession" },
149+ Affinity : & grpcgcppb.AffinityConfig {
150+ Command : grpcgcppb .AffinityConfig_UNBIND ,
151+ AffinityKey : "name" ,
152+ },
153+ },
154+ {
155+ Name : []string {"/google.spanner.v1.Spanner/GetSession" },
156+ Affinity : & grpcgcppb.AffinityConfig {
157+ Command : grpcgcppb .AffinityConfig_BOUND ,
158+ AffinityKey : "name" ,
159+ },
160+ },
161+ {
162+ Name : []string {
163+ "/google.spanner.v1.Spanner/BeginTransaction" ,
164+ "/google.spanner.v1.Spanner/Commit" ,
165+ "/google.spanner.v1.Spanner/ExecuteBatchDml" ,
166+ "/google.spanner.v1.Spanner/ExecuteSql" ,
167+ "/google.spanner.v1.Spanner/ExecuteStreamingSql" ,
168+ "/google.spanner.v1.Spanner/PartitionQuery" ,
169+ "/google.spanner.v1.Spanner/PartitionRead" ,
170+ "/google.spanner.v1.Spanner/Read" ,
171+ "/google.spanner.v1.Spanner/Rollback" ,
172+ "/google.spanner.v1.Spanner/StreamingRead" ,
173+ },
174+ Affinity : & grpcgcppb.AffinityConfig {
175+ Command : grpcgcppb .AffinityConfig_BOUND ,
176+ AffinityKey : "session" ,
177+ },
178+ },
179+ }
180+ }
181+ // Append emulator options if SPANNER_EMULATOR_HOST has been set.
182+ if emulatorAddr := os .Getenv ("SPANNER_EMULATOR_HOST" ); emulatorAddr != "" {
183+ emulatorOpts := []option.ClientOption {
184+ option .WithEndpoint (emulatorAddr ),
185+ option .WithGRPCDialOption (grpc .WithTransportCredentials (insecure .NewCredentials ())),
186+ option .WithoutAuthentication (),
187+ internaloption .SkipDialSettingsValidation (),
188+ }
189+ opts = append (opts , emulatorOpts ... )
190+ // Replace all endpoints with emulator target.
191+ for _ , meo := range cfg .MultiEndpoints {
192+ meo .Endpoints = []string {emulatorAddr }
193+ }
194+ }
195+
196+ // Set the number of channels to the default value if not specified.
197+ if cfg .GRPCgcpConfig .GetChannelPool () == nil || cfg .GRPCgcpConfig .GetChannelPool ().GetMaxSize () == 0 {
198+ cfg .GRPCgcpConfig .ChannelPool = & grpcgcppb.ChannelPoolConfig {
199+ MinSize : numChannels ,
200+ MaxSize : numChannels ,
201+ }
202+ }
203+ // Set MinSize equal to MaxSize to create all the channels beforehand.
204+ cfg .GRPCgcpConfig .ChannelPool .MinSize = cfg .GRPCgcpConfig .ChannelPool .GetMaxSize ()
205+
206+ cfg .GRPCgcpConfig .ChannelPool .BindPickStrategy = grpcgcppb .ChannelPoolConfig_ROUND_ROBIN
207+
208+ cfg .DialFunc = func (ctx context.Context , target string , dopts ... grpc.DialOption ) (* grpc.ClientConn , error ) {
209+ copts := opts
210+
211+ for _ , do := range dopts {
212+ copts = append (copts , option .WithGRPCDialOption (do ))
213+ }
214+
215+ allOpts := allClientOpts (1 , config .Compression , copts ... )
216+
217+ // Overwrite endpoint and pool config.
218+ allOpts = append (allOpts ,
219+ option .WithEndpoint (target ),
220+ option .WithGRPCConnectionPool (1 ),
221+ option .WithGRPCConn (nil ),
222+ )
223+
224+ return gtransport .Dial (ctx , allOpts ... )
225+ }
226+
227+ gme , err := grpcgcp .NewGCPMultiEndpoint (cfg )
228+ return gme , err
229+ }
230+
231+ // To use GCPMultiEndpoint in gtransport.Dial (via gtransport.WithConnPool option)
232+ // we implement gtransport.ConnPool interface using this wrapper.
233+ type gmeWrapper struct {
234+ * grpcgcp.GCPMultiEndpoint
235+ }
236+
237+ // Make sure gmeWrapper implements ConnPool interface.
238+ var _ gtransport.ConnPool = (* gmeWrapper )(nil )
239+
240+ func (gw * gmeWrapper ) Conn () * grpc.ClientConn {
241+ // GCPMultiEndpoint does not expose any ClientConn.
242+ // This is safe because Cloud Spanner client doesn't use this function and instead
243+ // makes calls directly using Invoke and NewStream from the grpc.ClientConnInterface
244+ // which GCPMultiEndpoint implements.
245+ return nil
246+ }
247+
248+ func (gw * gmeWrapper ) Num () int {
249+ return int (gw .GCPMultiEndpoint .GCPConfig ().GetChannelPool ().GetMaxSize ())
250+ }
251+
124252// ClientConfig has configurations for the client.
125253type ClientConfig struct {
126254 // NumChannels is the number of gRPC channels.
@@ -241,6 +369,10 @@ func NewClient(ctx context.Context, database string, opts ...option.ClientOption
241369// NewClientWithConfig creates a client to a database. A valid database name has
242370// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
243371func NewClientWithConfig (ctx context.Context , database string , config ClientConfig , opts ... option.ClientOption ) (c * Client , err error ) {
372+ return newClientWithConfig (ctx , database , config , nil , opts ... )
373+ }
374+
375+ func newClientWithConfig (ctx context.Context , database string , config ClientConfig , gme * grpcgcp.GCPMultiEndpoint , opts ... option.ClientOption ) (c * Client , err error ) {
244376 // Validate database path.
245377 if err := validDatabaseName (database ); err != nil {
246378 return nil , err
@@ -265,16 +397,25 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
265397 if config .NumChannels == 0 {
266398 config .NumChannels = numChannels
267399 }
268- // gRPC options.
269- allOpts := allClientOpts (config .NumChannels , config .Compression , opts ... )
270- pool , err := gtransport .DialPool (ctx , allOpts ... )
271- if err != nil {
272- return nil , err
273- }
274400
275- if hasNumChannelsConfig && pool .Num () != config .NumChannels {
276- pool .Close ()
277- return nil , spannerErrorf (codes .InvalidArgument , "Connection pool mismatch: NumChannels=%v, WithGRPCConnectionPool=%v. Only set one of these options, or set both to the same value." , config .NumChannels , pool .Num ())
401+ var pool gtransport.ConnPool
402+
403+ if gme != nil {
404+ // Use GCPMultiEndpoint if provided.
405+ pool = & gmeWrapper {gme }
406+ } else {
407+ // Create gtransport ConnPool as usual if MultiEndpoint is not used.
408+ // gRPC options.
409+ allOpts := allClientOpts (config .NumChannels , config .Compression , opts ... )
410+ pool , err = gtransport .DialPool (ctx , allOpts ... )
411+ if err != nil {
412+ return nil , err
413+ }
414+
415+ if hasNumChannelsConfig && pool .Num () != config .NumChannels {
416+ pool .Close ()
417+ return nil , spannerErrorf (codes .InvalidArgument , "Connection pool mismatch: NumChannels=%v, WithGRPCConnectionPool=%v. Only set one of these options, or set both to the same value." , config .NumChannels , pool .Num ())
418+ }
278419 }
279420
280421 // TODO(loite): Remove as the original map cannot be changed by the user
@@ -343,6 +484,48 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
343484 return c , nil
344485}
345486
487+ // NewMultiEndpointClient is the same as NewMultiEndpointClientWithConfig with
488+ // the default client configuration.
489+ //
490+ // A valid database name has the
491+ // form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
492+ func NewMultiEndpointClient (ctx context.Context , database string , gmeCfg * grpcgcp.GCPMultiEndpointOptions , opts ... option.ClientOption ) (* Client , * grpcgcp.GCPMultiEndpoint , error ) {
493+ return NewMultiEndpointClientWithConfig (ctx , database , ClientConfig {SessionPoolConfig : DefaultSessionPoolConfig , DisableRouteToLeader : false }, gmeCfg , opts ... )
494+ }
495+
496+ // NewMultiEndpointClientWithConfig creates a client to a database using GCPMultiEndpoint.
497+ //
498+ // The purposes of GCPMultiEndpoint are:
499+ //
500+ // - Fallback to an alternative endpoint (host:port) when the original
501+ // endpoint is completely unavailable.
502+ // - Be able to route a Cloud Spanner call to a specific group of endpoints.
503+ // - Be able to reconfigure endpoints in runtime.
504+ //
505+ // The GRPCgcpConfig and DialFunc in the GCPMultiEndpointOptions are optional
506+ // and will be configured automatically.
507+ //
508+ // For GCPMultiEndpoint the number of channels is configured via MaxSize of the
509+ // ChannelPool config in the GRPCgcpConfig.
510+ //
511+ // The GCPMultiEndpoint returned can be used to update the endpoints in runtime.
512+ //
513+ // A valid database name has the
514+ // form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
515+ func NewMultiEndpointClientWithConfig (ctx context.Context , database string , config ClientConfig , gmeCfg * grpcgcp.GCPMultiEndpointOptions , opts ... option.ClientOption ) (c * Client , gme * grpcgcp.GCPMultiEndpoint , err error ) {
516+ gme , err = createGCPMultiEndpoint (gmeCfg , config , opts ... )
517+ if err != nil {
518+ return nil , nil , err
519+ }
520+ // Align number of channels.
521+ config .NumChannels = int (gme .GCPConfig ().GetChannelPool ().GetMaxSize ())
522+ c , err = newClientWithConfig (ctx , database , config , gme , opts ... )
523+ if err != nil {
524+ return nil , nil , err
525+ }
526+ return
527+ }
528+
346529// Combines the default options from the generated client, the default options
347530// of the hand-written client and the user options to one list of options.
348531// Precedence: userOpts > clientDefaultOpts > generatedDefaultOpts
0 commit comments