
I want to write a Spark Streaming app that takes a stream of random integers and counts them. Here is the Spark app I wrote:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "IntegerCount") # 2 threads, app name
ssc = StreamingContext(sc, 1) # batch interval: 1 second

nums = ssc.socketTextStream("localhost", 8000) # stream data from TCP; source, port

# create key,value pairs
tests = nums.map(lambda num: (int(num), 1))

# Count each integer in each batch
intCounts = tests.reduceByKey(lambda x, y: x + y)
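For readers new to the pair API, the per-batch work done by `map` and `reduceByKey` above can be mimicked in plain Python. This is only an illustration, not Spark code: `count_batch` is a hypothetical helper, and its input is assumed to be the already-split text lines of a single batch.

```python
from collections import defaultdict


def count_batch(lines):
    """Mimic nums.map(lambda num: (int(num), 1)).reduceByKey(add) for one batch."""
    # map step: each line becomes a (key, 1) pair
    pairs = [(int(line), 1) for line in lines]
    # reduceByKey step: sum the values that share the same key
    counts = defaultdict(int)
    for key, value in pairs:
        counts[key] += value
    return dict(counts)


print(count_batch(["3", "7", "3", "10"]))  # {3: 2, 7: 1, 10: 1}
```

If a batch contains no lines at all, the result is empty, which is what the blank `pprint()` sections in the output shown in the question correspond to.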

# Print
intCounts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

And I am serving random numbers on port 8000 with this Server.py:

import socket
from random import randint

host = 'localhost'
port = 8000
address = (host, port)

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(address)
server_socket.listen(5)


print("Listening for client . . .")
conn, address = server_socket.accept()
print("Connected to client at", address)
# send random integers between 0 and 10 to the connected client, forever
while True:
    output = str(randint(0, 10))
    conn.sendall(output.encode("utf-8"))

When I run Server.py and my Spark app, the connection is established successfully. However, the output is empty:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
-------------------------------------------
Time: 2017-07-16 22:36:11
-------------------------------------------

-------------------------------------------
Time: 2017-07-16 22:36:12
-------------------------------------------

I don't know what the problem is. Can someone help me understand what is going on?

  • Nobody knows the problem??? Commented Jul 19, 2017 at 18:45

1 Answer


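Solved: I terminated each number with "\n" and it worked. socketTextStream reads the socket as newline-delimited text, so without the newline Spark never received a complete line and every batch was empty.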

import socket
from random import randint

host = 'localhost'
port = 8000
address = (host, port)

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(address)
server_socket.listen(5)


print("Listening for client . . .")
conn, address = server_socket.accept()
print("Connected to client at", address)
# send random integers between 0 and 10, one per line, forever
while True:
    output = str(randint(0, 10)) + "\n"  ### THAT IS THE FIX.
    conn.sendall(output.encode("utf-8"))
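Why the newline matters: a line-based receiver such as the one behind `socketTextStream` only emits a record once it has seen a complete line (or the connection closes). The sketch below imitates that framing in plain Python. `split_lines` is a hypothetical helper, not part of Spark, and it assumes bytes arrive in arbitrary chunks, as they can over TCP.

```python
def split_lines(chunks):
    """Yield complete newline-terminated lines from an iterable of byte chunks.

    Bytes after the last newline are held back, the way a line reader
    waits for the rest of a line while the connection is still open.
    """
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # emit every complete line currently sitting in the buffer
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            yield line.decode("utf-8")


# Original Server.py: digits are concatenated with no delimiter.
print(list(split_lines([b"3", b"10", b"7"])))        # []
# Fixed Server.py: every number ends with "\n".
print(list(split_lines([b"3\n", b"10\n", b"7\n"])))  # ['3', '10', '7']
```

The delimiter is needed for correctness as well: the undelimited stream "3107" could mean 3, 10, 7 or 31, 0, 7, so even a receiver that did not wait for newlines could not recover the original numbers.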