Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
d9aa4d1
copy spec changes
edgao Aug 23, 2023
4d10768
logistics
edgao Aug 23, 2023
5e0f08f
remove normalization from build
edgao Aug 23, 2023
e160ad6
remove unnecessary change
edgao Aug 23, 2023
dd0a8db
inject param to stagingcsvgenerator+stagingconsumerfactory
edgao Aug 23, 2023
0e1f518
inject to GcsUtils
edgao Aug 23, 2023
b5ae4d3
hardcode snowflake
edgao Aug 23, 2023
0c8248d
hardcode in bigquery
edgao Aug 23, 2023
1dbd194
Merge branch 'master' into edgao/dv2/release
edgao Aug 24, 2023
5a978ed
derp, fix default behavior
edgao Aug 24, 2023
050c1d9
derp
edgao Aug 24, 2023
46e1e67
derp
edgao Aug 24, 2023
0f14339
maybe make bigquery tests pass
edgao Aug 24, 2023
7c4cd0c
fix snowflake tests?
edgao Aug 24, 2023
2402243
Merge branch 'master' into edgao/dv2/release
edgao Aug 24, 2023
dc619e0
fix snowflake unit tests
edgao Aug 24, 2023
a7f92d1
more snowflake test fix
edgao Aug 24, 2023
86f4ffc
Automated Commit - Format and Process Resources Changes
edgao Aug 24, 2023
b244993
disable legacy DATs on snowflake + bigquery
edgao Aug 24, 2023
9c3bcac
Update upgrade copy
evantahler Aug 24, 2023
cb5e32b
Merge branch 'master' into edgao/dv2/release
evantahler Aug 24, 2023
be9b921
also disable these tests :shrug:
edgao Aug 25, 2023
155e391
one more
edgao Aug 25, 2023
05b58a3
prevent concurrent T+D
edgao Aug 25, 2023
c31f1ff
add better locks
edgao Aug 28, 2023
723a13f
Merge branch 'master' into edgao/dv2/release
evantahler Aug 28, 2023
bda59c6
Merge branch 'edgao/dv2/release' into edgao/dv2/snowflake/locks
edgao Aug 28, 2023
8ca2c27
Automated Commit - Formatting Changes
evantahler Aug 28, 2023
66648a7
git pls
edgao Aug 28, 2023
9ec91ad
add mustRun parameter for explicitnes
edgao Aug 28, 2023
3b2f818
more comments
edgao Aug 28, 2023
c117798
Automated Commit - Formatting Changes
edgao Aug 28, 2023
5954328
Merge branch 'edgao/dv2/release' into edgao/dv2/snowflake/locks
edgao Aug 28, 2023
8135cf0
derp. also make bigquery staging lock things
edgao Aug 29, 2023
9270ab6
use putIfAbsent
edgao Aug 29, 2023
d9cdcc4
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Aug 29, 2023
0aa4d2b
logistics
edgao Aug 29, 2023
7e94392
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Aug 30, 2023
b30707e
better putIfAbsent
edgao Aug 30, 2023
7d0f922
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Sep 6, 2023
e214fec
botched merge
edgao Sep 7, 2023
10b8356
refactor
edgao Sep 7, 2023
85fc426
prevent incremental T+D for bq staging
edgao Sep 7, 2023
a032c35
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Sep 12, 2023
cf229de
spotbugs
edgao Sep 12, 2023
0d23374
why no autoformat?
edgao Sep 12, 2023
3fb9769
Automated Commit - Format and Process Resources Changes
edgao Sep 12, 2023
c359ce6
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Sep 12, 2023
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'master' into edgao/dv2/release
  • Loading branch information
edgao committed Aug 24, 2023
commit 1dbd19446aab5fae6d9a8f990c76a1f2914e8625
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ public class BigQueryStagingConsumerFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryStagingConsumerFactory.class);

public AirbyteMessageConsumer create(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final BigQueryStagingOperations bigQueryGcsOperations,
final BufferCreateFunction onCreateBuffer,
final Function<JsonNode, BigQueryRecordFormatter> recordFormatterCreator,
final Function<String, String> tmpTableNameTransformer,
final TyperDeduper typerDeduper,
final ParsedCatalog parsedCatalog,
final String defaultNamespace)
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final BigQueryStagingOperations bigQueryGcsOperations,
final BufferCreateFunction onCreateBuffer,
final Function<JsonNode, BigQueryRecordFormatter> recordFormatterCreator,
final Function<String, String> tmpTableNameTransformer,
final TyperDeduper typerDeduper,
final ParsedCatalog parsedCatalog,
final String defaultNamespace)
throws Exception {
final Map<AirbyteStreamNameNamespacePair, BigQueryWriteConfig> writeConfigs = createWriteConfigs(
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
}

@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Consumer<AirbyteMessage> outputRecordCollector) throws Exception {
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(JsonNode config,
ConfiguredAirbyteCatalog catalog,
Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
final EncryptionConfig encryptionConfig =
config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
final JsonNode s3Options = findS3Options(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ public void createSchemaIfNotExists(final JdbcDatabase database, final String sc
public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) {
return String.format(
"""
CREATE TABLE IF NOT EXISTS "%s"."%s" (
"%s" VARCHAR PRIMARY KEY,
"%s" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
"%s" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
"%s" VARIANT
) data_retention_time_in_days = 0;""",
CREATE TABLE IF NOT EXISTS "%s"."%s" (
"%s" VARCHAR PRIMARY KEY,
"%s" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
"%s" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
"%s" VARIANT
) data_retention_time_in_days = 0;""",
schemaName,
tableName,
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
Expand Down Expand Up @@ -100,7 +100,8 @@ public void insertRecordsInternal(final JdbcDatabase database,
// SqlOperationsUtils.insertRawRecordsInSingleQuery to support a different column order.
insertQuery = String.format(
"INSERT INTO \"%s\".\"%s\" (\"%s\", \"%s\", \"%s\") SELECT column1, parse_json(column2), column3 FROM VALUES\n",
schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT);
schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT);
final String recordQuery = "(?, ?, ?),\n";
SqlOperationsUtils.insertRawRecordsInSingleQuery(insertQuery, recordQuery, database, records);
}
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.