Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
d9aa4d1
copy spec changes
edgao Aug 23, 2023
4d10768
logistics
edgao Aug 23, 2023
5e0f08f
remove normalization from build
edgao Aug 23, 2023
e160ad6
remove unnecessary change
edgao Aug 23, 2023
dd0a8db
inject param to stagingcsvgenerator+stagingconsumerfactory
edgao Aug 23, 2023
0e1f518
inject to GcsUtils
edgao Aug 23, 2023
b5ae4d3
hardcode snowflake
edgao Aug 23, 2023
0c8248d
hardcode in bigquery
edgao Aug 23, 2023
1dbd194
Merge branch 'master' into edgao/dv2/release
edgao Aug 24, 2023
5a978ed
derp, fix default behavior
edgao Aug 24, 2023
050c1d9
derp
edgao Aug 24, 2023
46e1e67
derp
edgao Aug 24, 2023
0f14339
maybe make bigquery tests pass
edgao Aug 24, 2023
7c4cd0c
fix snowflake tests?
edgao Aug 24, 2023
2402243
Merge branch 'master' into edgao/dv2/release
edgao Aug 24, 2023
dc619e0
fix snowflake unit tests
edgao Aug 24, 2023
a7f92d1
more snowflake test fix
edgao Aug 24, 2023
86f4ffc
Automated Commit - Format and Process Resources Changes
edgao Aug 24, 2023
b244993
disable legacy DATs on snowflake + bigquery
edgao Aug 24, 2023
9c3bcac
Update upgrade copy
evantahler Aug 24, 2023
cb5e32b
Merge branch 'master' into edgao/dv2/release
evantahler Aug 24, 2023
be9b921
also disable these tests :shrug:
edgao Aug 25, 2023
155e391
one more
edgao Aug 25, 2023
05b58a3
prevent concurrent T+D
edgao Aug 25, 2023
c31f1ff
add better locks
edgao Aug 28, 2023
723a13f
Merge branch 'master' into edgao/dv2/release
evantahler Aug 28, 2023
bda59c6
Merge branch 'edgao/dv2/release' into edgao/dv2/snowflake/locks
edgao Aug 28, 2023
8ca2c27
Automated Commit - Formatting Changes
evantahler Aug 28, 2023
66648a7
git pls
edgao Aug 28, 2023
9ec91ad
add mustRun parameter for explicitnes
edgao Aug 28, 2023
3b2f818
more comments
edgao Aug 28, 2023
c117798
Automated Commit - Formatting Changes
edgao Aug 28, 2023
5954328
Merge branch 'edgao/dv2/release' into edgao/dv2/snowflake/locks
edgao Aug 28, 2023
8135cf0
derp. also make bigquery staging lock things
edgao Aug 29, 2023
9270ab6
use putIfAbsent
edgao Aug 29, 2023
d9cdcc4
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Aug 29, 2023
0aa4d2b
logistics
edgao Aug 29, 2023
7e94392
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Aug 30, 2023
b30707e
better putIfAbsent
edgao Aug 30, 2023
7d0f922
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Sep 6, 2023
e214fec
botched merge
edgao Sep 7, 2023
10b8356
refactor
edgao Sep 7, 2023
85fc426
prevent incremental T+D for bq staging
edgao Sep 7, 2023
a032c35
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Sep 12, 2023
cf229de
spotbugs
edgao Sep 12, 2023
0d23374
why no autoformat?
edgao Sep 12, 2023
3fb9769
Automated Commit - Format and Process Resources Changes
edgao Sep 12, 2023
c359ce6
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Sep 12, 2023
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add better locks
  • Loading branch information
edgao committed Aug 28, 2023
commit c31f1ff5f5548326834c521733fd5955ba761938
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -44,7 +48,13 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
private Set<StreamId> overwriteStreamsWithTmpTable;
private final Set<StreamId> streamsWithSuccesfulSetup;
// We only want to run a single instance of T+D per stream at a time. These objects are used for synchronization per stream.
private final Map<StreamId, Object> tdLocks;
// Use a read-write lock because we need the same semantics:
// * any number of threads can insert to the raw tables at the same time, as long as T+D isn't running (i.e. "read lock")
// * T+D must run in complete isolation (i.e. "write lock")
private final Map<StreamId, ReadWriteLock> tdLocks;
// These locks are used to prevent multiple simultaneous attempts to T+D the same stream.
// We use tryLock with these so that we don't queue up multiple T+D runs for the same stream.
private final Map<StreamId, Lock> internalTdLocks;

