Skip to main content
Tweeted twitter.com/StackCodeReview/status/1559057301578956800
Added directory structure overview
Source Link

This is how the pipeline works on a high level: The queries are saved in textfiles in the folder /app/queries. First these textfiles are configured via config_query.sh, by replacing the start and end strings of the queries. Then extract.sh passes the configured queries to the respective cliscli's and saves the data in /app/tmp/raw. The raw data is then processed and cleaned via transform.py and saved to /app/tmp/clean. Finally, all these steps are run via run.sh. If config_query.sh sets up a range between the start and end date that is longer than five days, run.sh splits up the queries into chunks of five days in order to avoid a timeout of the server.

This is an overview the directory structure:

app/
├── queries/
│   └── gads.sql
│   └── leads.flux
│   └── li.sql
│   └── ms.sql
│   └── subs.flux
├── src/
│   └── config_query.sh
│   └── extract.sh
│   └── transform.py
└── tmp/
    ├── clean/
    │   └── gads.csv
    │   └── leads.csv
    │   └── li.csv
    │   └── ms.csv
    │   └── subs.csv
    ├── raw/
    │   └── gads.csv
    │   └──leads.csv
    │   └── li.csv
    │   └── ms.csv
    │   └── subs.csv
    └── gads_startdate_enddate.csv
    └── leads_startdate_enddate.csv
    └── li_startdate_enddate.csv
    └── ms_startdate_enddate.csv
    └── subs_startdate_enddate.csv
#!/bin/bash
# Run config_query.sh, extract.sh and transform.py.
# Split up jobs in chunks of 5 days if start date and end date are far apart.

# Example usage: ./run.sh start_date end_date

# pass date in format YYYY-mm-dd:
start=$1
end=$2

# Check if start and end are valid dates
for date in $start $end
do
  # Check format
  format="^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
  if [[ $start =~ $format && $end =~ $format ]]
    then
      echo "Date $1 is in valid format (YYYY-MM-DD)"
    else
      echo "Date $1 is in an invalid format (not YYYY-mm-dd)."
      exit $?
  fi

# Check if date value is valid
  date -d $date
  if [ $? -ne 0 ]; then
   echo "${date} is not a valid date. Enter a valid date."
   exit $?
  fi
done

# Setup root directory path
ROOT="/app"

# Setup clean files directory paths
CLEAN_GADS="${ROOT}/tmp/clean/gads.csv"
CLEAN_LEADS="${ROOT}/tmp/clean/leads.csv"
CLEAN_LI="${ROOT}/tmp/clean/li.csv"
CLEAN_MS="${ROOT}/tmp/clean/ms.csv"
CLEAN_SUBS="${ROOT}/tmp/clean/subs.csv"

# Setup output paths
GADS="${ROOT}/tmp/gads_${start}_${end}.csv"
LEADS="${ROOT}/tmp/leads_${start}_${end}.csv"
LI="${ROOT}/tmp/li_${start}_${end}.csv"
MS="${ROOT}/tmp/ms_${start}_${end}.csv"
SUBS="${ROOT}/tmp/subs_${start}_${end}.csv"

# Delete previously created reports
find $ROOT/tmp -maxdepth 1 -type f -delete

# Setup schema for output paths
echo "New_ID,date,campaign,ad_group,impressions,clicks,spend" > "$GADS"
echo "New_ID,date,path,leads,registrations,trials,basic-trial,business-trial,education-trial,enterprise-trial" > "$LEADS"
echo "New_ID,date,campaign,impressions,clicks,spend" > "$LI"
echo "New_ID,date,campaign,ad_group,impressions,clicks,spend" > "$MS"
echo "New_ID,subscription,date,total_subscriptions,new_subscriptions,isTrial,offer,segment"segment,lost_subscriptions" > "$SUBS"

# Download data in 5-day chunks:
while [[ "$start" < "$end" ]]; do

  temp=$(date -I -d "$start + 5 day")

  if [[ "$temp" < "$end" ]]; then
    ./src/config_query.sh $start $temp && ./src/extract.sh && python ./src/transform.py
  else
    ./src/config_query.sh $start $end && ./src/extract.sh && python ./src/transform.py
  fi

  # Append data to files
  tail -n +2 "$CLEAN_GADS" >> "$GADS"
  tail -n +2 "$CLEAN_LEADS" >> "$LEADS"
  tail -n +2 "$CLEAN_LI" >> "$LI"
  tail -n +2 "$CLEAN_MS" >> "$MS"
  tail -n +2 "$CLEAN_SUBS" >> "$SUBS"

  start=$(date -I -d "$start + 5 day")
