
I used this code for extracting bigrams from a text file:

import org.apache.spark.{SparkContext, SparkConf}

object DS_E6 {

  def main(args: Array[String]): Unit = {
    // A bigram is a pair of adjacent words; mkReplacement hyphenates every
    // occurrence of the pair in a line.
    case class Bigram(first: String, second: String) {
      def mkReplacement(s: String) = s.replaceAll(first + " " + second, first + "-" + second)
    }

    // Split a line on spaces and emit each adjacent pair of words as a Bigram.
    def stringToBigrams(s: String) = {
      val words = s.split(" ")
      if (words.size >= 2) {
        words.sliding(2).map(a => Bigram(a(0), a(1)))
      } else
        Iterator[Bigram]()
    }

    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("bigram")
      .set("spark.executor.memory", "1g")

    val sc = new SparkContext(conf)
    val data = sc.textFile("data/file.txt")

    // Pull every bigram back to the driver as a local array.
    val bigrams = data.flatMap {
      stringToBigrams
    }.collect()

    // Count occurrences and keep only bigrams seen at least `threshold` times.
    val bigramCounts = bigrams.groupBy(identity).mapValues(_.size)
    val threshold = 100
    val topBigrams = bigramCounts.filter(_._2 >= threshold).map(_._1)
    topBigrams.foreach(println)

    // Hyphenate the frequent bigrams in every line and prefix each line with its index.
    val replaced = data.map(r => topBigrams.foldLeft(r)((r, b) => b.mkReplacement(r)))
    val replaced1 = replaced.zipWithIndex()
      .map { case (line, i) => i.toString + "," + line }

    replaced1.coalesce(1).saveAsTextFile("data/output.txt")
  }
}

My input file is 45 MB. When I run this code, it shows me the error below (I think it is related to collect()):

 java.lang.OutOfMemoryError: GC overhead limit exceeded

at org.spark-project.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:342)
at org.spark-project.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:526)
at org.spark-project.jetty.util.thread.QueuedThreadPool.access$600(QueuedThreadPool.java:44)
at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)

How can I solve this problem?

  • When running your app, try increasing the JVM heap size, e.g. -Xmx1G. Commented Jul 7, 2015 at 18:09
  • How can I set the JVM heap size to -Xmx1G? Commented Jul 7, 2015 at 18:25
  • How do you run the code you provided? Commented Jul 7, 2015 at 18:26
  • BTW, if the code you provided is all you need to do, you don't need to use Spark at all. A simple Python script or Scala application will be faster and will use less memory. Commented Jul 7, 2015 at 18:28
  • My input file may be more than 2 GB; would plain Python or Scala scripts still be faster then? Commented Jul 7, 2015 at 18:31

2 Answers


It's possible that you are not actually getting the 1 GB of memory you are requesting via SparkConf. The reason is that when master = local, the Spark driver and executor run entirely inside the JVM that is executing the code shown here, the one that creates the SparkContext. By that point it is too late to obtain more Java heap than was allocated when the JVM started. You need to add that -Xmx1G argument to the command IntelliJ uses to launch the JVM that runs your code.

You don't say exactly how you're running your code in IntelliJ, but what you'll need to do is create or modify a "Run Configuration".

In the IntelliJ UI, under the "Run" menu, select "Edit Configurations...". That brings up a window like the one shown below; it shows my Run Configuration for running in the "Scala Console". The "VM options:" box is where you need to include the -Xmx1G JVM arg. In my case, I am running with 2.5 GB of memory.

IntelliJ might already have created a Run Configuration for you when you previously ran your app. If not, use this window to create a new one of the proper type, e.g. "Scala Console".
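For reference, the "VM options:" field takes ordinary JVM flags, so a 2.5 GB heap would be requested with something like:

-Xmx2500M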

To check memory in a running Scala app, use the following calls to see your actual current, max, and free JVM memory and confirm that you got what you requested:

  • sys.runtime.totalMemory()
  • sys.runtime.maxMemory()
  • sys.runtime.freeMemory()
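For example, a quick sanity check you could drop at the top of your main method (a minimal sketch; the division by 1024 * 1024 is only there to print the values in MB):

val mb = 1024 * 1024
println(s"total: ${sys.runtime.totalMemory() / mb} MB")
println(s"max:   ${sys.runtime.maxMemory() / mb} MB")
println(s"free:  ${sys.runtime.freeMemory() / mb} MB")

If "max" is smaller than the heap you asked for, the -Xmx setting has not taken effect.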

[Screenshot: IntelliJ "Run/Debug Configurations" window with the heap size set in the "VM options:" field]


When you submit your application, try increasing spark.driver.memory (512 MB by default) by setting the parameter:

--driver-memory 4g
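For example, if you launch the job with spark-submit (the jar path here is just a placeholder for your own build artifact):

spark-submit --master local --driver-memory 4g --class DS_E6 path/to/your-app.jar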

Check the Spark documentation for more information about these parameters: https://spark.apache.org/docs/latest/configuration.html
