I am trying to use Spark Structured Streaming with Kafka and Python. Requirement: I need to read streaming data (in JSON format) from Kafka, transform it in Spark, and then store it in a database.

I have data in JSON format like this: {"a": 120.56, "b": 143.6865998138807, "name": "niks", "time": "2012-12-01 00:00:09"}

I am planning to use spark.readStream to read from Kafka, like this:

data = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe","test").load()

I referred to this link, but I did not understand how to parse the JSON data. I tried this:

data = data.selectExpr("CAST(a AS FLOAT)","CAST(b as FLOAT)", "CAST(name as STRING)", "CAST(time as STRING)").as[(Float, Float, String, String)]

but it does not seem to work.

Can anyone who has worked on Spark structured streaming with Python guide me to proceed with sample examples or links?

Using,

schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()),
    StructField("name", StringType()),
    StructField("time", TimestampType())])

inData = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe","test").load()
data = inData.select(from_json(col("value").cast("string"), schema))
query = data.writeStream.outputMode("Append").format("console").start()

The program runs, but the values printed on the console are:

+-----------------------------------+
|jsontostruct(CAST(value AS STRING))|
+-----------------------------------+
|               [null,null,null,2...|
|               [null,null,null,2...|
+-----------------------------------+

17/04/07 19:23:15 INFO StreamExecution: Streaming query made progress: {
  "id" : "8e2355cb-0fd3-4233-89d8-34a855256b1e",
  "runId" : "9fc462e0-385a-4b05-97ed-8093dc6ef37b",
  "name" : null,
  "timestamp" : "2017-04-07T19:23:15.013Z",
  "numInputRows" : 2,
  "inputRowsPerSecond" : 125.0,
  "processedRowsPerSecond" : 12.269938650306749,
  "durationMs" : {
    "addBatch" : 112,
    "getBatch" : 8,
    "getOffset" : 2,
    "queryPlanning" : 4,
    "triggerExecution" : 163,
    "walCommit" : 26
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test]]",
    "startOffset" : {
      "test" : {
        "0" : 366
      }
    },
    "endOffset" : {
      "test" : {
        "0" : 368
      }
    },
    "numInputRows" : 2,
    "inputRowsPerSecond" : 125.0,
    "processedRowsPerSecond" : 12.269938650306749
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6aa91aa2"
  }
}

Did I miss something here?

1 Answer

You can either use from_json with a schema:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()), 
    StructField("name", StringType()), 
    StructField("time", TimestampType())])

data.select(from_json(col("value").cast("string"), schema))

or get individual fields as strings with get_json_object:

from pyspark.sql.functions import get_json_object

data.select([
    get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
    for c in ["a", "b", "name", "time"]])

and cast them later according to your needs.