done

This is how the pipeline works on a high level: The queries are saved in textfiles in the folder /app/queries. First these textfiles are configured via config_query.sh, by replacing the start and end strings of the queries. Then extract.sh passes the configured queries to the respective clis and saves the data in /app/tmp/raw. The raw data is then processed and cleaned via transform.py and saved to /app/tmp/clean. Finally, all these steps are run via run.sh. If config_query.sh sets up a range between the start and end date that is longer than five days, run.sh splits up the queries into chunks of five days in order to avoid a timeout of the server.

#!/bin/bash
# Run config_query.sh, extract.sh and transform.py.
# Split up jobs in chunks of 5 days if start date and end date are far apart.

# Example usage: ./run.sh start_date end_date

# pass date in format YYYY-mm-dd:
start=$1
end=$2

# Check if start and end are valid dates
for date in $start $end
do
  # Check format
  format="^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
  if [[ $start =~ $format && $end =~ $format ]]
    then
      echo "Date $1 is in valid format (YYYY-MM-DD)"
    else
      echo "Date $1 is in an invalid format (not YYYY-mm-dd)."
      exit $?
  fi

# Check if date value is valid
  date -d $date
  if [ $? -ne 0 ]; then
   echo "${date} is not a valid date. Enter a valid date."
   exit $?
  fi
done

# Setup root directory path
ROOT="/app"

# Setup clean files directory paths
CLEAN_GADS="${ROOT}/tmp/clean/gads.csv"
CLEAN_LEADS="${ROOT}/tmp/clean/leads.csv"
CLEAN_LI="${ROOT}/tmp/clean/li.csv"
CLEAN_MS="${ROOT}/tmp/clean/ms.csv"
CLEAN_SUBS="${ROOT}/tmp/clean/subs.csv"

# Setup output paths
GADS="${ROOT}/tmp/gads_${start}_${end}.csv"
LEADS="${ROOT}/tmp/leads_${start}_${end}.csv"
LI="${ROOT}/tmp/li_${start}_${end}.csv"
MS="${ROOT}/tmp/ms_${start}_${end}.csv"
SUBS="${ROOT}/tmp/subs_${start}_${end}.csv"

# Delete previously created reports
find $ROOT/tmp -maxdepth 1 -type f -delete

# Setup schema for output paths
echo "New_ID,date,campaign,ad_group,impressions,clicks,spend" > "$GADS"
echo "New_ID,date,path,leads,registrations,trials,basic-trial,business-trial,education-trial,enterprise-trial" > "$LEADS"
echo "New_ID,date,campaign,impressions,clicks,spend" > "$LI"
echo "New_ID,date,campaign,ad_group,impressions,clicks,spend" > "$MS"
echo "New_ID,subscription,date,total_subscriptions,new_subscriptions,isTrial,offer,segment" > "$SUBS"

# Download data in 5-day chunks:
while [[ "$start" < "$end" ]]; do

  temp=$(date -I -d "$start + 5 day")

  if [[ "$temp" < "$end" ]]; then
    ./src/config_query.sh $start $temp && ./src/extract.sh && python ./src/transform.py
  else
    ./src/config_query.sh $start $end && ./src/extract.sh && python ./src/transform.py
  fi

  # Append data to files
  tail -n +2 "$CLEAN_GADS" >> "$GADS"
  tail -n +2 "$CLEAN_LEADS" >> "$LEADS"
  tail -n +2 "$CLEAN_LI" >> "$LI"
  tail -n +2 "$CLEAN_MS" >> "$MS"
  tail -n +2 "$CLEAN_SUBS" >> "$SUBS"

  start=$(date -I -d "$start + 5 day")
done

This is how the pipeline works on a high level: The queries are saved in textfiles in the folder /app/queries. First these textfiles are configured via config_query.sh, by replacing the start and end strings of the queries. Then extract.sh passes the configured queries to the respective cli's and saves the data in /app/tmp/raw. The raw data is then processed and cleaned via transform.py and saved to /app/tmp/clean. Finally, all these steps are run via run.sh. If config_query.sh sets up a range between the start and end date that is longer than five days, run.sh splits up the queries into chunks of five days in order to avoid a timeout of the server.

This is an overview the directory structure:

