
We are using PySpark on an EMR cluster to access and analyse data from a PostgreSQL database (reachable via the JDBC connector at jdbc:postgresql://psqlhost:5432/psqldatabase). The network is open between the EMR cluster and the PostgreSQL instance, so when we access the EMR master via SSH we can launch the following job successfully (in both client and cluster deploy mode):

url = 'jdbc:postgresql://psqlhost:5432/psqldatabase'
table = 'mytable'

properties = {
    'user': 'user',
    'password': 'password',
    'driver': 'org.postgresql.Driver'
}

# spark is the SparkSession provided by the pyspark shell
products_df = spark.read.jdbc(url=url, table=table, properties=properties)

We would now like to change the setup and execute PySpark commands on the EMR cluster from a separate client instance with full connectivity to the EMR cluster. This has been set up successfully by copying the necessary YARN configuration files to the client machine and executing PySpark with --master yarn.
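For reference, a minimal sketch of that client-side setup (the configuration path and application name are assumptions, not taken from our actual machines): the copied YARN configuration is exposed via environment variables before the session is created, which is equivalent to launching PySpark with --master yarn.

import os
from pyspark.sql import SparkSession

# Assumed location of the YARN/Hadoop configuration files copied from the EMR master
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['YARN_CONF_DIR'] = '/etc/hadoop/conf'

spark = (SparkSession.builder
         .master('yarn')                 # submit against the EMR cluster's YARN
         .appName('jdbc-from-client')    # hypothetical application name
         .getOrCreate())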

We are, however, still experiencing issues accessing the PostgreSQL database from the client instance. For security reasons, the client machine cannot access the PostgreSQL instance, which remains accessible from the EMR cluster. The code snippet above still works from the client with --deploy-mode cluster. However, with --deploy-mode client, the last line produces the following error:

17/03/27 11:39:21 ERROR Driver: Connection error: 
org.postgresql.util.PSQLException: The connection attempt failed.
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:275)
    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
    at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:194)
    at org.postgresql.Driver.makeConnection(Driver.java:431)
    at org.postgresql.Driver.connect(Driver.java:247)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:65)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:56)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:123)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:117)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:237)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:159)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.postgresql.core.PGStream.<init>(PGStream.java:62)
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:144)
    ... 22 more
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-f5f370f4d567> in <module>()
----> 1 products_df = spark.read.jdbc(url=url, table=table, properties=properties)

/opt/spark-2.0.0/python/pyspark/sql/readwriter.pyc in jdbc(self, url, table, column, lowerBound, upperBound, numPartitions, predicates, properties)
    420             jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
    421             return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
--> 422         return self._df(self._jreader.jdbc(url, table, jprop))
    423 
    424 

/opt/spark-2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/opt/spark-2.0.0/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/spark-2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JJavaError(
    311                     "An error occurred while calling {0}{1}{2}.\n".
--> 312                     format(target_id, ".", name), value)
    313             else:
    314                 raise Py4JError(

Py4JJavaError: An error occurred while calling o47.jdbc.
: org.postgresql.util.PSQLException: The connection attempt failed.
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:275)
    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
    at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:194)
    at org.postgresql.Driver.makeConnection(Driver.java:431)
    at org.postgresql.Driver.connect(Driver.java:247)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:65)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:56)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:123)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:117)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:237)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:159)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.postgresql.core.PGStream.<init>(PGStream.java:62)
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:144)
    ... 22 more

The failure is likely due to the fact that the application driver (which in --deploy-mode client lives on the client machine) tries to connect to the PostgreSQL instance, which fails because the instance is not reachable from the client. However, I do not understand why driver-to-PostgreSQL connectivity should be required: when actions are eventually executed on the dataframe, the actual work is performed by the machines in the cluster (not by the driver on the client machine), and the results are sent back to the driver via the cluster master. Hence, the missing driver-to-PostgreSQL connectivity should not prevent dataframe operations as long as the PostgreSQL instance is reachable from the cluster.

Questions:

  • Is it correct that the driver must be able to connect to the database in order to execute operations?
  • Is there any good reason why this connectivity must be in place that I'm missing?
  • If that is not the case, is there any way we can bypass this connection check?
  • "the actual operations will be executed by the machines on the cluster " -- what about validating the SQL query, building the list of columns, deciding how many tasks / partitions will be provisioned (and on which executors)? Don't you think these are things that the driver has to do beforehand? Commented Mar 27, 2017 at 17:28
  • If you are really curious, then inspect the source code here >> github.com/apache/spark/blob/master/sql/core/src/main/scala/org/… >> the comments are explicit: "Both the driver code and the workers must be able to access the database; the driver needs to fetch the schema while the workers need to fetch the data." >> if you think you can do better, then submit your proposal to the Apache board... Commented Mar 27, 2017 at 17:30
  • You are right, that part of the source code explains it. Of course I understand that the driver does some tasks (that "actual operations" sentence was badly worded, my bad), but I didn't know which ones require a DB connection (e.g. query validation should not?). The scheduling of the executors, AFAIK, is not performed by the driver but by the resource manager - the driver sets the number of partitions. I do not understand your tone, though: mine was an honest question about something I did not understand, posted on a Q&A website. I'm not sure why you posted in a comment instead of answering below. Commented Mar 28, 2017 at 6:16
  • Well, that's my usual tone in real life. Live with it :-/ Commented Mar 28, 2017 at 7:01
  • BTW the phrase about "if you can do better" was dead serious, I believe the Spark-JDBC interface has room for improvement. Commented Mar 28, 2017 at 10:06

1 Answer


As pointed out by Samson Scharfrichter, the driver needs to be able to access the database in order to fetch the schema.
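To see why, the following sketch mimics what the driver does at spark.read.jdbc() time: it opens a connection and runs a zero-row probe query (Spark's JDBC source resolves the schema with SELECT * FROM <table> WHERE 1=0) to read the column metadata. The snippet uses psycopg2 purely for illustration; Spark itself goes through the JDBC driver.

import psycopg2

# This connection is opened from the machine running the driver,
# which is why the driver host must be able to reach the database.
conn = psycopg2.connect(host='psqlhost', port=5432, dbname='psqldatabase',
                        user='user', password='password')
cur = conn.cursor()
cur.execute('SELECT * FROM mytable WHERE 1=0')   # returns no rows, only metadata
print([desc[0] for desc in cur.description])     # column names from the result metadata
conn.close()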

Unfortunately, our client does not have direct access to the database. However, since the EMR cluster can access the database and the client has SSH access to the cluster, we can use the following workaround based on SSH tunneling:

from sshtunnel import SSHTunnelForwarder

# Multi-host JDBC URL: the PostgreSQL driver tries the hosts in order, so the
# driver connects through the local tunnel (localhost:5432), while the
# executors, for which localhost fails, fall back to psqlhost:5432 directly.
url = 'jdbc:postgresql://localhost:5432,psqlhost:5432/psqldatabase'
table = 'mytable'

properties = {
    'user': 'user',
    'password': 'password',
    'driver': 'org.postgresql.Driver'
}

# Forward localhost:5432 through the EMR master to the database, so the
# driver can fetch the schema despite having no direct route to it.
with SSHTunnelForwarder(('emrhost', 22),
                        remote_bind_address=('psqlhost', 5432),
                        local_bind_address=('localhost', 5432)):
    products_df = spark.read.jdbc(url=url, table=table, properties=properties)

After this step, the dataframe products_df can be manipulated even once the tunnel is closed: the executors skip the unreachable localhost entry in the multi-host URL and connect to psqlhost directly. With this workaround, the client machine can query the database even in client deploy mode.
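As a quick check (a sketch assuming the dataframe above), an action triggered after the with block has exited still succeeds:

# The tunnel is closed at this point; the count is computed by the
# executors on the cluster, which reach psqlhost:5432 directly.
print(products_df.count())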
