I'm loading a Parquet file as a Spark Dataset. I can query it and create new Datasets from the query results. Now I would like to add a new column ("hashkey") to the Dataset and generate its values (e.g. md5sum(nameValue)). How can I achieve this?

public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf();

    sparkConf.setAppName("Hello Spark");
    sparkConf.setMaster("local");

    SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example")
            .config("spark.master", "local").config("spark.sql.warehouse.dir", "file:///C:\\spark_warehouse")
            .getOrCreate();

    Dataset<Row> df = spark.read().parquet("meetup.parquet");
    df.show();

    df.createOrReplaceTempView("tmpview");

    Dataset<Row> namesDF = spark.sql("SELECT * FROM tmpview where name like 'Spark-%'");

    namesDF.show();

}

The output looks like this:

+-------------+-----------+-----+---------+--------------------+
|         name|meetup_date|going|organizer|              topics|
+-------------+-----------+-----+---------+--------------------+
|    Spark-H20| 2016-01-01|   50|airisdata|[h2o, repeated sh...|
|   Spark-Avro| 2016-01-02|   60|airisdata|    [avro, usecases]|
|Spark-Parquet| 2016-01-03|   70|airisdata| [parquet, usecases]|
+-------------+-----------+-----+---------+--------------------+

2 Answers

Just use the Spark SQL md5 function in your query.

Dataset<Row> namesDF = spark.sql("SELECT *, md5(name) as modified_name FROM tmpview where name like 'Spark-%'");
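
As a sanity check on what ends up in that column: Spark's md5 returns the digest as a 32-character hex string. Here is a minimal plain-Java sketch of the same computation, assuming the string value is hashed as its UTF-8 bytes. Md5HexSketch and md5Hex are hypothetical names used only for illustration; they are not part of Spark.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper for illustration only; not part of Spark.
final class Md5HexSketch {

    // Returns the MD5 digest of the UTF-8 bytes of value as 32 lowercase hex characters.
    static String md5Hex(String value) {
        try {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8));
            // %032x left-pads with zeros so the result is always 32 characters long.
            return String.format("%032x", new BigInteger(1, hash));
        } catch (NoSuchAlgorithmException e) {
            // MD5 is a required algorithm on every Java platform, so this is not expected.
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Print the hash for one of the names from the question's sample data.
        System.out.println(md5Hex("Spark-Avro"));
    }
}
```

Comparing a value computed this way with a row of the modified_name column is a quick way to confirm the query hashes what you expect.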

Dataset<Row> ds = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .option("delimiter","|")
    .load("/home/cloudera/Desktop/data.csv");
ds.printSchema();

This prints:

root
 |-- ReferenceValueSet_Id: integer (nullable = true)
 |-- ReferenceValueSet_Name: string (nullable = true)
 |-- Code_Description: string (nullable = true)
 |-- Code_Type: string (nullable = true)
 |-- Code: string (nullable = true)
 |-- CURR_FLAG: string (nullable = true)
 |-- REC_CREATE_DATE: timestamp (nullable = true)
 |-- REC_UPDATE_DATE: timestamp (nullable = true)

Dataset<Row> df1 = ds.withColumn("Key", functions.lit(1));
df1.printSchema();

This appends one column holding a constant value:

root
 |-- ReferenceValueSet_Id: integer (nullable = true)
 |-- ReferenceValueSet_Name: string (nullable = true)
 |-- Code_Description: string (nullable = true)
 |-- Code_Type: string (nullable = true)
 |-- Code: string (nullable = true)
 |-- CURR_FLAG: string (nullable = true)
 |-- REC_CREATE_DATE: timestamp (nullable = true)
 |-- REC_UPDATE_DATE: timestamp (nullable = true)
 |-- Key: integer (nullable = true)

You can see that a column named Key has been added to the dataset.

To fill the new column from an existing column instead of a constant value, use the code below.

Dataset<Row> df1 = ds.withColumn("Key", ds.col("Code"));
df1.printSchema();
df1.show();

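Now the new Key column contains whatever values are in the Code column. The second argument to withColumn can be any Column expression, so functions.md5(ds.col("Code")) would store the MD5 hash of each value instead.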
