
I have a PySpark dataframe which looks like:

+------+-------------------+
|port  |  timestamp        |
+------+-------------------+
|9200  |2020-06-19 02:12:41|
|9200  |2020-06-19 03:54:23|
|51    |2020-06-19 05:32:11|
|22    |2020-06-20 06:07:43|
|22    |2020-06-20 01:11:12|
|51    |2020-06-20 07:38:49|
+------+-------------------+ 

I'm trying to find the number of times each distinct port is used per day.

For example, the resulting dataframe should look like this:

+------------+----------------+
|window      |  ports         |
+------------+----------------+
|2020-06-19  |{9200: 2, 51: 1}|
|2020-06-20  |{22: 2, 51: 1}  |
+------------+----------------+ 

It definitely does not need to be stored in a dictionary; I'm just not sure how the result should look in order to capture all ports per day.

I've currently tried the following:

from pyspark.sql.functions import window, count

df.groupBy(window(df['timestamp'], "1 day")).agg(count('port'))

which results in:

+------------+----------------+
|window      |  count(port)   |
+------------+----------------+
|2020-06-19  |3               |
|2020-06-20  |3               |
+------------+----------------+ 

This is not what I'm looking for, as it only counts the total number of port records per day and does not break the count down by distinct port.

  • You're looking for a solution in PySpark or pandas?
  • PySpark, please!

3 Answers


Group by window and port and aggregate the count of each port, then group by window again and collect each (port, count) pair into an array.

import pyspark.sql.functions as F

df.groupBy(
    F.window(df['timestamp'], "1 day"), 'port'
).agg(
    # pair each port with its count for the day
    F.array(
        F.col('port'),
        F.count('port')
    ).alias('ports')
).groupBy(
    'window'
).agg(
    # gather all (port, count) pairs for the day into one array
    F.collect_list('ports').alias('ports')
).withColumn(
    # reduce the window struct to its start date
    'window',
    F.col('window')['start'].cast('date')
)

+----------+--------------------+
|    window|               ports|
+----------+--------------------+
|2020-06-19|[[9200, 2], [51, 1]]|
|2020-06-20|  [[51, 1], [22, 2]]|
+----------+--------------------+
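If a true map column is preferred, matching the {port: count} shape sketched in the question, a small variation of the above should work. This is a sketch, assuming Spark 2.4+ for map_from_entries; the cnt alias is mine:

import pyspark.sql.functions as F

result = (
    df.groupBy(F.window(df['timestamp'], '1 day'), 'port')
      .agg(F.count('port').alias('cnt'))    # count per (day, port)
      .groupBy('window')
      .agg(F.map_from_entries(              # array of (port, cnt) structs -> map
          F.collect_list(F.struct('port', 'cnt'))
      ).alias('ports'))
      .withColumn('window', F.col('window')['start'].cast('date'))
)

The resulting ports column is a map, e.g. {9200 -> 2, 51 -> 1} for 2020-06-19.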

2 Comments

Thanks for this, it works great! Just as a follow-up: is there any way to only get the count of specified ports? For example, _ports = [9200, 22], which would mean 51 is not included in the dataframe.
Yes, just do df = df.filter('port != 51') at the very beginning (see the sketch below for a whitelist version).
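To generalise that comment into a whitelist, a filter with isin should do it. A minimal sketch, where _ports is the hypothetical list from the follow-up:

import pyspark.sql.functions as F

_ports = [9200, 22]                         # hypothetical list of ports to keep
df = df.filter(F.col('port').isin(_ports))  # drops 51 before the aggregation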

If your intention is just to get the number of times each distinct port is used per day, then it is simply a case of counting records grouped by the date part of the timestamp and the port.

import pyspark.sql.functions as F

# count records per (date, port) pair
df.groupBy(F.to_date('timestamp').alias('date'), 'port') \
  .count() \
  .orderBy('date', 'port') \
  .show()
+----------+----+-----+
|      date|port|count|
+----------+----+-----+
|2020-06-19|  51|    1|
|2020-06-19|9200|    2|
|2020-06-20|  22|    2|
|2020-06-20|  51|    1|
+----------+----+-----+
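If a wide layout is easier to read, one column per port, a pivot on the same grouping is a possible variation. A sketch; the commented output is the expected shape, with nulls where a port is unused that day:

import pyspark.sql.functions as F

df.groupBy(F.to_date('timestamp').alias('date')) \
  .pivot('port') \
  .count() \
  .orderBy('date') \
  .show()

# +----------+----+----+----+
# |      date|  22|  51|9200|
# +----------+----+----+----+
# |2020-06-19|null|   1|   2|
# |2020-06-20|   2|   1|null|
# +----------+----+----+----+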


1

Spark SQL solution (Scala):

val df = spark.sql("""
with t1 as (
select 9200 x, '2020-06-19 02:12:41' y union all
select 9200 x, '2020-06-19 03:54:23' y union all
select 51 x, '2020-06-19 05:32:11' y union all
select 22 x, '2020-06-20 06:07:43' y union all
select 22 x, '2020-06-20 01:11:12' y union all
select 51 x, '2020-06-20 07:38:49' y
) select x as port, y as timestamp from t1
""")
df.show(false)

+----+-------------------+
|port|timestamp          |
+----+-------------------+
|9200|2020-06-19 02:12:41|
|9200|2020-06-19 03:54:23|
|51  |2020-06-19 05:32:11|
|22  |2020-06-20 06:07:43|
|22  |2020-06-20 01:11:12|
|51  |2020-06-20 07:38:49|
+----+-------------------+

df.createOrReplaceTempView("logtable")

spark.sql("""
select window, collect_set(struct(port, t)) ports
from (
  select cast(timestamp as date) window, port,
         count(port) over (partition by cast(timestamp as date), port) t
  from logtable
) temp
group by 1
""").show(false)

+----------+--------------------+
|window    |ports               |
+----------+--------------------+
|2020-06-20|[[22, 2], [51, 1]]  |
|2020-06-19|[[9200, 2], [51, 1]]|
+----------+--------------------+
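The window function plus collect_set can also be replaced with two ordinary aggregations, which avoids producing duplicate (port, count) rows that collect_set then has to deduplicate. A sketch of that variant, run here from PySpark against the same logtable view:

spark.sql("""
select window, collect_list(struct(port, cnt)) as ports
from (
  select cast(timestamp as date) as window, port, count(*) as cnt
  from logtable
  group by cast(timestamp as date), port
) t
group by window
""").show(truncate=False)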