public DefaultTyperDeduper(final SqlGenerator<DialectTableDefinition> sqlGenerator,
final DestinationHandler<DialectTableDefinition> destinationHandler,
Expand All @@ -58,6 +68,7 @@ public DefaultTyperDeduper(final SqlGenerator<DialectTableDefinition> sqlGenerat
this.v2RawTableMigrator = v2RawTableMigrator;
this.streamsWithSuccesfulSetup = new HashSet<>();
this.tdLocks = new HashMap<>();
this.internalTdLocks = new HashMap<>();
}

public DefaultTyperDeduper(
Expand All @@ -68,12 +79,6 @@ public DefaultTyperDeduper(
this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2RawTableMigrator<>());
}

/**
* Create the tables that T+D will write to during the sync. In OVERWRITE mode, these might not be
* the true final tables. Specifically, other than an initial sync (i.e. table does not exist, or is
* empty) we write to a temporary final table, and swap it into the true final table at the end of
* the sync. This is to prevent user downtime during a sync.
*/
public void prepareTables() throws Exception {
if (overwriteStreamsWithTmpTable != null) {
throw new IllegalStateException("Tables were already prepared.");
Expand Down Expand Up @@ -116,43 +121,51 @@ public void prepareTables() throws Exception {
destinationHandler.execute(sqlGenerator.createTable(stream, NO_SUFFIX, false));
}

tdLocks.put(stream.id(), new Object());
// Use fair locking. This slows down lock operations, but that performance hit is by far dwarfed
// by our IO costs. This lock needs to be fair because the raw table writers are running almost constantly,
// and we don't want them to starve T+D.
tdLocks.put(stream.id(), new ReentrantReadWriteLock(true));
// This lock doesn't need to be fair; any T+D instance is equivalent and we'll skip T+D if we can't
// immediately acquire the lock.
internalTdLocks.put(stream.id(), new ReentrantLock());
streamsWithSuccesfulSetup.add(stream.id());
}
}

/**
* Execute typing and deduping for a single stream (i.e. fetch new raw records into the final table,
* etc.).
* <p>
* This method is thread-safe; multiple threads can call it concurrently.
*
* @param originalNamespace The stream's namespace, as declared in the configured catalog
* @param originalName The stream's name, as declared in the configured catalog
*/
public void typeAndDedupe(final String originalNamespace, final String originalName) throws Exception {
LOGGER.info("Attempting typing and deduping for {}.{}", originalNamespace, originalName);
final var streamConfig = parsedCatalog.getStream(originalNamespace, originalName);
synchronized(tdLocks.get(streamConfig.id())) {
if (streamsWithSuccesfulSetup.stream()
.noneMatch(streamId -> streamId.originalNamespace().equals(originalNamespace) && streamId.originalName().equals(originalName))) {
// For example, if T+D setup fails, but the consumer tries to run T+D on all streams during close,
// we should skip it.
LOGGER.warn("Skipping typing and deduping for {}.{} because we could not set up the tables for this stream.", originalNamespace, originalName);
return;
if (!streamsWithSuccesfulSetup.contains(streamConfig.id())) {
// For example, if T+D setup fails, but the consumer tries to run T+D on all streams during close,
// we should skip it.
LOGGER.warn("Skipping typing and deduping for {}.{} because we could not set up the tables for this stream.", originalNamespace, originalName);
return;
}

final Lock internalLock = internalTdLocks.get(streamConfig.id());
if (internalLock.tryLock()) {
LOGGER.info("Waiting for raw table writes to pause for {}.{}", originalNamespace, originalName);
final Lock externalLock = tdLocks.get(streamConfig.id()).writeLock();
externalLock.lock();
try {
LOGGER.info("Attempting typing and deduping for {}.{}", originalNamespace, originalName);
final String suffix = getFinalTableSuffix(streamConfig.id());
final String sql = sqlGenerator.updateTable(streamConfig, suffix);
destinationHandler.execute(sql);
} finally {
LOGGER.info("Allowing other threads to proceed for {}.{}", originalNamespace, originalName);
externalLock.unlock();
internalLock.unlock();
}
final String suffix = getFinalTableSuffix(streamConfig.id());
final String sql = sqlGenerator.updateTable(streamConfig, suffix);
destinationHandler.execute(sql);
} else {
LOGGER.info("Another thread is already trying to run typing and deduping for {}.{}. Skipping it here.", originalNamespace, originalName);
}
}

/**
* Does any "end of sync" work. For most streams, this is a noop.
* <p>
* For OVERWRITE streams where we're writing to a temp table, this is where we swap the temp table
* into the final table.
*/
public Lock getRawTableInsertLock(final String originalNamespace, final String originalName) {
final var streamConfig = parsedCatalog.getStream(originalNamespace, originalName);
return tdLocks.get(streamConfig.id()).readLock();
}

public void commitFinalTables() throws Exception {
LOGGER.info("Committing final tables");
for (final StreamConfig streamConfig : parsedCatalog.streams()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class NoopTyperDeduper implements TyperDeduper {

@Override
Expand All @@ -12,8 +16,44 @@ public void prepareTables() throws Exception {
}

@Override
public void typeAndDedupe(String originalNamespace, String originalName) throws Exception {
public void typeAndDedupe(final String originalNamespace, final String originalName) throws Exception {

}

@Override
public Lock getRawTableInsertLock(final String originalNamespace, final String originalName) {
// Return a fake lock that does nothing.
return new Lock() {
@Override
public void lock() {

}

@Override
public void lockInterruptibly() throws InterruptedException {

}

@Override
public boolean tryLock() {
return false;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public void unlock() {

}

@Override
public Condition newCondition() {
return null;
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,48 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import java.util.concurrent.locks.Lock;

public interface TyperDeduper {

/**
* Create the tables that T+D will write to during the sync. In OVERWRITE mode, these might not be
* the true final tables. Specifically, other than an initial sync (i.e. table does not exist, or is
* empty) we write to a temporary final table, and swap it into the true final table at the end of
* the sync. This is to prevent user downtime during a sync.
*/
void prepareTables() throws Exception;

/**
* Suggest that we execute typing and deduping for a single stream (i.e. fetch new raw records into
* the final table, etc.).
* <p>
* This method is thread-safe; multiple threads can call it concurrently. If T+D is already running
* for the given stream, this method may choose to do nothing.
* <p>
* This method relies on callers to prevent concurrent modification to the underlying raw tables
* using {@link #getRawTableInsertLock(String, String)}. Raw table writes should be guarded by
* {@code getRawTableInsertLock().lock()} and {@code getRawTableInsertLock().unlock()}. While
* {@code typeAndDedupe} is executing, that lock will be unavailable. This is typically implemented
* using a {@link java.util.concurrent.locks.ReentrantReadWriteLock} with fairness enabled.
*
* @param originalNamespace The stream's namespace, as declared in the configured catalog
* @param originalName The stream's name, as declared in the configured catalog
*/
void typeAndDedupe(String originalNamespace, String originalName) throws Exception;

/**
* Get the lock that should be used to synchronize inserts to the raw table for a given stream.
* {@link #typeAndDedupe(String, String)} will not run while this lock is held.
*/
Lock getRawTableInsertLock(final String originalNamespace, final String originalName);

/**
* Does any "end of sync" work. For most streams, this is a noop.
* <p>
* For OVERWRITE streams where we're writing to a temp table, this is where we swap the temp table
* into the final table.
*/
void commitFinalTables() throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import lombok.extern.slf4j.Slf4j;

/**
Expand Down Expand Up @@ -78,10 +79,16 @@ public static void copyIntoTableFromStage(final JdbcDatabase database,
final TyperDeduper typerDeduper)
throws Exception {
try {
stagingOperations.copyIntoTableFromStage(database, stageName, stagingPath, stagedFiles,
tableName, schemaName);
final Lock rawTableInsertLock = typerDeduper.getRawTableInsertLock(streamNamespace, streamName);
rawTableInsertLock.lock();
try {
stagingOperations.copyIntoTableFromStage(database, stageName, stagingPath, stagedFiles,
tableName, schemaName);
} finally {
rawTableInsertLock.unlock();
}

AirbyteStreamNameNamespacePair streamId = new AirbyteStreamNameNamespacePair(streamName, streamNamespace);
final AirbyteStreamNameNamespacePair streamId = new AirbyteStreamNameNamespacePair(streamName, streamNamespace);
if (!typerDeduperValve.containsKey(streamId)) {
typerDeduperValve.addStream(streamId);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe putIfAbsent is better here to avoid races

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh that's a good call, though I think the valve is very nonthreadsafe anyway

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in 9270ab6

Expand Down