destinations v2: snowflake: single-threaded T+D per stream#29878

Merged
Edward Gao (edgao) merged 48 commits into master from edgao/dv2/snowflake/locks on Sep 12, 2023

@edgao Edward Gao (edgao) commented Aug 25, 2023

Closes #30048

Add locks to both T+D and raw table commits for snowflake/bigquery:

  • any number of copyIntoTableFromStage calls can run concurrently
  • typeAndDedupe can only run one at a time

Also re-enables mid-sync T+D execution for both snowflake and bigquery GCS.
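The two rules above (raw table writes may overlap, T+D is exclusive) map naturally onto a read/write lock. The following is a minimal sketch under that assumption, not the code from this PR: `StreamLock` and its method names are hypothetical, and the `Runnable` arguments stand in for the real `copyIntoTableFromStage` and `typeAndDedupe` work.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical holder for one stream's lock; not the connector's actual class.
class StreamLock {
  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  // Raw table writes take the shared (read) lock, so any number can overlap.
  void writeRawBatch(Runnable copyIntoTableFromStage) {
    lock.readLock().lock();
    try {
      copyIntoTableFromStage.run();
    } finally {
      lock.readLock().unlock();
    }
  }

  // T+D takes the exclusive (write) lock: it waits for in-flight raw writes
  // to finish and blocks new ones until it is done.
  void typeAndDedupe(Runnable typeAndDedupe) {
    lock.writeLock().lock();
    try {
      typeAndDedupe.run();
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```

One instance per stream would keep streams independent of each other, which is what the PR title's "per stream" suggests.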

Async folks - PTAL at GeneralStagingFunctions and let me know if it looks problematic for destination-bigquery.

Example logs:
[image: log screenshot]

  • One thread tries to start T+D (the Waiting for raw table writes to pause log message at the top)
  • In the meantime, another thread is flushing to the raw tables, so nothing happens
  • The flush finishes, so the first thread is able to start running T+D (Attempting typing and deduping - note the timestamp is a few seconds after the first message)
  • The second thread also tries to trigger T+D, but skips it because T+D is already running (Another thread is already trying to run typing and deduping)
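The sequence in those logs can be reproduced with two locks: one that marks "a T+D run is in progress" and one that guards the raw table. This is only a sketch of that idea; the class name, field names and the in-memory `log` list are assumptions for illustration, and only the three message strings come from the logs described above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative only: names are assumptions, not the connector's code.
class MidSyncTyperDeduper {
  final List<String> log = new CopyOnWriteArrayList<>();
  private final ReadWriteLock rawTableLock = new ReentrantReadWriteLock();
  private final Lock tdInProgress = new ReentrantLock();

  // Flushes share the read lock, so they never block each other.
  void flushToRawTable(Runnable flush) {
    rawTableLock.readLock().lock();
    try {
      flush.run();
    } finally {
      rawTableLock.readLock().unlock();
    }
  }

  void typeAndDedupe(Runnable tAndD) {
    // If another thread already owns the T+D run, skip instead of queueing.
    if (!tdInProgress.tryLock()) {
      log.add("Another thread is already trying to run typing and deduping");
      return;
    }
    try {
      log.add("Waiting for raw table writes to pause");
      rawTableLock.writeLock().lock(); // blocks until in-flight flushes finish
      try {
        log.add("Attempting typing and deduping");
        tAndD.run();
      } finally {
        rawTableLock.writeLock().unlock();
      }
    } finally {
      tdInProgress.unlock();
    }
  }
}
```

Note that `ReentrantLock.tryLock()` succeeds when the calling thread already holds the lock, so the skip path only triggers for a different thread.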
@octavia-squidington-iii Octavia Squidington III (octavia-squidington-iii) added the area/connectors and area/documentation labels Aug 25, 2023
@edgao Edward Gao (edgao) changed the base branch from master to edgao/dv2/release August 25, 2023 22:36
@edgao Edward Gao (edgao) marked this pull request as ready for review August 25, 2023 22:38
// we should skip it.
LOGGER.warn("Skipping typing and deduping for {}.{} because we could not set up the tables for this stream.", originalNamespace, originalName);
return;
synchronized(tdLocks.get(streamConfig.id())) {
Contributor

Java question about synchronized(): Will all threads eventually run this block, but wait their turn, or, will any thread which can't acquire the lock give up and move on? As we will T&D once more at the end of every sync, if there is a race condition, it seems like we should opt for the faster choice, and the later thread should just skip this step.

Contributor Author

good point. synchronized means they'll all wait their turn. probably need to use a stronger concurrency object here.

we'll still need a way to force T+D to run at the end of a sync (i.e. using the wait-my-turn behavior). There's probably something fancy to do there, e.g. track the last time we started a T+D run vs the last time we committed raw data and skip if there's no new data... but that can be a future improvement
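For the "skip if there's no new data" idea, a rough sketch could look like the following. Everything here is hypothetical (`NewDataTracker` and its methods do not exist in the PR), and it swaps wall-clock timestamps for a logical counter so that a commit and a T+D start can never tie.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper; a logical clock stands in for "the last time" above.
class NewDataTracker {
  private final AtomicLong clock = new AtomicLong();
  private final AtomicLong lastRawCommit = new AtomicLong(0);
  private final AtomicLong lastTdStart = new AtomicLong(0);

  // Call after a raw table commit succeeds. Math::max keeps the newest tick
  // even if two committing threads race on the update.
  void recordRawCommit() {
    lastRawCommit.accumulateAndGet(clock.incrementAndGet(), Math::max);
  }

  // Call just before a T+D run starts reading the raw table.
  void recordTdStart() {
    lastTdStart.accumulateAndGet(clock.incrementAndGet(), Math::max);
  }

  // True when some raw commit happened after the most recent T+D run began.
  boolean hasUnprocessedData() {
    return lastRawCommit.get() > lastTdStart.get();
  }
}
```

A caller would check `hasUnprocessedData()` before a T+D run and skip the run when it returns false.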

Contributor

@evantahler Evan Tahler (evantahler) Aug 28, 2023

> we'll still need a way to force T+D to run at the end of a sync

for some reason I thought we hooked into the shutdown behavior for that 🤷

Contributor Author

> shutdown behavior

we do. I think I was just mistaken about this - was worried about a case where we have an in-flight T+D run while the shutdown hook is running, which I don't think is possible.

Contributor Author

added an explicit mustRun param anyway, because I was having a hard time writing a comment explaining why it isn't necessary 🤷 and if anyone ever gets clever and runs T+D in a separate thread, then we probably need it anyway
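A `mustRun` flag like this usually comes down to choosing between a blocking and a non-blocking lock acquisition. The sketch below shows that choice in isolation; `TypeAndDedupeRunner`, its signature and the boolean return value are assumptions made for illustration, not the method added in this PR.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical wrapper showing wait-my-turn vs. skip-if-busy behavior.
class TypeAndDedupeRunner {
  private final Lock tdLock = new ReentrantLock();

  // Returns true if T+D ran, false if it was skipped.
  boolean typeAndDedupe(boolean mustRun, Runnable tAndD) {
    if (mustRun) {
      // End-of-sync case: wait for any in-flight run, then run again.
      tdLock.lock();
    } else if (!tdLock.tryLock()) {
      // Mid-sync case: another thread is already running T+D, so skip.
      return false;
    }
    try {
      tAndD.run();
      return true;
    } finally {
      tdLock.unlock();
    }
  }
}
```

Mid-sync callers would pass `mustRun = false`, while the final run at the end of a sync would pass `true` so it cannot be skipped.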

Contributor

@jbfbell Joe Bell (jbfbell) left a comment

Edward Gao (@edgao) do you have a sense of the performance impact for this vs. only T&D'ing at the end?

@edgao
Contributor Author

destination-snowflake will slow down, and bigquery will remain equally slow. This PR makes T+D completely block raw table writes, whereas previously you could write new raw records while T+D runs (which is wrong, but faster 😛 ). Async snowflake was making use of that, but bigquery at least was already single-threaded anyway.

I'm inclined to close this PR, unless we want to keep the early-sync T+D run.

@evantahler
Contributor

Evan Tahler (evantahler) commented Sep 1, 2023

> I'm inclined to close this PR, unless we want to keep the early-sync T+D run.

I would like to keep the early T&D run... maybe we do it only the first batch... but I don't want to write off "see your data mid-sync" just yet

@edgao
Contributor Author

oh, git massively screwed up that merge 🤦 typeAndDedupeTask needs to have the lock logic, and typeAndDedupe(String, String) needs to call typeAndDedupeTask

@edgao Edward Gao (edgao) enabled auto-merge (squash) September 12, 2023 15:28
@github-actions
Contributor

destination-bigquery test report (commit 0d23374753) - ✅

⏲️ Total pipeline duration: 04mn10s

Step Result
Java Connector Unit Tests
Build connector tar
Build destination-bigquery docker image for platform linux/x86_64
Java Connector Integration Tests
Validate airbyte-integrations/connectors/destination-bigquery/metadata.yaml
Connector version semver check
Connector version increment check
QA checks

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=destination-bigquery test
@github-actions
Contributor

destination-snowflake test report (commit 0d23374753) - ✅

⏲️ Total pipeline duration: 14mn50s

Step Result
Java Connector Unit Tests
Build connector tar
Build destination-snowflake docker image for platform linux/x86_64
Java Connector Integration Tests
Validate airbyte-integrations/connectors/destination-snowflake/metadata.yaml
Connector version semver check
Connector version increment check
QA checks

You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=destination-snowflake test
@github-actions
Contributor

destination-snowflake test report (commit c359ce6bbe) - ✅

⏲️ Total pipeline duration: 15mn16s

Step Result
Java Connector Unit Tests
Build connector tar
Build destination-snowflake docker image for platform linux/x86_64
Java Connector Integration Tests
Validate airbyte-integrations/connectors/destination-snowflake/metadata.yaml
Connector version semver check
Connector version increment check
QA checks

You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=destination-snowflake test
@github-actions
Contributor

destination-bigquery test report (commit c359ce6bbe) - ✅

⏲️ Total pipeline duration: 14mn08s

Step Result
Java Connector Unit Tests
Build connector tar
Build destination-bigquery docker image for platform linux/x86_64
Java Connector Integration Tests
Validate airbyte-integrations/connectors/destination-bigquery/metadata.yaml
Connector version semver check
Connector version increment check
QA checks

You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=destination-bigquery test
@edgao Edward Gao (edgao) merged commit e0ce2ac into master Sep 12, 2023
@edgao Edward Gao (edgao) deleted the edgao/dv2/snowflake/locks branch September 12, 2023 17:28