app/
├── queries/
│   └── gads.sql
│   └── leads.flux
│   └── li.sql
│   └── ms.sql
│   └── subs.flux
├── src/
│   └── config_query.sh
│   └── extract.sh
│   └── transform.py
└── tmp/
    ├── clean/
    │   └── gads.csv
    │   └── leads.csv
    │   └── li.csv
    │   └── ms.csv
    │   └── subs.csv
    ├── raw/
    │   └── gads.csv
    │   └──leads.csv
    │   └── li.csv
    │   └── ms.csv
    │   └── subs.csv
    └── gads_startdate_enddate.csv
    └── leads_startdate_enddate.csv
    └── li_startdate_enddate.csv
    └── ms_startdate_enddate.csv
    └── subs_startdate_enddate.csv
#!/bin/bash
# Run config_query.sh, extract.sh and transform.py.
# Split up jobs in chunks of 5 days if start date and end date are far apart.

# Example usage: ./run.sh start_date end_date

# pass date in format YYYY-mm-dd:
start=$1
end=$2

# Check if start and end are valid dates
for date in $start $end
do
  # Check format
  format="^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
  if [[ $start =~ $format && $end =~ $format ]]
    then
      echo "Date $1 is in valid format (YYYY-MM-DD)"
    else
      echo "Date $1 is in an invalid format (not YYYY-mm-dd)."
      exit $?
  fi

# Check if date value is valid
  date -d $date
  if [ $? -ne 0 ]; then
   echo "${date} is not a valid date. Enter a valid date."
   exit $?
  fi
done

# Setup root directory path
ROOT="/app"

# Setup clean files directory paths
CLEAN_GADS="${ROOT}/tmp/clean/gads.csv"
CLEAN_LEADS="${ROOT}/tmp/clean/leads.csv"
CLEAN_LI="${ROOT}/tmp/clean/li.csv"
CLEAN_MS="${ROOT}/tmp/clean/ms.csv"
CLEAN_SUBS="${ROOT}/tmp/clean/subs.csv"

# Setup output paths
GADS="${ROOT}/tmp/gads_${start}_${end}.csv"
LEADS="${ROOT}/tmp/leads_${start}_${end}.csv"
LI="${ROOT}/tmp/li_${start}_${end}.csv"
MS="${ROOT}/tmp/ms_${start}_${end}.csv"
SUBS="${ROOT}/tmp/subs_${start}_${end}.csv"

# Delete previously created reports
find $ROOT/tmp -maxdepth 1 -type f -delete

# Setup schema for output paths
echo "New_ID,date,campaign,ad_group,impressions,clicks,spend" > "$GADS"
echo "New_ID,date,path,leads,registrations,trials,basic-trial,business-trial,education-trial,enterprise-trial" > "$LEADS"
echo "New_ID,date,campaign,impressions,clicks,spend" > "$LI"
echo "New_ID,date,campaign,ad_group,impressions,clicks,spend" > "$MS"
echo "New_ID,subscription,date,total_subscriptions,new_subscriptions,isTrial,offer,segment,lost_subscriptions" > "$SUBS"

# Download data in 5-day chunks:
while [[ "$start" < "$end" ]]; do

  temp=$(date -I -d "$start + 5 day")

  if [[ "$temp" < "$end" ]]; then
    ./src/config_query.sh $start $temp && ./src/extract.sh && python ./src/transform.py
  else
    ./src/config_query.sh $start $end && ./src/extract.sh && python ./src/transform.py
  fi

  # Append data to files
  tail -n +2 "$CLEAN_GADS" >> "$GADS"
  tail -n +2 "$CLEAN_LEADS" >> "$LEADS"
  tail -n +2 "$CLEAN_LI" >> "$LI"
  tail -n +2 "$CLEAN_MS" >> "$MS"
  tail -n +2 "$CLEAN_SUBS" >> "$SUBS"

  start=$(date -I -d "$start + 5 day")
done
added 18 characters in body
Source Link

Example queries in /app/queries:

Example queries:

Example queries in /app/queries:

Source Link

Implementing a data extraction pipeline with Bash

I wrote the following scripts to extract data from BigQuery and InfluxDB using the bigquery cli and the influx cli. The code works as expected. I included all files for completeness, but I'm primarily interested in feedback on the bash scripts config_query.sh, extract.sh and run.sh. You can also comment on the python file and on my queries, if you want, but the focus should be on my bash scripts. The overall goal is to make the scripts as maintainable and reliable as possible, execution speed doesn't matter.

