Skip to content
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
d9aa4d1
copy spec changes
edgao Aug 23, 2023
4d10768
logistics
edgao Aug 23, 2023
5e0f08f
remove normalization from build
edgao Aug 23, 2023
e160ad6
remove unnecessary change
edgao Aug 23, 2023
dd0a8db
inject param to stagingcsvgenerator+stagingconsumerfactory
edgao Aug 23, 2023
0e1f518
inject to GcsUtils
edgao Aug 23, 2023
b5ae4d3
hardcode snowflake
edgao Aug 23, 2023
0c8248d
hardcode in bigquery
edgao Aug 23, 2023
1dbd194
Merge branch 'master' into edgao/dv2/release
edgao Aug 24, 2023
5a978ed
derp, fix default behavior
edgao Aug 24, 2023
050c1d9
derp
edgao Aug 24, 2023
46e1e67
derp
edgao Aug 24, 2023
0f14339
maybe make bigquery tests pass
edgao Aug 24, 2023
7c4cd0c
fix snowflake tests?
edgao Aug 24, 2023
2402243
Merge branch 'master' into edgao/dv2/release
edgao Aug 24, 2023
dc619e0
fix snowflake unit tests
edgao Aug 24, 2023
a7f92d1
more snowflake test fix
edgao Aug 24, 2023
86f4ffc
Automated Commit - Format and Process Resources Changes
edgao Aug 24, 2023
b244993
disable legacy DATs on snowflake + bigquery
edgao Aug 24, 2023
9c3bcac
Update upgrade copy
evantahler Aug 24, 2023
cb5e32b
Merge branch 'master' into edgao/dv2/release
evantahler Aug 24, 2023
be9b921
also disable these tests :shrug:
edgao Aug 25, 2023
155e391
one more
edgao Aug 25, 2023
05b58a3
prevent concurrent T+D
edgao Aug 25, 2023
c31f1ff
add better locks
edgao Aug 28, 2023
723a13f
Merge branch 'master' into edgao/dv2/release
evantahler Aug 28, 2023
bda59c6
Merge branch 'edgao/dv2/release' into edgao/dv2/snowflake/locks
edgao Aug 28, 2023
8ca2c27
Automated Commit - Formatting Changes
evantahler Aug 28, 2023
66648a7
git pls
edgao Aug 28, 2023
9ec91ad
add mustRun parameter for explicitnes
edgao Aug 28, 2023
3b2f818
more comments
edgao Aug 28, 2023
c117798
Automated Commit - Formatting Changes
edgao Aug 28, 2023
5954328
Merge branch 'edgao/dv2/release' into edgao/dv2/snowflake/locks
edgao Aug 28, 2023
8135cf0
derp. also make bigquery staging lock things
edgao Aug 29, 2023
9270ab6
use putIfAbsent
edgao Aug 29, 2023
d9cdcc4
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Aug 29, 2023
0aa4d2b
logistics
edgao Aug 29, 2023
7e94392
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Aug 30, 2023
b30707e
better putIfAbsent
edgao Aug 30, 2023
7d0f922
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Sep 6, 2023
e214fec
botched merge
edgao Sep 7, 2023
10b8356
refactor
edgao Sep 7, 2023
85fc426
prevent incremental T+D for bq staging
edgao Sep 7, 2023
a032c35
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Sep 12, 2023
cf229de
spotbugs
edgao Sep 12, 2023
0d23374
why no autoformat?
edgao Sep 12, 2023
3fb9769
Automated Commit - Format and Process Resources Changes
edgao Sep 12, 2023
c359ce6
Merge branch 'master' into edgao/dv2/snowflake/locks
edgao Sep 12, 2023
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,20 @@
import static io.airbyte.integrations.base.IntegrationRunner.TYPE_AND_DEDUPE_THREAD_NAME;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.countOfTypingDedupingThreads;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.reduceExceptions;
import static java.util.Collections.singleton;

