
So I have 2 questions which I think should be basic for people experienced in PySpark, but I can't seem to solve them.

Sample entries in my csv file are-

"dfg.AAIXpWU4Q","1"
"cvbc.AAU3aXfQ","1"
"T-L5aw0L1uT_OfFyzbk","1"
"D9TOXY7rA_LsnvwQa-awVk","2"
"JWg8_0lGDA7OCwWcH_9aDc","2"
"ewrq.AAbRaACr2tVh5wA","1"
"ewrq.AALJWAAC-Qku3heg","1"
"ewrq.AADStQqmhJ7A","2"
"ewrq.AAEAABh36oHUNA","1"
"ewrq.AALJABfV5u-7Yg","1"

I create the following dataframe-

>>> df2.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...| "7"|
|"yDQ...| "1"|
|"qUU...|"13"|
+-------+----+
only showing top 3 rows

First, is this the right way to convert the hits column to IntegerType()? Why are all the values becoming null?

>>> df2 = df2.withColumn("hits", df2["hits"].cast(IntegerType()))
>>> df2.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...|null|
|"yDQ...|null|
|"qUU...|null|
+-------+----+
only showing top 3 rows

Second, I need to sort this list in descending order with respect to the hits column. So, I tried this-

>>> df1 = df2.sort(col('hits').desc())
>>> df1.show(20)

But I get the following error-

java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 2 fields are required while 18 values are provided.

I'm guessing it's due to the fact that I create my dataframe using-

>>> rdd = sc.textFile("/path/to/file/*")
>>> rdd.take(2)
['"7wAfdgdfgd","7"', '"1x3Qdfgdf","1"']
​
>>> my_df = rdd.map(lambda x: (x.split(","))).toDF()

>>> df2 = my_df.selectExpr("_1 as user_id", "_2 as hits")
>>> df2.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...| "7"|
|"yDQ...| "1"|
|"qUU...|"13"|
+-------+----+
only showing top 3 rows

And I'm guessing there are extra commas in some rows. How do I avoid this - or what's the best way to read this file?

2 Answers


UPDATE

-- Adding file read and split

Looking at the example above, I created a file like this:

'"7wAfdgdfgd","7"'
'"1x3Qdfgdf","1"'
'"13xxyyzzsdff","13"'

-- Please note the ' quotes that make each line a single string. Now the code to read it:

scala> val myRdd = sc.textFile("test_file.dat")
myRdd: org.apache.spark.rdd.RDD[String] = test_file.dat MapPartitionsRDD[1] at textFile at <console>:24
// please check the type of the RDD - here it is String
// we need an RDD[(String, String)] to convert it into a DataFrame

scala> myRdd.map(x => x.replace("'","")).map(x => x.split(",")).map( x => (x(0),x(1)))
res0: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at map at <console>:26

// Finally
scala> myRdd.map(x => x.replace("'","")).map(x => x.split(",")).map( x => (x(0),x(1))).toDF("user_id","hits").show(false)
+--------------+----+
|user_id       |hits|
+--------------+----+
|"7wAfdgdfgd"  |"7" |
|"1x3Qdfgdf"   |"1" |
|"13xxyyzzsdff"|"13"|
+--------------+----+

END of UPDATE
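For anyone who wants this in PySpark rather than Scala, the same read / split / convert flow might look roughly like this (a sketch only, assuming the same test_file.dat and an active SparkSession, which is what makes toDF available on the RDD):

rdd = sc.textFile("test_file.dat")

# strip the wrapping single quotes, split on the comma and keep the first two fields;
# note that Python indexes with x[0] / x[1], not x(0) / x(1) as in Scala
pairs = rdd.map(lambda x: x.replace("'", "")) \
           .map(lambda x: x.split(",")) \
           .map(lambda x: (x[0], x[1]))

# toDF on an RDD of tuples accepts the column names directly
pairs.toDF(["user_id", "hits"]).show(truncate=False)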

Since you are new (or otherwise), I recommend running actual ANSI SQL instead of pyspark.sql.functions. It is easier to maintain, and there is no advantage to using sql.functions over ANSI SQL. Obviously, you still need to know the SQL/column functions Spark provides; I used split, orderBy and cast in this answer. Since you did not provide the content of the text file, here is my take, with all three answers in one SQL:

myDf = spark.createDataFrame([("abc","7"),("xyz","18"),("lmn","4,xyz")], schema=["user_id","hits"])
myDf.show(20, False)
+-------+-----+
|user_id|hits |
+-------+-----+
|abc    |7    |
|xyz    |18   |
|lmn    |4,xyz|
+-------+-----+

myDf.createOrReplaceTempView("hits_table")

SQL + Result

    spark.sql("select user_id, cast(split(hits,',')[0] as integer) as hits from hits_table order by hits desc ").show(20,False)
    +-------+----+
    |user_id|hits|
    +-------+----+
    |xyz    |18  |
    |abc    |7   |
    |lmn    |4   |
    +-------+----+
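If you would rather stay in the DataFrame API, the same query can be sketched with pyspark.sql.functions against the myDf created above (a rough equivalent, not tested against the asker's real files):

from pyspark.sql.functions import split, col

# keep the part before any stray comma, cast it to integer, then sort descending
myDf.withColumn("hits", split(col("hits"), ",")[0].cast("integer")) \
    .orderBy(col("hits").desc()) \
    .show(20, False)

For the original file, where the values still carry literal double quotes (e.g. "7"), the quotes would also have to be stripped before the cast, for example with regexp_replace(col("hits"), '"', ''); that is why the plain cast(IntegerType()) in the question returned null for every row.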

7 Comments

Can you create the dataframe from an RDD like I did? I think the line my_df = rdd.map(lambda x: (x.split(","))).toDF() is the root cause of my issue - I get the error "Input row doesn't have expected number of values required by the schema. 2 fields are required while 18 values are provided." You're creating a dataframe out of simple data.
@kev, can you paste a sample of the raw data, please?
@kev, please check my updated answer on reading as an RDD, splitting it and converting to a dataframe. It also shows how the type changes with each operation.
Can you please give me the answers in Python?
@kev Doesn't Python use "[ ]" to index a list? Will () work?

So, w.r.t. @SanBan's answer, I came up with the following results-

>>> rdd = sc.textFile("/home/jsanghvi/work/buffer/*")

>>> from pyspark.sql.types import StructType, StructField, StringType
>>> from pyspark.sql.functions import col

>>> schema = StructType([StructField("user_id", StringType(), True), StructField("hits", StringType(), True)])

>>> my_rdd = rdd.map(lambda x: x.replace("'","")).map(lambda x: x.split(",")).map(lambda x: (x[0],x[1]))

>>> my_rdd2 = my_rdd.map(lambda x: str(x).replace("'","").replace("(", "").replace(")", "")).map(lambda x: x.split(",")).map(lambda x: (x[0],x[1]))

>>> df1 = spark.createDataFrame(my_rdd2, schema)

>>> dfx = df1.sort(col('hits').desc())

>>> dfx.show(5)
+----------------+--------------------+                                     
|         user_id|                hits|
+----------------+--------------------+
|"AUDIO_AUTO_PLAY| EXPANDABLE_AUTOM...|
|       "user_id"|             "_col1"|
| "AAESjk66lDk...|              "9999"|
| "ABexsk6sLlc...|              "9999"|
| "AAgb1k65pHI...|              "9999"|
+----------------+--------------------+

# removing garbage rows
>>> dfx = dfx.filter(~col("hits").isin(["_col1", "EXPANDABLE_AUTOM..."]))
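As a side note, since the raw files are quoted CSV, it may be simpler to skip the manual RDD splitting and let Spark's CSV reader handle both the quoting and the cast. A sketch (assuming the same path; header rows or other garbage lines in individual files would still need filtering afterwards):

from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([StructField("user_id", StringType(), True),
                     StructField("hits", IntegerType(), True)])

# quote='"' strips the surrounding double quotes; mode="DROPMALFORMED" drops
# rows that do not fit the two-column schema (e.g. rows with extra fields)
df = spark.read.csv("/home/jsanghvi/work/buffer/*", schema=schema,
                    quote='"', mode="DROPMALFORMED")
df.sort(col("hits").desc()).show(5)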

