|
52 | 52 | import com.google.cloud.spanner.Mutation; |
53 | 53 | import com.google.cloud.spanner.Mutation.WriteBuilder; |
54 | 54 | import com.google.cloud.spanner.Options; |
| 55 | +import com.google.cloud.spanner.Options.RpcPriority; |
55 | 56 | import com.google.cloud.spanner.Partition; |
56 | 57 | import com.google.cloud.spanner.PartitionOptions; |
57 | 58 | import com.google.cloud.spanner.ReadContext; |
|
128 | 129 | import com.google.spanner.executor.v1.MutationAction.Mod; |
129 | 130 | import com.google.spanner.executor.v1.MutationAction.UpdateArgs; |
130 | 131 | import com.google.spanner.executor.v1.OperationResponse; |
| 132 | +import com.google.spanner.executor.v1.PartitionedUpdateAction; |
| 133 | +import com.google.spanner.executor.v1.PartitionedUpdateAction.ExecutePartitionedUpdateOptions; |
131 | 134 | import com.google.spanner.executor.v1.QueryAction; |
132 | 135 | import com.google.spanner.executor.v1.ReadAction; |
133 | 136 | import com.google.spanner.executor.v1.RestoreCloudDatabaseAction; |
@@ -886,6 +889,13 @@ private Status executeAction( |
886 | 889 | } else if (action.hasExecutePartition()) { |
887 | 890 | return executeExecutePartition( |
888 | 891 | action.getExecutePartition(), outcomeSender, executionContext); |
| 892 | + } else if (action.hasPartitionedUpdate()) { |
| 893 | + if (dbPath == null) { |
| 894 | + throw SpannerExceptionFactory.newSpannerException( |
| 895 | + ErrorCode.INVALID_ARGUMENT, "Database path must be set for this action"); |
| 896 | + } |
| 897 | + DatabaseClient dbClient = getClient().getDatabaseClient(DatabaseId.of(dbPath)); |
| 898 | + return executePartitionedUpdate(action.getPartitionedUpdate(), dbClient, outcomeSender); |
889 | 899 | } else if (action.hasCloseBatchTxn()) { |
890 | 900 | return executeCloseBatchTxn(action.getCloseBatchTxn(), outcomeSender, executionContext); |
891 | 901 | } else if (action.hasExecuteChangeStreamQuery()) { |
@@ -1974,6 +1984,33 @@ private Status executeExecutePartition( |
1974 | 1984 | } |
1975 | 1985 | } |
1976 | 1986 |
|
| 1987 | + /** Execute a partitioned update which runs different partitions in parallel. */ |
| 1988 | + private Status executePartitionedUpdate( |
| 1989 | + PartitionedUpdateAction action, DatabaseClient dbClient, OutcomeSender sender) { |
| 1990 | + try { |
| 1991 | + ExecutePartitionedUpdateOptions options = action.getOptions(); |
| 1992 | + Long count = |
| 1993 | + dbClient.executePartitionedUpdate( |
| 1994 | + Statement.of(action.getUpdate().getSql()), |
| 1995 | + Options.tag(options.getTag()), |
| 1996 | + Options.priority(RpcPriority.fromProto(options.getRpcPriority()))); |
| 1997 | + SpannerActionOutcome outcome = |
| 1998 | + SpannerActionOutcome.newBuilder() |
| 1999 | + .setStatus(toProto(Status.OK)) |
| 2000 | + .addDmlRowsModified(count) |
| 2001 | + .build(); |
| 2002 | + sender.sendOutcome(outcome); |
| 2003 | + return sender.finishWithOK(); |
| 2004 | + } catch (SpannerException e) { |
| 2005 | + return sender.finishWithError(toStatus(e)); |
| 2006 | + } catch (Exception e) { |
| 2007 | + return sender.finishWithError( |
| 2008 | + toStatus( |
| 2009 | + SpannerExceptionFactory.newSpannerException( |
| 2010 | + ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage()))); |
| 2011 | + } |
| 2012 | + } |
| 2013 | + |
1977 | 2014 | /** Build a child partition record proto out of childPartitionRecord returned by client. */ |
1978 | 2015 | private ChildPartitionsRecord buildChildPartitionRecord(Struct childPartitionRecord) |
1979 | 2016 | throws Exception { |
|
0 commit comments