This is how the pipeline works on a high level: The queries are saved in textfiles in the folder /app/queries. First these textfiles are configured via config_query.sh, by replacing the start and end strings of the queries. Then extract.sh passes the configured queries to the respective clis and saves the data in /app/tmp/raw. The raw data is then processed and cleaned via transform.py and saved to /app/tmp/clean. Finally, all these steps are run via run.sh. If config_query.sh sets up a range between the start and end date that is longer than five days, run.sh splits up the queries into chunks of five days in order to avoid a timeout of the server.

Example queries:

# BigQuery query:
WITH dates AS (
    SELECT
    *
    FROM
        UNNEST(GENERATE_DATE_ARRAY('2022-01-01', DATE_SUB('2022-01-01', INTERVAL 1 DAY), INTERVAL 1 DAY)) AS date
), gads_data AS (
    SELECT DATE(__hevo_report_date) AS date,
        campaign,
        ad_group,
        sum(impressions) AS impressions,
        sum(clicks) AS clicks,
        ROUND(sum(cost)/1000000, 2) AS spend
    FROM `example_table`
    WHERE DATE(__hevo_report_date) >= '2022-01-01' AND DATE(__hevo_report_date) < '2022-01-01'
    GROUP BY date, campaign, ad_group
)

SELECT *
FROM dates
LEFT OUTER JOIN gads_data
ON dates.date = gads_data.date

# Influx query:
from(bucket: "example_bucket")
  |> range(start: 2022-01-01T00:00:00.000Z, stop: 2022-01-01T00:00:00.000Z)
  |> aggregateWindow(every: 1d, fn: sum, column: "val")
  |> yield(name: "mean")

Here are the scripts:

config_query.sh:

#!/bin/bash
# Configures start date and end date of query files

START_DATE=$1
END_DATE=$2

# Setup directories
HOME="/app"
F1="${HOME}/queries/leads.flux"
F2="${HOME}/queries/subs.flux"
F3="${HOME}/queries/gads.sql"
F4="${HOME}/queries/ms.sql"
F5="${HOME}/queries/li.sql"

# Replaces 'start' and 'stop' in Influx queries:
for f in $F1 $F2
do
  sed -i -E  "s/start: .{4}-.{2}-.{2}T00:00:00.000Z, stop: .{4}-.{2}-.{2}/start: ${START_DATE}T00:00:00.000Z, stop: ${END_DATE}/g" $f
done

# Replaces 'START_DATE' and 'END_DATE' in Bigquery queries:
for f in $F3 $F4 $F5
do
  sed -i -E  "s/\('.{4}-.{2}-.{2}', DATE_SUB\('.{4}-.{2}-.{2}',/\('$START_DATE', DATE_SUB\('$END_DATE',/g" $f;
  sed -i -E  "s/>= '.{4}-.{2}-.{2}'/>= '$START_DATE'/g" $f;
  sed -i -E  "s/< '.{4}-.{2}-.{2}'/< '$END_DATE'/g" $f;
done

extract.sh:

#!/bin/bash
# Saves query output to csv-files in folder tmp/raw

# Setup filepaths:
ROOT="/app"
QUERIES="${ROOT}/queries"
RAW="${ROOT}/tmp/raw"

# Setup query paths:
Q1="${QUERIES}/leads.flux"
Q2="${QUERIES}/subs.flux"
Q3="${QUERIES}/gads.sql"
Q4="${QUERIES}/ms.sql"
Q5="${QUERIES}/li.sql"

# Setup target paths:
F1="${RAW}/leads.csv"
F2="${RAW}/subs.csv"
F3="${RAW}/gads.csv"
F4="${RAW}/ms.csv"
F5="${RAW}/li.csv"

# Check if connection to BigQuery works:
bq query --dry_run --use_legacy_sql=false < $Q3 > $F3
if [ $? -ne 0 ]; then
 echo "No connection to BigQuery possible. Program exiting."
 exit $?;
fi

# Run queries:
for ((i=1;i<=5;i++))
do
    if (( $i <= 2 )); then
      # run influx
      eval "influx query --file \$Q${i} -r > \$F${i} && echo '$(date): Extracted \$Q${i} to \$F${i}...'";
    else
      # run bq
      eval "(bq query -q --use_legacy_sql=false --format=csv --max_rows=100000 < \$Q${i}) > \$F${i} && echo '$(date): Extracted \$Q${i} to \$F${i}...'";
    fi

    if [ $? -ne 0 ]; then
     echo "An error occurred. Program exiting."
     exit $?;
    fi

