How can I convert this Row form into JSON while pushing into a Kafka topic?
I am using a Spark application to process text files that are dropped into the /home/user1/files/ folder on my system, mapping the comma-separated data in those files into a particular JSON format. I have written the following Python code using Spark to do this, but the output that arrives in Kafka looks like this:
Row(Name=Priyesh,Age=26,MailId=priyeshkaratha@gmail.com,Address=AddressTest,Phone=112)
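whereas what I actually want on the topic is presumably JSON, something like:
{"Name": "Priyesh", "Age": 26, "MailId": "priyeshkaratha@gmail.com", "Address": "AddressTest", "Phone": "112"}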
Python code:
import findspark
findspark.init('/home/user1/spark')

from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.sql import Column, DataFrame, Row, SparkSession
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='server.kafka:9092')

def handler(message):
    # collect the RDD to the driver and forward each record to Kafka
    records = message.collect()
    for record in records:
        producer.send('spark.out', str(record))
        print(record)
        producer.flush()

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    # watch the folder for new text files and map each comma-separated line to a Row
    lines = ssc.textFileStream('/home/user1/files/')
    fields = lines.map(lambda l: l.split(","))
    udr = fields.map(lambda p: Row(Name=p[0], Age=int(p[3].split('@')[0]), MailId=p[31], Address=p[29], Phone=p[46]))
    udr.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
So how can I convert this Row into JSON while pushing it to the Kafka topic?
Spark has Kafka libraries... Why are you collecting the RDD and using a regular Kafka producer?
– cricket_007
Aug 9 at 0:13
@cricket_007 I am new to Spark and Kafka, so I was following different tutorials and finally ended up with this code.
– Priyesh Karatha
Aug 9 at 5:48
Okay, well, while this code works for getting data into Kafka, it isn't really taking advantage of Spark's parallelism across multiple machines.
– cricket_007
Aug 9 at 11:55
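For instance, a minimal sketch of that point (assuming kafka-python is installed on every executor): sending from each partition keeps the work distributed instead of collecting everything to the driver.

def send_partition(records):
    # hypothetical helper: one producer per partition, created on the executor
    producer = KafkaProducer(bootstrap_servers='server.kafka:9092')
    for record in records:
        producer.send('spark.out', json.dumps(record.asDict()).encode('utf-8'))
    producer.flush()

def handler(rdd):
    rdd.foreachPartition(send_partition)

udr.foreachRDD(handler)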
1 Answer
You can convert Spark Row objects to dicts and then serialize those to JSON. For example, you could change this line:
producer.send('spark.out', str(record))
to this:
producer.send('spark.out', json.dumps(record.asDict()))
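If you would rather keep serialization out of the handler entirely, kafka-python also accepts a value_serializer (a minimal sketch, assuming UTF-8 JSON payloads), so each send can take the dict directly:

producer = KafkaProducer(
    bootstrap_servers='server.kafka:9092',
    # encode every outgoing dict as UTF-8 JSON bytes automatically
    value_serializer=lambda d: json.dumps(d).encode('utf-8')
)
producer.send('spark.out', record.asDict())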
Alternatively, since your example code isn't using DataFrames, you could just create a dict to begin with instead of a Row.
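For example, a minimal sketch that keeps your field positions exactly as they are:

udr = fields.map(lambda p: {
    'Name': p[0],
    'Age': int(p[3].split('@')[0]),
    'MailId': p[31],
    'Address': p[29],
    'Phone': p[46],
})

The handler can then pass each record straight to json.dumps, since it is already a plain dict.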
However, ideally KafkaUtils should be used instead of the Producer instance.
– cricket_007
Aug 9 at 0:27