Front Matter
I'm learning Scala and have not gotten used to functional programming and the language. I'm hoping a review of my naively implemented code can help me bridge my object-oriented ways to something more functional and Scala-prescribed.
I've uploaded all of my code here: https://gitlab.com/-/snippets/2209023/
Problem
I have to merge two data sets and produce an XML output.
- one dataset is entity data, e.g.
entity.csv
name, ip,
name1,,
name1,1.2.3.4,
name3,1.2.3.4,
name4,1.2.3.4,
- another data set is streaming data, e.g.
streaming.csv
name, event,
name1, event1,
name1, event2,
name2, event3,
name3, event4,
Concerns
I was not sure how to merge the 2 datasets without storing the data in a list of model objects. Is there a functional programming pattern to do this type of joining/merging?
I thought that if I get a list of objects then I can feed it to a templating engine to produce the XML output. Is there a functional programming pattern to render the merged data as XML?
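To make the question concrete, here is the kind of merge I have in mind, sketched with plain Scala collections rather than Spark (the case classes and sample rows are just stand-ins for the two CSVs above):

```scala
// Stand-ins for the two CSVs above.
case class EntityRow(name: String, ip: String)
case class EventRow(name: String, event: String)

val entityRows = List(
  EntityRow("name1", ""), EntityRow("name1", "1.2.3.4"),
  EntityRow("name3", "1.2.3.4"), EntityRow("name4", "1.2.3.4"))
val streamingRows = List(
  EventRow("name1", "event1"), EventRow("name1", "event2"),
  EventRow("name2", "event3"), EventRow("name3", "event4"))

// Group events per device name; index the first non-empty ip per name.
val eventsByName: Map[String, List[String]] =
  streamingRows.groupBy(_.name).map { case (n, rows) => n -> rows.map(_.event) }

val ipByName: Map[String, String] =
  entityRows.filter(_.ip.nonEmpty).groupBy(_.name).map { case (n, rows) => n -> rows.head.ip }

// The merge: one (name, ip, events) tuple per device that has events.
val merged = eventsByName.toList.sortBy(_._1).map { case (n, events) =>
  (n, ipByName.getOrElse(n, ""), events)
}
```

This avoids any mutable model objects, but I don't know if it is the idiomatic pattern, or how it maps back onto Spark Datasets.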
Approach
My naive approach is to:
- get a list of all the device names
- iterate through this list and fill out all of the relevant data needed for the XML output
- feed the list to a templating engine to produce the XML file
- This part has not been completed yet
Implementation
Step 1: Get List of Devices
In Main.scala, get a list of all the devices, e.g. devices_list:
val streaming = spark.read
...
.load("src/main/resources/streaming.csv") // using streaming data because we want devices that have events
...
val devices_df = streaming
.select(
"name",
)
.distinct
val devices_list = new Devices(devices_df).devices()
In Devices.scala:
import org.apache.spark.sql.{Dataset, Row}
import scala.collection.mutable.ListBuffer
class Devices(
val devices_df: Dataset[Row],
){
private var _devices = List[Device]()
def devices(): List[Device] = {
_get_devices()
}
private[this] def _get_devices(): List[Device] = {
if (_devices.isEmpty) {
_devices = _initialize_list_of_devices()
}
_devices
}
private[this] def _initialize_list_of_devices(): List[Device] = {
val devices_list = ListBuffer[Device]()
for (device <- devices_df.collect()) {
devices_list += new Device(
device.getAs[String]("name"),
)
}
devices_list.toList
}
}
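As an aside on the caching in _get_devices: I believe a lazy val expresses the same compute-once intent without the var and the isEmpty check. A minimal sketch, with the Spark bits replaced by a plain loader function so it stands alone:

```scala
// lazy val runs the loader once, on first access, and caches the result.
class Devices(load: () => List[String]) {
  lazy val devices: List[String] = load()
}

var loads = 0
val d = new Devices(() => { loads += 1; List("name1", "name2", "name3") })
d.devices
d.devices // second access reuses the cached list; the loader is not called again
```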
In Device.scala:
class Device(
val name: String,
){
private var _events = List[String]()
private var _ip = ""
def getName(): String = {
name
}
def getEvents(): List[String] = {
_events
}
def setEvents(events: List[String]): Unit = {
_events = events
}
def getIp(): String = {
_ip
}
def setIp(ip: String): Unit = {
_ip = ip
}
}
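For Device itself, I gather a case class is the more Scala-prescribed shape: the fields are immutable, and the setters become copy. A sketch of a case-class version (the simplified field types are mine):

```scala
// Immutable model: no getters/setters, "updates" produce new values via copy.
case class Device(name: String, ip: String = "", events: List[String] = Nil)

val blank  = Device("name1")
val filled = blank.copy(ip = "1.2.3.4", events = List("event1", "event2"))
// `blank` is untouched; `filled` is a separate, fully populated value.
```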
Step 2: Fill Out Data for Each Device
In Main.scala, iterate through the list of devices and use DataFiller class:
for (device <- devices_list) {
new DataFiller(
device,
streaming,
entity,
).fill()
}
In DataFiller:
import org.apache.spark.sql.{Dataset, SparkSession, Row}
import org.apache.spark.sql.functions.col
class DataFiller(
var device: Device,
val streaming: Dataset[Row],
val entity: Dataset[Row],
){
def fill(): Unit = {
_fillEvents()
_fillIp()
}
private[this] def _fillEvents(): Unit = {
device.setEvents(
streaming
.filter(col("name") === device.getName())
.select("event")
.rdd.map(event => {
event.getAs[String]("event")
})
.collect.toList
)
}
private[this] def _fillIp(): Unit = {
val ip_column = entity
.filter(col("name") === device.getName())
.filter(row => {
!row.getAs[String]("ip").isEmpty
})
.select("ip")
var ip:Any = ""
if(!ip_column.rdd.isEmpty) {
ip = ip_column.first().get(0)
device.setIp(ip.toString)
}
}
}
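If Device were immutable, I think DataFiller would collapse into a pure function: take a device, return a filled copy. Sketched with plain Maps standing in for the two Datasets (all names here are mine):

```scala
case class Device(name: String, ip: String = "", events: List[String] = Nil)

// Pure counterpart of fill(): no mutation, just a new value per input device.
def fill(device: Device,
         eventsByName: Map[String, List[String]],
         ipByName: Map[String, String]): Device =
  device.copy(
    ip = ipByName.getOrElse(device.name, ""),
    events = eventsByName.getOrElse(device.name, Nil))

val filled = List(Device("name1"), Device("name2")).map(
  fill(_, Map("name1" -> List("event1", "event2")), Map("name1" -> "1.2.3.4")))
```

The for-loop over devices_list then becomes a single map, which is the part that feels more functional to me, but I'd like confirmation.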
Step 3: Output to XML
I have not implemented this part yet. I was thinking of using a templating engine and rendering the devices_list, but I was not sure how to do this functionally without first building a list with all of the data in model classes.
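What I have so far is only an idea: given the merged list, render each device to XML. A stdlib-only string sketch (the element names are mine; a real solution would presumably use scala-xml or a templating engine and handle escaping properly):

```scala
case class Device(name: String, ip: String = "", events: List[String] = Nil)

// Render each device as an element, then wrap everything in <devices>.
def render(devices: List[Device]): String =
  devices.map { d =>
    val events = d.events.map(e => s"    <event>$e</event>").mkString("\n")
    s"""  <device name="${d.name}" ip="${d.ip}">\n$events\n  </device>"""
  }.mkString("<devices>\n", "\n", "\n</devices>")

val xml = render(List(Device("name1", "1.2.3.4", List("event1", "event2"))))
```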
Thank you for your time 🙏
Here's the entire Main.scala:
import org.apache.spark.sql.SparkSession
object Main extends App {
val spark = SparkSession
.builder()
.master("local")
.getOrCreate()
val streaming = spark.read
...
.load("src/main/resources/streaming.csv")
val entity = spark.read
...
.load("src/main/resources/entity.csv")
val devices_df = streaming
.select(
"name",
)
.distinct
val devices_list = new Devices(devices_df).devices()
for (device <- devices_list) {
new DataFiller(
device,
streaming,
entity,
).fill()
}
// output devices_list to XML
spark.stop()
}