done

transform.py:

import pandas as pd

# Setup directories:
ROOT_DIR = '/app'
RAW_DIR = ROOT_DIR + '/tmp/raw'
CLEAN_DIR = ROOT_DIR + '/tmp/clean'

# Setup Schemas:
SCHEMA = ['date', 'campaign', 'ad_group', 'impressions', 'clicks', 'spend']
SCHEMA_LI = ['date', 'campaign', 'impressions', 'clicks', 'spend']
SCHEMA_LEADS = ['date', 'path', 'leads', 'registrations', 'trials',
                'basic-trial', 'business-trial', 'education-trial', 'enterprise-trial']
SCHEMA_SUBS = ['New_ID', 'subscription', 'date', 'total_subscriptions',
               'new_subscriptions', 'isTrial', 'offer', 'segment']


def transform_li():
    try:
        # Read Data
        file = RAW_DIR + '/li.csv'
        df = pd.read_csv(file, header=0)
        df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d %H:%M:%S')
        df.insert(0, 'New_ID', range(0, 0 + len(df)))

    except KeyError:
        raise KeyError("Your query output does not conform to the specified schema."
                       "Have you perhaps used 'bq query --dry_run'?")

    except pd.errors.EmptyDataError:
        df = pd.DataFrame(columns=SCHEMA_LI.insert(0, 'NEW_ID'))

    finally:
        # Save Data
        path_to = CLEAN_DIR + '/li.csv'
        df.to_csv(path_to, index=False)


def transform_ms():
    try:
        # Read Data
        file = RAW_DIR + '/ms.csv'
        df = pd.read_csv(file, header=0)
        df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d %H:%M:%S')
        df.insert(0, 'New_ID', range(0, 0 + len(df)))

    except KeyError:
        raise KeyError("Your query output does not conform to the specified schema."
                       "Have you perhaps used 'bq query --dry_run'?")

    except pd.errors.EmptyDataError:
        df = pd.DataFrame(columns=SCHEMA.insert(0, 'NEW_ID'))

    finally:
        # Save Data
        path_to = CLEAN_DIR + '/ms.csv'
        df.to_csv(path_to, index=False)


def transform_gads():
    try:
        file = RAW_DIR + '/gads.csv'
        df = pd.read_csv(file, sep=',', header=0)
        df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d %H:%M:%S')
        df.insert(0, 'New_ID', range(0, 0 + len(df)))

    except KeyError:
        raise KeyError("Your query output does not conform to the specified schema."
                       "Have you perhaps used 'bq query --dry_run'?")

    except pd.errors.EmptyDataError:
        df = pd.DataFrame(columns=SCHEMA.insert(0, 'NEW_ID'))

    finally:
        # Save Data
        path_to = CLEAN_DIR + '/gads.csv'
        df.to_csv(path_to, index=False)


def transform_subs():
    try:
        # Read Data
        file = RAW_DIR + '/subs.csv'
        df = pd.read_csv(file,
                         header=3,
                         usecols=['_time', '_value', 'added_values', '_field',
                                  'isTrial', 'offer', 'segment', 'sub_values'])
        df.rename(columns={'_field': 'subscription',
                           '_time': 'date',
                           '_value': 'total_subscriptions',
                           'added_values': 'new_subscriptions',
                           'sub_values': 'lost_subscriptions'}, inplace=True)
        df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%dT%H:%M:%S.%fZ')
        df['date'] = df['date'] - pd.Timedelta(days=1)
        df.insert(0, 'New_ID', range(0, 0 + len(df)))

    except pd.errors.EmptyDataError:
        df = pd.DataFrame(columns=SCHEMA_SUBS.insert(0, 'NEW_ID'))

    except KeyError:
        raise KeyError("Your query output does not conform to the specified schema."
                       "Have you perhaps used 'bq query --dry_run'?")

    finally:
        # Save Data
        path_to = CLEAN_DIR + '/subs.csv'
        df.to_csv(path_to, index=False)