import autovalue.shaded.kotlin.Pair;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand All @@ -30,7 +38,7 @@
* In a typical sync, destinations should call the methods:
* <ol>
* <li>{@link #prepareTables()} once at the start of the sync</li>
* <li>{@link #typeAndDedupe(String, String)} as needed throughout the sync</li>
* <li>{@link #typeAndDedupe(String, String, boolean)} as needed throughout the sync</li>
* <li>{@link #commitFinalTables()} once at the end of the sync</li>
* </ol>
* Note that createFinalTables initializes some internal state. The other methods will throw an
Expand All @@ -51,6 +59,16 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
private final ParsedCatalog parsedCatalog;
private Set<StreamId> overwriteStreamsWithTmpTable;
private final Set<Pair<String, String>> streamsWithSuccessfulSetup;
// We only want to run a single instance of T+D per stream at a time. These objects are used for
// synchronization per stream.
// Use a read-write lock because we need the same semantics:
// * any number of threads can insert to the raw tables at the same time, as long as T+D isn't
// running (i.e. "read lock")
// * T+D must run in complete isolation (i.e. "write lock")
private final Map<StreamId, ReadWriteLock> tdLocks;
// These locks are used to prevent multiple simultaneous attempts to T+D the same stream.
// We use tryLock with these so that we don't queue up multiple T+D runs for the same stream.
private final Map<StreamId, Lock> internalTdLocks;

private final ExecutorService executorService;

Expand All @@ -66,6 +84,8 @@ public DefaultTyperDeduper(final SqlGenerator<DialectTableDefinition> sqlGenerat
this.v1V2Migrator = v1V2Migrator;
this.v2TableMigrator = v2TableMigrator;
this.streamsWithSuccessfulSetup = ConcurrentHashMap.newKeySet(parsedCatalog.streams().size());
this.tdLocks = new HashMap<>();
this.internalTdLocks = new HashMap<>();
this.executorService = Executors.newFixedThreadPool(countOfTypingDedupingThreads(defaultThreadCount),
new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build());
}
Expand All @@ -79,12 +99,6 @@ public DefaultTyperDeduper(
this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator<>(), defaultThreadCount);
}

/**
* Create the tables that T+D will write to during the sync. In OVERWRITE mode, these might not be
* the true final tables. Specifically, other than an initial sync (i.e. table does not exist, or is
* empty) we write to a temporary final table, and swap it into the true final table at the end of
* the sync. This is to prevent user downtime during a sync.
*/
public void prepareTables() throws Exception {
if (overwriteStreamsWithTmpTable != null) {
throw new IllegalStateException("Tables were already prepared.");
Expand Down Expand Up @@ -137,38 +151,43 @@ private CompletableFuture<Optional<Exception>> prepareTablesFuture(final StreamC
destinationHandler.execute(sqlGenerator.createTable(stream, NO_SUFFIX, false));
}
streamsWithSuccessfulSetup.add(new Pair<>(stream.id().originalNamespace(), stream.id().originalName()));

// Use fair locking. This slows down lock operations, but that performance hit is by far dwarfed
// by our IO costs. This lock needs to be fair because the raw table writers are running almost
// constantly,
// and we don't want them to starve T+D.
tdLocks.put(stream.id(), new ReentrantReadWriteLock(true));
// This lock doesn't need to be fair; any T+D instance is equivalent and we'll skip T+D if we can't
// immediately acquire the lock.
internalTdLocks.put(stream.id(), new ReentrantLock());

return Optional.empty();
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Exception occurred while preparing tables for stream " + stream.id().originalName(), e);
return Optional.of(e);
}
}, this.executorService);
}

/**
* Execute typing and deduping for a single stream (i.e. fetch new raw records into the final table,
* etc.).
* <p>
* This method is thread-safe; multiple threads can call it concurrently.
*
* @param originalNamespace The stream's namespace, as declared in the configured catalog
* @param originalName The stream's name, as declared in the configured catalog
*/
public void typeAndDedupe(final String originalNamespace, final String originalName) throws Exception {
LOGGER.info("Attempting typing and deduping for {}.{}", originalNamespace, originalName);
public void typeAndDedupe(final String originalNamespace, final String originalName, final boolean mustRun) throws Exception {
final var streamConfig = parsedCatalog.getStream(originalNamespace, originalName);
if (!streamsWithSuccessfulSetup.contains(new Pair<>(originalNamespace, originalName))) {
// For example, if T+D setup fails, but the consumer tries to run T+D on all streams during close,
// we should skip it.
LOGGER.warn("Skipping typing and deduping for {}.{} because we could not set up the tables for this stream.", originalNamespace, originalName);
return;
}
final String suffix = getFinalTableSuffix(streamConfig.id());
final String sql = sqlGenerator.updateTable(streamConfig, suffix);
destinationHandler.execute(sql);
final CompletableFuture<Optional<Exception>> task = typeAndDedupeTask(streamConfig, mustRun);
reduceExceptions(
singleton(task),
String.format(
"The Following Exceptions were thrown while typing and deduping %s.%s:\n",
originalNamespace,
originalName
)
);
}

public CompletableFuture<Optional<Exception>> typeAndDedupeTask(StreamConfig streamConfig) {
public Lock getRawTableInsertLock(final String originalNamespace, final String originalName) {
final var streamConfig = parsedCatalog.getStream(originalNamespace, originalName);
return tdLocks.get(streamConfig.id()).readLock();
}

public CompletableFuture<Optional<Exception>> typeAndDedupeTask(final StreamConfig streamConfig, final boolean mustRun) {
return CompletableFuture.supplyAsync(() -> {
final var originalNamespace = streamConfig.id().originalNamespace();
final var originalName = streamConfig.id().originalName();
Expand All @@ -180,11 +199,38 @@ public CompletableFuture<Optional<Exception>> typeAndDedupeTask(StreamConfig str
originalName);
return Optional.empty();
}
final String suffix = getFinalTableSuffix(streamConfig.id());
final String sql = sqlGenerator.updateTable(streamConfig, suffix);
destinationHandler.execute(sql);

final boolean run;
Comment thread
edgao marked this conversation as resolved.
final Lock internalLock = internalTdLocks.get(streamConfig.id());
if (mustRun) {
// If we must run T+D, then wait until we acquire the lock.
internalLock.lock();
run = true;
} else {
// Otherwise, try and get the lock. If another thread already has it, then we should noop here.
run = internalLock.tryLock();
}

if (run) {
LOGGER.info("Waiting for raw table writes to pause for {}.{}", originalNamespace, originalName);
final Lock externalLock = tdLocks.get(streamConfig.id()).writeLock();
externalLock.lock();
try {
LOGGER.info("Attempting typing and deduping for {}.{}", originalNamespace, originalName);
final String suffix = getFinalTableSuffix(streamConfig.id());
final String sql = sqlGenerator.updateTable(streamConfig, suffix);
destinationHandler.execute(sql);
} finally {
LOGGER.info("Allowing other threads to proceed for {}.{}", originalNamespace, originalName);
externalLock.unlock();
internalLock.unlock();
}
} else {
LOGGER.info("Another thread is already trying to run typing and deduping for {}.{}. Skipping it here.", originalNamespace,
originalName);
}
return Optional.empty();
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Exception occurred while typing and deduping stream " + originalName, e);
return Optional.of(e);
}
Expand All @@ -196,7 +242,7 @@ public void typeAndDedupe() throws Exception {
LOGGER.info("Typing and deduping all tables");
final Set<CompletableFuture<Optional<Exception>>> typeAndDedupeTasks = new HashSet<>();
parsedCatalog.streams().forEach(streamConfig -> {
typeAndDedupeTasks.add(typeAndDedupeTask(streamConfig));
typeAndDedupeTasks.add(typeAndDedupeTask(streamConfig, true));
});
CompletableFuture.allOf(typeAndDedupeTasks.toArray(CompletableFuture[]::new)).join();
reduceExceptions(typeAndDedupeTasks, "The Following Exceptions were thrown while typing and deduping tables:\n");
Expand Down Expand Up @@ -234,7 +280,7 @@ private CompletableFuture<Optional<Exception>> commitFinalTableTask(final Stream
LOGGER.info("Overwriting final table with tmp table for stream {}.{}", streamId.originalNamespace(), streamId.originalName());
try {
destinationHandler.execute(overwriteFinalTable);
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Exception Occurred while committing final table for stream " + streamId.originalName(), e);
return Optional.of(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class NoopTyperDeduper implements TyperDeduper {

@Override
Expand All @@ -12,8 +16,46 @@ public void prepareTables() throws Exception {
}

@Override
public void typeAndDedupe(String originalNamespace, String originalName) throws Exception {
public void typeAndDedupe(final String originalNamespace, final String originalName, final boolean mustRun) throws Exception {

}

@Override
public Lock getRawTableInsertLock(final String originalNamespace, final String originalName) {
// Return a fake lock that does nothing.
return new Lock() {

@Override
public void lock() {

}

@Override
public void lockInterruptibly() throws InterruptedException {

}

@Override
public boolean tryLock() {
return false;
}

@Override
public boolean tryLock(final long time, final TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public void unlock() {

}

@Override
public Condition newCondition() {
return null;
}

};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class TypeAndDedupeOperationValve extends ConcurrentHashMap<AirbyteStream

private static final Supplier<Long> SYSTEM_NOW = () -> System.currentTimeMillis();

private ConcurrentHashMap<AirbyteStreamNameNamespacePair, Integer> incrementalIndex;
private final ConcurrentHashMap<AirbyteStreamNameNamespacePair, Integer> incrementalIndex;

private final Supplier<Long> nowness;

Expand All @@ -42,7 +42,7 @@ public TypeAndDedupeOperationValve() {
*
* @param nownessSupplier Supplier which will return a long value representing now
*/
public TypeAndDedupeOperationValve(Supplier<Long> nownessSupplier) {
public TypeAndDedupeOperationValve(final Supplier<Long> nownessSupplier) {
super();
incrementalIndex = new ConcurrentHashMap<>();
this.nowness = nownessSupplier;
Expand All @@ -66,6 +66,11 @@ public void addStream(final AirbyteStreamNameNamespacePair key) {
put(key, nowness.get());
}

public void addStreamIfAbsent(final AirbyteStreamNameNamespacePair key) {
putIfAbsent(key, nowness.get());
incrementalIndex.putIfAbsent(key, 0);
}

/**
* Whether we should type and dedupe at this point in time for this particular stream.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,54 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import java.util.concurrent.locks.Lock;

public interface TyperDeduper {

/**
* Create the tables that T+D will write to during the sync. In OVERWRITE mode, these might not be
* the true final tables. Specifically, other than an initial sync (i.e. table does not exist, or is
* empty) we write to a temporary final table, and swap it into the true final table at the end of
* the sync. This is to prevent user downtime during a sync.
*/
void prepareTables() throws Exception;

void typeAndDedupe(String originalNamespace, String originalName) throws Exception;

/**
* Suggest that we execute typing and deduping for a single stream (i.e. fetch new raw records into
* the final table, etc.).
* <p>
* This method is thread-safe; multiple threads can call it concurrently. If T+D is already running
* for the given stream, this method may choose to do nothing. If a caller wishes to force T+D to
* run (for example, at the end of a sync), they may set {@code mustRun} to true.
* <p>
* This method relies on callers to prevent concurrent modification to the underlying raw tables.
* This is most easily accomplished using {@link #getRawTableInsertLock(String, String)}, if the
* caller guards all raw table writes using {@code getRawTableInsertLock().lock()} and
* {@code getRawTableInsertLock().unlock()}. While {@code typeAndDedupe} is executing, that lock
* will be unavailable. However, callers are free to enforce this in other ways (for example,
* single- threaded callers do not need to use the lock).
*
* @param originalNamespace The stream's namespace, as declared in the configured catalog
* @param originalName The stream's name, as declared in the configured catalog
*/
void typeAndDedupe(String originalNamespace, String originalName, boolean mustRun) throws Exception;

/**
* Get the lock that should be used to synchronize inserts to the raw table for a given stream. This
* lock permits any number of threads to hold the lock, but
* {@link #typeAndDedupe(String, String, boolean)} will not proceed while this lock is held.
* <p>
* This lock provides fairness guarantees, i.e. typeAndDedupe will not starve while waiting for the
* lock (and similarly, raw table writers will not starve if many typeAndDedupe calls are queued).
*/
Lock getRawTableInsertLock(final String originalNamespace, final String originalName);

/**
* Does any "end of sync" work. For most streams, this is a noop.
* <p>
* For OVERWRITE streams where we're writing to a temp table, this is where we swap the temp table
* into the final table.
*/
void typeAndDedupe() throws Exception;

void commitFinalTables() throws Exception;
Expand Down
Loading