49

100 million customers click 100 billion times on the pages of a few websites (let's say 100 websites), and the click stream is available to you as a large dataset.

Using the abstractions of Apache Spark, what is the most efficient way to count distinct visitors per website?

1 Comment

I guess the answer will be different if you want an exact count or an approximation. Commented Jun 19, 2014 at 16:55

7 Answers

45

visitors.distinct().count() would be the obvious way. In distinct you can specify the level of parallelism and also see an improvement in speed. If it is possible to set up visitors as a stream and use DStreams, that would do the count in real time. You can stream directly from a directory and use the same methods as on the RDD, like:

val file = ssc.textFileStream("...")
file.distinct().count()

The last option is to use def countApproxDistinct(relativeSD: Double = 0.05): Long. This is labelled as experimental, but it would be significantly faster than count when relativeSD (relative standard deviation) is higher.
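
For instance, a minimal sketch of that call (the visitors RDD of string IDs here is assumed purely for illustration):

// Assumed for illustration: `visitors` is an RDD[String] of visitor IDs.
// A larger relativeSD trades accuracy for speed and memory.
val approxUniqueVisitors: Long = visitors.countApproxDistinct(relativeSD = 0.05)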

EDIT: Since you want the count per website, you can just reduce on the website id; this can be done efficiently (with combiners) since count is an aggregate. If you have an RDD of (website, user id) tuples you can do visitors.countDistinctByKey() or visitors.countApproxDistinctByKey(); once again, the approx one is experimental. To use approx distinct by key you need a PairRDD, as sketched below.
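
A rough sketch of the per-key variant (the clicks RDD and its element types are assumptions, not taken from the question):

import org.apache.spark.rdd.RDD

// Assumed shape: one (websiteId, visitorId) pair per click.
val clicks: RDD[(String, String)] = ???

// Approximate distinct visitors per website, via PairRDDFunctions.
val uniquesPerSite: RDD[(String, Long)] = clicks.countApproxDistinctByKey(relativeSD = 0.05)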

Interesting side note: if you are OK with approximations and want fast results, you might want to look into BlinkDB, made by the same people as Spark (the AMPLab).

6 Comments

Thanks for the guidelines, but doesn't that count the unique visitors globally (it will return 100M)? What should be changed to return unique visitors per website? Also, with this approach you can expect a very large intermediate dataset (100M * parallelism), and I wonder if there is a more memory-efficient approach.
@AntoineCHAMBILLE Oh, I didn't realize that you wanted the count per website; will rewrite.
Great! But what kind of RDD exposes this "countDistinctByKey" method? Can you please share a link to the API doc?
@AntoineCHAMBILLE PairRDDFunctions
@aaronman Well, I believe Antoine is right. I also see only countApproxDistinctByKey() there, not countDistinctByKey()
11

I've had to do similar things. One efficiency trick (that isn't really Spark-specific) is to map your visitor IDs to lists of bytes rather than GUID strings; you can save 4x the space that way (since 2 chars are the hex encoding of a single byte, and a Char uses 2 bytes in a String).

// Inventing these custom types purely for this question - don't do this in real life!
type VisitorID = List[Byte]
type WebsiteID = Int

val visitors: RDD[(WebsiteID, VisitorID)] = ???

visitors.distinct().mapValues(_ => 1).reduceByKey(_ + _)

Note you could also do:

visitors.distinct().map(_._1).countByValue()

but this doesn't scale as well.
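
For completeness, one way the string-to-bytes mapping might look (this helper is purely illustrative and not part of the answer):

// Hypothetical helper: packs a hex GUID like "de305d54-..." into raw bytes,
// dropping the dashes and turning each pair of hex chars into one byte.
def hexGuidToBytes(guid: String): List[Byte] =
  guid.filter(_ != '-')
    .grouped(2)
    .map(pair => Integer.parseInt(pair, 16).toByte)
    .toList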

10

I noticed the basic distinct function can be significantly faster when you run it on an RDD than when you run it on a DataFrame. For example:

val df = sqlContext.load(...)
df.distinct.count // 0.8 s
df.rdd.distinct.count // 0.2 s
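
A rough way to reproduce the comparison (the input path and SparkSession are assumptions; as a comment below notes, the relative timings flip on newer Spark versions):

// Illustrative only; timings depend heavily on Spark version and input.
val df = spark.read.parquet("/data/clicks")   // assumed input location
spark.time(df.distinct().count())             // DataFrame path
spark.time(df.rdd.distinct().count())         // RDD path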

2 Comments

I also observed the same, and this is probably because df.count is implemented as an aggregate operation: def count(): Long = withCallback("count", groupBy().count()) { df => df.collect(needCallback = false).head.getLong(0) }
/!\ Needs to be edited: now (Spark 2.4.4) Spark SQL seems to be faster on this same experiment (up to an order of magnitude on certain inputs).

9

If data is an RDD of (site,visitor) pairs, then data.countApproxDistinctByKey(0.05) will give you an RDD of (site,count). The parameter can be reduced to get more accuracy at the cost of more processing.

7

Spark 2.0 added ApproxCountDistinct to the DataFrame and SQL APIs:

https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html#approxCountDistinct(org.apache.spark.sql.Column)
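
A hedged sketch of the per-website version using the DataFrame API (the clicks DataFrame and its site/visitor column names are assumptions):

import org.apache.spark.sql.functions.approx_count_distinct

// Approximate distinct visitors per website; an optional rsd argument
// trades accuracy for speed, as with the RDD variants above.
val uniquesPerSite = clicks
  .groupBy("site")
  .agg(approx_count_distinct("visitor").as("unique_visitors"))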

4

If you want it per webpage, then visitors.distinct()... is inefficient. If there are a lot of visitors and a lot of webpages, then you're distincting over a huge number of (webpage, visitor) combinations, which can overwhelm the memory.

Here is another way:

visitors.groupByKey().map {
  case (webpage, visitor_iterable) =>
    (webpage, visitor_iterable.toArray.distinct.length)
}

This requires that the visitors to a single webpage fit in memory, so it may not be best in all cases.
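
A tiny usage sketch of that approach on made-up data:

// Made-up sample data, purely for illustration.
val visitors = sc.parallelize(Seq(("site-a", "u1"), ("site-a", "u1"), ("site-a", "u2"), ("site-b", "u1")))

val counts = visitors.groupByKey().map {
  case (webpage, visitor_iterable) =>
    (webpage, visitor_iterable.toArray.distinct.length)
}

counts.collect()  // e.g. Array((site-a,2), (site-b,1)) -- order may vary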

0

df.select(approx_count_distinct("col_name", 0.1))

0.1 is the maximum estimation error allowed. You can see much greater performance with large data sets.