def transform_leads():
    try:
        # Read Data
        file = RAW_DIR + '/leads.csv'
        df = pd.read_csv(file, header=3, usecols=['_time', 'val', 'path', '_field'])

        # Transform Data
        df = pd.pivot_table(df, values='val', index=['_time', 'path'], columns='_field').reset_index()
        df['trials'] = df['basic-trial'] + df['education-trial'] + df['business-trial'] + df['enterprise-trial']
        df.rename(columns={'_time': 'date',
                           'registration': 'leads',
                           'registration_completed': 'registrations'}, inplace=True)
        df = df.astype({'leads': int,
                        'registrations': int,
                        'trials': int,
                        'basic-trial': int,
                        'business-trial': int,
                        'education-trial': int,
                        'enterprise-trial': int})
        df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%dT%H:%M:%SZ')
        df['date'] = df['date'] - pd.Timedelta(days=1)
        cols = SCHEMA_LEADS
        df = df[cols]
        df.insert(0, 'New_ID', range(0, 0 + len(df)))

    except pd.errors.EmptyDataError:
        df = pd.DataFrame(columns=SCHEMA_LEADS.insert(0, 'NEW_ID'))

    except KeyError:
        raise KeyError("Your query output does not conform to the specified schema."
                       "Have you perhaps used 'bq query --dry_run'?")

    finally:
        # Save Data
        path_to = CLEAN_DIR + '/leads.csv'
        df.to_csv(path_to, index=False)


if __name__ == '__main__':
    # Execute all functions in transform.py
    import transform
    for i in dir(transform):
        item = getattr(transform, i)
        if callable(item):
            item()

run.sh:

#!/bin/bash
# Run config_query.sh, extract.sh and transform.py.
# Split up jobs in chunks of 5 days if start date and end date are far apart.

# Example usage: ./run.sh start_date end_date

# pass date in format YYYY-mm-dd:
start=$1
end=$2

# Check if start and end are valid dates
for date in $start $end
do
  # Check format
  format="^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
  if [[ $start =~ $format && $end =~ $format ]]
    then
      echo "Date $1 is in valid format (YYYY-MM-DD)"
    else
      echo "Date $1 is in an invalid format (not YYYY-mm-dd)."
      exit $?
  fi

# Check if date value is valid
  date -d $date
  if [ $? -ne 0 ]; then
   echo "${date} is not a valid date. Enter a valid date."
   exit $?
  fi
done

# Setup root directory path
ROOT="/app"

# Setup clean files directory paths
CLEAN_GADS="${ROOT}/tmp/clean/gads.csv"
CLEAN_LEADS="${ROOT}/tmp/clean/leads.csv"
CLEAN_LI="${ROOT}/tmp/clean/li.csv"
CLEAN_MS="${ROOT}/tmp/clean/ms.csv"
CLEAN_SUBS="${ROOT}/tmp/clean/subs.csv"

# Setup output paths
GADS="${ROOT}/tmp/gads_${start}_${end}.csv"
LEADS="${ROOT}/tmp/leads_${start}_${end}.csv"
LI="${ROOT}/tmp/li_${start}_${end}.csv"
MS="${ROOT}/tmp/ms_${start}_${end}.csv"
SUBS="${ROOT}/tmp/subs_${start}_${end}.csv"

# Delete previously created reports
find $ROOT/tmp -maxdepth 1 -type f -delete

# Setup schema for output paths
echo "New_ID,date,campaign,ad_group,impressions,clicks,spend" > "$GADS"
echo "New_ID,date,path,leads,registrations,trials,basic-trial,business-trial,education-trial,enterprise-trial" > "$LEADS"
echo "New_ID,date,campaign,impressions,clicks,spend" > "$LI"
echo "New_ID,date,campaign,ad_group,impressions,clicks,spend" > "$MS"
echo "New_ID,subscription,date,total_subscriptions,new_subscriptions,isTrial,offer,segment" > "$SUBS"

# Download data in 5-day chunks:
while [[ "$start" < "$end" ]]; do

  temp=$(date -I -d "$start + 5 day")

  if [[ "$temp" < "$end" ]]; then
    ./src/config_query.sh $start $temp && ./src/extract.sh && python ./src/transform.py
  else
    ./src/config_query.sh $start $end && ./src/extract.sh && python ./src/transform.py
  fi

  # Append data to files
  tail -n +2 "$CLEAN_GADS" >> "$GADS"
  tail -n +2 "$CLEAN_LEADS" >> "$LEADS"
  tail -n +2 "$CLEAN_LI" >> "$LI"
  tail -n +2 "$CLEAN_MS" >> "$MS"
  tail -n +2 "$CLEAN_SUBS" >> "$SUBS"

  start=$(date -I -d "$start + 5 day")
done