
I have a DataFrame containing a connection log with columns targetIP and Time. Every record in this DataFrame is a connection event to one system: targetIP is the target IP address of that connection and Time is the connection time. With values:

Time targetIP
1 192.163.0.1
2 192.163.0.2
3 192.163.0.1
5 192.163.0.1
6 192.163.0.2
7 192.163.0.2
8 192.163.0.2
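For reference, the sample data can be built like this (a minimal sketch; the SparkSession setup and the integer type for Time are assumptions):

from pyspark.sql import SparkSession

# Sketch only: build the sample connection log used in this question
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, "192.163.0.1"), (2, "192.163.0.2"), (3, "192.163.0.1"),
     (5, "192.163.0.1"), (6, "192.163.0.2"), (7, "192.163.0.2"),
     (8, "192.163.0.2")],
    ["Time", "targetIP"],
)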

I want to create a new column whose value is the count of connections to the current row's target IP address among the past 3 connections. So the result DataFrame should be:

Time targetIP count
1 192.163.0.1 0
2 192.163.0.2 0
3 192.163.0.1 1
5 192.163.0.1 2
6 192.163.0.2 1
7 192.163.0.2 1
8 192.163.0.2 2

For example, at Time=8 the targetIP is 192.163.0.2. The past 3 connections are at Time=5, Time=6 and Time=7, and the targetIP at Time=6 and Time=7 is also 192.163.0.2, so the count for Time=8 is 2.

My idea is to add a new ID column to this DataFrame:

ID Time targetIP
1 1 192.163.0.1
2 2 192.163.0.2
3 3 192.163.0.1
4 5 192.163.0.1
5 6 192.163.0.2
6 7 192.163.0.2
7 8 192.163.0.2

and then use a Window function:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Count earlier connections to the same targetIP whose ID falls within the previous 3 positions
w = Window.partitionBy("targetIP").orderBy(F.col("ID").cast("int")).rangeBetween(-3, -1)
df1 = df.withColumn("count", F.count("*").over(w)).orderBy("ID")

But if I use monotonically_increasing_id(), the IDs are not sequential, so I would like a solution that does not rely on an ID column.
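For illustration, the ID generation I tried looks roughly like this (a sketch; the exact ID values depend on how the data is partitioned):

from pyspark.sql import functions as F

# Sketch only: monotonically_increasing_id() gives IDs that are unique and increasing,
# but not consecutive (they jump between partitions), so rangeBetween(-3, -1) over
# this column no longer corresponds to "the past 3 connections"
df_with_id = df.withColumn("ID", F.monotonically_increasing_id())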

Thank you.

1 Answer


You can use row_number to assign a sequential ID, then apply your window over that ID:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'id',
    # sequential ID ordered by Time (no gaps, unlike monotonically_increasing_id)
    F.row_number().over(Window.orderBy('Time'))
).withColumn(
    'count',
    # count earlier connections to the same targetIP whose id is within the previous 3 positions
    F.count('*').over(
        Window.partitionBy('targetIP').orderBy('id').rangeBetween(-3, -1)
    )
).orderBy('Time')

df2.show()
+----+-----------+---+-----+
|Time|   targetIP| id|count|
+----+-----------+---+-----+
|   1|192.163.0.1|  1|    0|
|   2|192.163.0.2|  2|    0|
|   3|192.163.0.1|  3|    1|
|   5|192.163.0.1|  4|    2|
|   6|192.163.0.2|  5|    1|
|   7|192.163.0.2|  6|    1|
|   8|192.163.0.2|  7|    2|
+----+-----------+---+-----+
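Note that Window.orderBy('Time') without a partitionBy moves all rows into a single partition to compute row_number, so on very large data this step can become a bottleneck (Spark logs a warning about it).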

Another way, without using an ID, is to do a collect_list over a window and filter the resulting array:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'count',
    # collect the targetIPs of the previous 3 connections, keep those equal to the
    # current row's targetIP, and take the size of the resulting array
    F.expr("""
        size(filter(
            collect_list(targetIP) over (order by Time rows between 3 preceding and 1 preceding),
            x -> x = targetIP
        ))
    """)
)

df2.show()
+----+-----------+-----+
|Time|   targetIP|count|
+----+-----------+-----+
|   1|192.163.0.1|    0|
|   2|192.163.0.2|    0|
|   3|192.163.0.1|    1|
|   5|192.163.0.1|    2|
|   6|192.163.0.2|    1|
|   7|192.163.0.2|    1|
|   8|192.163.0.2|    2|
+----+-----------+-----+
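The filter higher-order function used in the SQL expression requires Spark 2.4 or later. If you prefer to stay in the DataFrame API (roughly Spark 3.1+, where functions.filter is available), a sketch of the same idea looks like this:

from pyspark.sql import functions as F, Window

# Sketch only (assumes Spark 3.1+ for F.filter): collect the previous 3 targetIPs,
# keep those equal to the current row's targetIP, and count them
w = Window.orderBy('Time').rowsBetween(-3, -1)

df2 = df.withColumn(
    'count',
    F.size(F.filter(F.collect_list('targetIP').over(w), lambda x: x == F.col('targetIP')))
)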