\$\begingroup\$

Front Matter

I'm learning Scala and have not gotten used to functional programming and the language. I'm hoping a review of my naively implemented code can help me bridge my object-oriented ways to something more functional and Scala-prescribed.

I've uploaded all of my code here: https://gitlab.com/-/snippets/2209023/

Problem

I have to merge two data sets and produce XML output.

  • one dataset is entity data, e.g. entity.csv
name, ip,
name1,,
name1,1.2.3.4,
name3,1.2.3.4,
name4,1.2.3.4,
  • another data set is streaming data, e.g. streaming.csv
name, event,
name1, event1,
name1, event2,
name2, event3,
name3, event4,

Concerns

I was not sure how to merge the 2 datasets without storing the data in a list of model objects. Is there a functional programming pattern to do this type of joining/merging?

I thought that if I get a list of objects then I can feed it to a templating engine to produce the XML output. Is there a functional programming pattern to render the merged data as XML?

Approach

My naive approach is to:

  1. get a list of all the device names
  2. iterate through this list and fill out all of the relevant data needed for the XML output
  3. feed the list to a templating engine to produce the XML file
    • This part has not been completed yet
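
As a rough illustration of steps 1 and 2 without mutable model objects, here is a minimal sketch on plain Scala collections rather than Spark. It assumes the CSV rows are already parsed and trimmed. `MergeSketch`, `EntityRow`, `StreamRow`, `DeviceRecord` and `merge` are hypothetical names, not part of the posted code. In Spark, the analogous shape would presumably be a join on `name` plus a `groupBy` with `collect_list`, but that is not shown here.

```scala
object MergeSketch {
  // Hypothetical row types standing in for entity.csv and streaming.csv.
  final case class EntityRow(name: String, ip: String)
  final case class StreamRow(name: String, event: String)

  // Immutable result; ip is None when no non-empty IP is known for the device.
  final case class DeviceRecord(name: String, events: List[String], ip: Option[String])

  def merge(entities: Seq[EntityRow], stream: Seq[StreamRow]): List[DeviceRecord] = {
    // First non-empty IP per device name.
    val ipByName: Map[String, String] =
      entities
        .filter(_.ip.nonEmpty)
        .groupBy(_.name)
        .map { case (name, rows) => name -> rows.head.ip }

    // Group events by device name; only devices that have events appear.
    stream
      .groupBy(_.name)
      .map { case (name, rows) =>
        DeviceRecord(name, rows.map(_.event).toList, ipByName.get(name))
      }
      .toList
      .sortBy(_.name)
  }

  // Sample data mirroring the two CSV excerpts (whitespace assumed trimmed).
  val entities: Seq[EntityRow] = Seq(
    EntityRow("name1", ""),
    EntityRow("name1", "1.2.3.4"),
    EntityRow("name3", "1.2.3.4"),
    EntityRow("name4", "1.2.3.4")
  )
  val stream: Seq[StreamRow] = Seq(
    StreamRow("name1", "event1"),
    StreamRow("name1", "event2"),
    StreamRow("name2", "event3"),
    StreamRow("name3", "event4")
  )

  val merged: List[DeviceRecord] = merge(entities, stream)
}
```

With the sample data, `name4` is dropped because it has no events, and `name2` ends up with `ip = None` because it has no entity row.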

Implementation

Step 1: Get List of Devices

In Main.scala, get a list of all the devices, e.g. devices_list:

  val streaming = spark.read
    ...
    .load("src/main/resources/streaming.csv") // using streaming data because we want devices that have events
  ...
  val devices_df = streaming
    .select(
      "name",  
    )
    .distinct

  val devices_list = new Devices(devices_df).devices()

In Devices.scala:

import org.apache.spark.sql.{Dataset, Row}
import scala.collection.mutable.ListBuffer

class Devices(
  val devices_df: Dataset[Row],
){
  private var _devices = List[Device]()

  def devices(): List[Device] = {
    _get_devices()
  }

  private[this] def _get_devices(): List[Device] = {
    if (_devices.isEmpty) {
      _devices = _initialize_list_of_devices()
    }
    _devices
  }

  private[this] def _initialize_list_of_devices(): List[Device] = {
    val devices_list = ListBuffer[Device]()
    for (device <- devices_df.collect()) {
      devices_list += new Device(
        device.getAs[String]("name"),
      )
    }
    devices_list.toList
  }
}

In Device.scala:

class Device(
  val name: String,
){
  private var _events = List[String]()
  private var _ip = ""

  def getName(): String = {
    name
  }
  def getEvents(): List[String] = {
    _events
  }
  def setEvents(events: List[String]): Unit = {
    _events = events
  }
  def getIp(): String = {
    _ip
  }
  def setIp(ip: String): Unit = {
    _ip = ip
  }
}
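
For comparison, a more functional shape for this model might be an immutable case class, where "setting" a field returns a new value via `copy` instead of mutating. This is only a sketch: `DeviceSketch` and `DeviceRecord` are hypothetical names, and modelling a missing IP as `Option[String]` rather than an empty string is an assumption, not something the code above does.

```scala
object DeviceSketch {
  // Immutable counterpart of Device: fields are read directly, no getters or setters.
  final case class DeviceRecord(
    name: String,
    events: List[String] = Nil,
    ip: Option[String] = None
  )

  // A device with only its name known.
  val bare: DeviceRecord = DeviceRecord("name1")

  // copy returns a new value; bare itself is left unchanged.
  val filled: DeviceRecord =
    bare.copy(events = List("event1", "event2"), ip = Some("1.2.3.4"))
}
```

Case classes also provide structural equality and a readable `toString`, which tends to make such values easier to test.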

Step 2: Fill Out Data for Each Device

In Main.scala, iterate through the list of devices and use DataFiller class:

  for (device <- devices_list) {
    new DataFiller(
      device,
      streaming,
      entity,
    ).fill()
  }

In DataFiller:

import org.apache.spark.sql.{Dataset, SparkSession, Row}
import org.apache.spark.sql.functions.col

class DataFiller(
  var device: Device,
  val streaming: Dataset[Row],
  val entity: Dataset[Row],
){
  def fill() = {
    _fillEvents()
    _fillIp()
  }

  private[this] def _fillEvents(): Unit = {
    device.setEvents(
      streaming
        .filter(col("name") === device.getName())
        .select("event")
        .rdd.map(event => {
          event.getAs[String]("event")
        })
        .collect.toList
    )
  }
  private[this] def _fillIp(): Unit = {
    val ip_column = entity // was device_network_info, which is not defined in this class
      .filter(col("name") === device.getName())
      .filter(row => {
        !row.getAs[String]("ip").isEmpty
      })
      .select("ip")

    var ip:Any = ""
    if(!ip_column.rdd.isEmpty) {
      ip = ip_column.first().get(0)
      device.setIp(ip.toString)
    }
  }
}

Step 3: Output to XML

I have not implemented this part yet. I was thinking to use a templating engine and render the devices_list. I was not sure how to do this functionally without having a list with all of the data in model classes.
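
One possible direction, sketched under assumptions: render each device with a pure function from data to `String` and concatenate the results, with no templating engine involved. The element names (`devices`, `device`, `name`, `ip`, `event`) are invented for illustration because the target XML schema is not specified here, and `XmlSketch`, `DeviceRecord`, `escape`, `renderDevice` and `render` are hypothetical names. A real implementation would likely use an XML library instead of string building.

```scala
object XmlSketch {
  // Hypothetical immutable model; ip is None when unknown.
  final case class DeviceRecord(name: String, events: List[String], ip: Option[String])

  // Escape the XML special characters; '&' must be replaced first.
  def escape(s: String): String =
    s.replace("&", "&amp;")
      .replace("<", "&lt;")
      .replace(">", "&gt;")
      .replace("\"", "&quot;")
      .replace("'", "&apos;")

  // Pure function: one device in, one XML fragment out.
  def renderDevice(d: DeviceRecord): String = {
    val ip = d.ip.map(v => s"    <ip>${escape(v)}</ip>\n").getOrElse("")
    val events = d.events.map(e => s"    <event>${escape(e)}</event>\n").mkString
    s"  <device>\n    <name>${escape(d.name)}</name>\n$ip$events  </device>\n"
  }

  // Wrap all device fragments in a single root element.
  def render(devices: Seq[DeviceRecord]): String =
    devices.map(renderDevice).mkString("<devices>\n", "", "</devices>\n")
}
```

Usage would be along the lines of `XmlSketch.render(records)`, where `records` is whatever merged collection step 2 produces.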

Thank you for your time 🙏

Here's the entire Main.scala:

import org.apache.spark.sql.SparkSession

object Main extends App {
  val spark = SparkSession
    .builder()
    .master("local") 
    .getOrCreate()

  val streaming = spark.read
    ...
    .load("src/main/resources/streaming.csv")
  val entity = spark.read
    ...
    .load("src/main/resources/entity.csv")

  val devices_df = streaming
    .select(
      "name",  
    )
    .distinct

  val devices_list = new Devices(devices_df).devices()

  for (device <- devices_list) {
    new DataFiller(
      device,
      streaming,
      entity,
    ).fill()
  }

  // output devices_list to XML

  spark.stop()
}
\$\endgroup\$
  • The current question title, which states your concerns about the code, applies to too many questions on this site to be useful. The site standard is for the title to simply state the task accomplished by the code. Please see How do I ask a good question?. Commented Nov 19, 2021 at 7:08
  • Welcome to Code Review! I changed the title so that it describes what the code does per site goals: "State what your code does in your title, not your main concerns about it.". Please check that I haven't misrepresented your code, and correct it if I have. Commented Nov 19, 2021 at 8:05
  • @TobySpeight thank you for your help. I think that’s a better description than anything I can think. Commented Nov 19, 2021 at 8:18
