I have a simple Spark job that reads large log files, filters them, and writes results to a new table. The simplified Scala driver app code is:

import java.util.Properties

// Read, parse, and filter the raw log lines.
val sourceRdd = sc.textFile(sourcePath)
val parsedRdd = sourceRdd.flatMap(parseRow)
val filteredRdd = parsedRdd.filter(l => filterLogEntry(l, beginDateTime, endDateTime))

// Convert to a DataFrame and write it to PostgreSQL over JDBC.
val dataFrame = sqlContext.createDataFrame(filteredRdd)
val writer = dataFrame.write

val properties = new Properties()
properties.setProperty("user", "my_user")
properties.setProperty("password", "my_password")
writer.jdbc("jdbc:postgresql://ip_address/database_name", "my_table", properties)

This works perfectly on smaller batches. On a large batch, after two hours of execution, I see about 8 million records in the target table and the Spark job has failed with the following error:

Caused by: java.sql.BatchUpdateException: Batch entry 524 INSERT INTO my_table <snip>  was aborted.  Call getNextException to see the cause.
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:136)
    at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleError(QueryExecutorImpl.java:308)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2004)
    at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1187)
    at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1212)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:351)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:1019)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:210)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:277)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:276)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

If I copy-paste the given SQL INSERT statement into an SQL console, it works fine. In the PostgreSQL server log I see:

(this is the unmodified/unanonymized log)

2016-04-26 22:38:09 GMT [3769-12] nginxlogs_admin@nginxlogs ERROR:  syntax error at or near "was" at character 544
2016-04-26 22:38:09 GMT [3769-13] nginxlogs_admin@nginxlogs STATEMENT:  INSERT INTO log_entries2 (client,host,req_t,request,seg,server,timestamp_value) VALUES ('68.67.161.5','"204.13.197.104"','0.000s','"GET /bid?apnx_id=&ip=67.221.130.195&idfa=&dmd5=&daid=&lt=32.90630&lg=-95.57920&u=branovate.com&ua=Mozilla%2F5.0+%28Linux%3B+Android+5.1%3B+XT1254+Build%2FSU3TL-39%3B+wv%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Version%2F4.0+Chrome%2F44.0.2403.90+Mobile+Safari%2F537.36+%5BFB_IAB%2FFB4A%3BFBAV%2F39.0.0.36.238%3B%5D&ap=&c=1&dmdl=&dmk= HTTP/1.1"','samba_info_has_geo','','2015-08-02T20:24:30.482000112') was aborted.  Call getNextException to see the cause.

It seems like Spark sent the text "was aborted. Call getNextException..." to PostgreSQL, which triggered this specific syntax error. That looks like a legitimate Spark bug. The second question is why Spark aborted the batch in the first place.

So, as far as I know, I can't call getNextException, because I'm not using JDBC directly but going through Spark.
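In case it's useful, the only way I can think of to get at the chained exception is to replay one of the generated INSERTs as a plain JDBC batch outside Spark. A rough sketch (the connection details are the placeholders from above, and the sample row values are copied from the server log):

import java.sql.{BatchUpdateException, DriverManager, SQLException}
import java.util.Properties

// Replay a generated INSERT as a plain JDBC batch so the chained
// exception can be inspected directly, outside of Spark.
val props = new Properties()
props.setProperty("user", "my_user")
props.setProperty("password", "my_password")
val conn = DriverManager.getConnection("jdbc:postgresql://ip_address/database_name", props)
try {
  val stmt = conn.createStatement()
  // Paste one or more INSERT statements from the PostgreSQL log here.
  stmt.addBatch("INSERT INTO my_table (client, host, req_t, request, seg, server, timestamp_value) " +
    "VALUES ('68.67.161.5', '204.13.197.104', '0.000s', 'GET /bid ...', 'samba_info_has_geo', '', '2015-08-02T20:24:30.482000112')")
  stmt.executeBatch()
} catch {
  case e: BatchUpdateException =>
    // Walk the chain; the real server-side error usually hides here.
    var next: SQLException = e.getNextException
    while (next != null) {
      println(next.getMessage)
      next = next.getNextException
    }
} finally {
  conn.close()
}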

FYI, this is with Spark 1.6.1 and Scala 2.11.

  • The items inside your VALUES clause are quoted in a strange way, like VALUES('"2016-04-27"', ...); this will cause errors when the server needs to interpret them as, e.g., dates. Commented Apr 27, 2016 at 16:24
  • So, yes, there is an extraneous quoting issue on the "request" and "host" text fields, and that should be fixed for cleanliness, but it's not causing any errors. The one date field is currently being treated as a string and doesn't have the extraneous quoting issue. It would probably be better to have the date field as a proper PostgreSQL date type rather than a string, but that's not causing the problem. Commented Apr 27, 2016 at 16:56
  • Strings are allowed to contain quotes. For dates, times, and timestamps, the quotes will make them unparseable. Commented Apr 27, 2016 at 16:58
  • My one timestamp field was a Postgres TEXT column. It also didn't have the extra-quotes issue. Commented Apr 27, 2016 at 17:58
  • I actually cleaned up my code so the timestamp field is properly typed as a PostgreSQL TIMESTAMP column and the double quotes are removed from the text values (a rough sketch of that cleanup is below). However, I don't think that has anything to do with this error. Commented Apr 27, 2016 at 18:01
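A minimal sketch of that cleanup (parseRow and the original row type aren't shown in the question, so these names are only illustrative; the field names mirror the columns in the logged INSERT):

import java.sql.Timestamp

// Illustrative parsed-row type: using java.sql.Timestamp makes
// sqlContext.createDataFrame infer TimestampType, so the JDBC writer creates a
// proper TIMESTAMP column instead of TEXT.
case class LogEntry(client: String, host: String, req_t: String, request: String,
                    seg: String, server: String, timestamp_value: Timestamp)

// Strip one layer of surrounding double quotes left over from the log format.
def unquote(s: String): String = s.stripPrefix("\"").stripSuffix("\"")

// e.g. "2015-08-02T20:24:30.482000112" -> java.sql.Timestamp
def toTimestamp(iso: String): Timestamp = Timestamp.valueOf(iso.replace('T', ' '))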

1 Answer


If anyone else is searching and hits this: my database server (running in a VM) hit its disk space limit. Spark seemed to get confused by that error, failed to log the real cause, hit a different internal error, and logged the results of that instead. Technically, this is probably an internal Spark bug in how it responds to an uncommon database disk-full error.
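For anyone hitting the same thing, one cheap sanity check before kicking off a large batch is to ask PostgreSQL how big the target database already is. A rough sketch, reusing the placeholder connection details from the question:

import java.sql.DriverManager
import java.util.Properties

// Report the current size of the target database so a looming disk-full
// condition on the server is easier to spot before a large write.
val props = new Properties()
props.setProperty("user", "my_user")
props.setProperty("password", "my_password")
val conn = DriverManager.getConnection("jdbc:postgresql://ip_address/database_name", props)
try {
  val rs = conn.createStatement().executeQuery(
    "SELECT pg_size_pretty(pg_database_size(current_database()))")
  if (rs.next()) println(s"Current database size: ${rs.getString(1)}")
} finally {
  conn.close()
}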
