Spark on Pulsar

Source: https://github.com/tspannhw/FLiPS-SparkOnPulsar


Apache Spark can query Apache Pulsar topics with very little setup, using the StreamNative Pulsar Spark connector.

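First, start a quick local Apache Spark standalone cluster: a master, and a worker with 2 GB of memory that registers with it at spark://pulsar1:7077.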


sbin/start-m*****.sh --host pulsar1 --port 7077

sbin/start-worker.sh --memory 2G spark://pulsar1:7077


We can run Scala code or PySpark just as easily. Launch spark-shell against the master on port 7077 and pass in the pulsar-spark-connector package. I am using Scala 2.12 with Spark 3.1.


bin/spark-shell --master spark://pulsar1:7077 --packages io.streamnative.connectors:pulsar-spark-connector_2.12:3.1.1.3
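The same connector can also go into a compiled application instead of spark-shell. Below is a minimal build.sbt sketch, assuming a Scala 2.12 / Spark 3.1.x project. The Scala and Spark patch versions are assumptions, so match them to your cluster. Only the connector coordinates come from the --packages flag above.

```scala
// build.sbt: minimal sketch; the Scala and Spark patch versions are assumptions
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  // spark-sql includes Structured Streaming; "provided" because the cluster supplies Spark at runtime
  "org.apache.spark" %% "spark-sql" % "3.1.2" % "provided",
  // Same connector coordinates as the --packages flag used with spark-shell
  "io.streamnative.connectors" % "pulsar-spark-connector_2.12" % "3.1.1.3"
)
```

If you submit a thin jar with spark-submit, the connector still has to reach the cluster, for example through the same --packages flag. The rest of this walkthrough stays in spark-shell.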


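// Subscribe to the Pulsar topic as a streaming DataFrame
val dfPulsar = spark.readStream.format("pulsar").option("service.url", "pulsar://localhost:6650").option("admin.url", "http://localhost:8080").option("topic", "persistent://public/default/chatresult2").load()

// Inspect the columns the connector exposes (the message payload plus metadata columns such as __key)
dfPulsar.printSchema()

// Cast the key and value to strings and print every micro-batch to the console without truncating rows
val pQuery = dfPulsar.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)").as[(String, String)].writeStream.format("console").option("truncate", "false").start()

// Print the physical plan of the running query
pQuery.explain()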

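// awaitTermination() with no argument blocks the shell until the query stops or fails;
// passing a timeout in milliseconds returns control after 60 seconds (or sooner if the query ends)
pQuery.awaitTermination(60000)

// Stop the streaming query
pQuery.stop()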
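The three options passed to the reader above can be collected in one place, which makes them easier to reuse for another query or topic. This is a minimal sketch in plain Scala. `PulsarSourceOptions`, `topicName`, and `forTopic` are hypothetical helpers of our own, not part of the connector's API. The option keys are the ones used in the session above, and the topic format follows Pulsar's `{persistent|non-persistent}://tenant/namespace/topic` naming.

```scala
// Hypothetical helper object; not part of the pulsar-spark-connector API.
object PulsarSourceOptions {

  // Builds a fully qualified Pulsar topic name: {persistent|non-persistent}://tenant/namespace/topic
  def topicName(tenant: String, namespace: String, topic: String,
                persistent: Boolean = true): String = {
    val domain = if (persistent) "persistent" else "non-persistent"
    s"$domain://$tenant/$namespace/$topic"
  }

  // Gathers the three reader options used in the spark-shell session into one map
  def forTopic(serviceUrl: String, adminUrl: String, topic: String): Map[String, String] =
    Map(
      "service.url" -> serviceUrl,
      "admin.url"   -> adminUrl,
      "topic"       -> topic
    )
}
```

In spark-shell you could paste the object with `:paste` and then call `spark.readStream.format("pulsar").options(PulsarSourceOptions.forTopic("pulsar://localhost:6650", "http://localhost:8080", PulsarSourceOptions.topicName("public", "default", "chatresult2"))).load()`. Both `DataStreamReader` and `DataFrameReader` accept a `Map[String, String]` through `options`.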