
I need help rewriting my code to be less repetitive. I am used to procedural rather than object-oriented coding. My Scala program is for Databricks.

How would you combine cmd 3 and cmd 5? Does this involve using polymorphism? My notebook first imports Parquet staging files in parallel, then runs notebooks in parallel. I repeat my parallel retry function, tryNotebookRun, twice, for two different scenarios.

////cmd 1
// Set Environment
var client = "client"
var storageAccount = "storageaccount"
var container = client + "-dl"
// Connect to Azure DataLake
spark.conf.set(
  "fs.azure.account.key." + storageAccount + ".dfs.core.windows.net",
  dbutils.secrets.get(scope = storageAccount, key = storageAccount)
  )
// Set database
spark.sql("USE " + client)

////cmd 2
//import needed packages
import scala.concurrent.duration._
import scala.concurrent.{Future, blocking, Await}
import scala.concurrent.ExecutionContext
import scala.language.postfixOps
import scala.util.control.NonFatal
import scala.util.{Try, Success, Failure}
import java.util.concurrent.Executors
import com.databricks.WorkflowException
import collection.mutable._
import scala.collection.mutable.Map

////cmd 3
///this part sets up the functions and class for importing STG parquet files as Spark tables in parallel
// the next two functions are for retry purpose. if running a process fail, it will retry
def tryRun (path: String, schema: String, table: String): Try[Any] = {
  Try{
    dbutils.fs.rm(s"dbfs:/user/hive/warehouse/$client.db/$schema$table", true)
    spark.sql(s"DROP TABLE IF EXISTS $schema$table")
    var df = sqlContext.read.parquet(s"$path/$schema$table/*.parquet")
    df.write.saveAsTable(schema + table)
  }
}
def runWithRetry(path: String, schema: String, table: String, maxRetries: Int = 3) = {
  var numRetries = 0
  while (numRetries < maxRetries){
    tryRun(path, schema, table) match {
      case Success(_) => numRetries = maxRetries
      case Failure(_) => numRetries = numRetries + 1
    }
  }
}
case class tableInfo(path: String, schema: String, table: String)
def parallelRuns(tableList: scala.collection.mutable.MutableList[tableInfo]): Future[Seq[Any]] = {
  val numRunsInParallel = 5
  // If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once.
  // This code limits the number of parallel notebooks.
  implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numRunsInParallel))
  Future.sequence(
    tableList.map { item =>
      Future {
        runWithRetry(item.path, item.schema, item.table)
      }
      .recover {
        case NonFatal(e) => s"ERROR: ${e.getMessage}"
      }
    }
  )
}

////cmd 4
///Load STG data in the format of Parquet files from Data Lake to Databrick
//Variables
val schema = "STG"
val dataFolder = List(schema)
var tableCollection = MutableList[tableInfo]()
//List of data to be added
val tableList = List(
 "AdverseEvents",
 "Allergies"
)

for (table <- tableList){
  for (folder <- dataFolder){
    var path = s"abfss://$container@$storageAccount.dfs.core.windows.net/$folder"
    var a = tableInfo(path, schema, table)
    tableCollection += a
  }
}
val res = parallelRuns(tableCollection)
Await.result(res, 3000000 seconds) // this is a blocking call.
res.value

////cmd 5
///this part sets up the functions and class for running CDM notebooks in parallel
/// the next two functions are for retry purpose. if running a process fail, it will retry
def tryNotebookRun (path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String]): Try[Any] = {
  Try(
    if (parameters.nonEmpty){
      dbutils.notebook.run(path, timeout, parameters)
    }
    else{
      dbutils.notebook.run(path, timeout)
    }
  )
}
def runWithRetry(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String], maxRetries: Int = 3) = {
  var numRetries = 0
  while (numRetries < maxRetries){
    tryNotebookRun(path, timeout, parameters) match {
      case Success(_) => numRetries = maxRetries
      case Failure(_) => numRetries = numRetries + 1
    }
  }
}
case class NotebookData(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String])
def parallelNotebooks(notebooks: Seq[NotebookData]): Future[Seq[Any]] = {
  val numNotebooksInParallel = 5
  // If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once.
  // This code limits the number of parallel notebooks.
  implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))
  val ctx = dbutils.notebook.getContext()
  Future.sequence(
    notebooks.map { notebook =>
      Future {
        dbutils.notebook.setContext(ctx)
        runWithRetry(notebook.path, notebook.timeout, notebook.parameters)
      }
      .recover {
        case NonFatal(e) => s"ERROR: ${e.getMessage}"
      }
    }
  )
}

////cmd 6
//run notebooks in parallel
val notebooks = Seq(
  NotebookData("AUDAdverseEvents", 0, Map("client"->client)),
  NotebookData("AUDAllergies", 0, Map("client"->client))
)
val res = parallelNotebooks(notebooks)
Await.result(res, 3000000 seconds) // this is a blocking call.
res.value
  • The current question title, which states your concerns about the code, applies to too many questions on this site to be useful. The site standard is for the title to simply state the task accomplished by the code. Please see How do I ask a good question? Commented Nov 27, 2020 at 8:12

1 Answer
  • in tryRun()/runWithRetry():

    • Naming is extremely poor. The function name should show what it does. "Run" what? The comment doesn't even help -- it just says "a process". What does it do?
    • There is no need to use a Try/Success/Failure construct as you are capturing neither the result of the process on success nor the exception on failure. You can just use the standard try/catch keywords.
    • You don't specify the return type of runWithRetry() - what's your intention? How is the caller supposed to know if it succeeded or failed?
    • Why don't you use the case class tableInfo as a parameter to these? More importantly from an OO point of view, why aren't these methods defined inside the class?
  • in parallelRuns():

    • Why are you using (explicitly, in fact) a MutableList? The list is not mutated, and seems to have no need to be mutated.
    • Why are you explicitly defining the parallelism instead of just using .par to get a Parallel Collection?
    • What is the comment about "notebooks" referring to?
    • What is the .recover {} clause supposed to do? The Future, as written, can't fail.
  • in cmd 4:

    • Please use yield inside the for to create a list, instead of building up the list item by item.
  • in cmd 5:

    • Create a class that does the necessary steps, and then have two things that inherit from it, specializing as necessary.
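To illustrate that last point, here is a minimal sketch of one shared abstraction covering both cmd 3 and cmd 5. The names (`RetryableTask`, `TableImport`, `NotebookRun`, `runInParallel`) are illustrative, not prescriptive, and the Databricks calls (`spark`, `dbutils`) are replaced with `println` placeholders, since they only exist inside a notebook. Retrying and parallelism are written once; each task type only specializes `runOnce()`:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// One abstraction for "a thing that can be run and retried".
sealed trait RetryableTask {
  def runOnce(): Unit // specialized by each subclass

  // Retry loop written once, shared by every task type.
  // The return type tells the caller whether it succeeded or failed.
  final def runWithRetry(maxRetries: Int = 3): Either[String, Unit] = {
    var lastError = ""
    var attempt = 0
    while (attempt < maxRetries) {
      try {
        runOnce()
        return Right(())
      } catch {
        case NonFatal(e) =>
          lastError = e.getMessage
          attempt += 1
      }
    }
    Left(s"ERROR after $maxRetries attempts: $lastError")
  }
}

final case class TableImport(path: String, schema: String, table: String)
    extends RetryableTask {
  def runOnce(): Unit =
    // in the real notebook: drop the table, read the parquet files, saveAsTable
    println(s"importing $schema$table from $path")
}

final case class NotebookRun(path: String, timeout: Int,
                             parameters: Map[String, String] = Map.empty)
    extends RetryableTask {
  def runOnce(): Unit =
    // in the real notebook: dbutils.notebook.run(path, timeout, parameters)
    println(s"running notebook $path")
}

// One parallel driver for any mix of tasks; the fixed pool bounds parallelism.
def runInParallel(tasks: Seq[RetryableTask],
                  parallelism: Int = 5): Seq[Either[String, Unit]] = {
  val pool = Executors.newFixedThreadPool(parallelism)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  try Await.result(Future.sequence(tasks.map(t => Future(t.runWithRetry()))), 10.minutes)
  finally pool.shutdown()
}
```

This also lets cmd 4 become a `for`/`yield` instead of appending to a `MutableList`:

```scala
val imports = for (t <- List("AdverseEvents", "Allergies"))
  yield TableImport("abfss://.../STG", "STG", t)
val notebooks = Seq(NotebookRun("AUDAdverseEvents", 0), NotebookRun("AUDAllergies", 0))
runInParallel(imports ++ notebooks)
```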
  • hello, thank you for the comments. I do have an updated version that is cleaner now. These are great tips Commented Mar 10, 2021 at 21:56
