Apache Kafka - Integrasi Dengan Spark

Dalam bab ini, kami akan membincangkan mengenai cara mengintegrasikan Apache Kafka dengan API Streaming Spark.

Mengenai Spark

Spark Streaming API membolehkan pemproses aliran aliran boleh terbaca, tinggi, dan toleransi aliran data langsung. Data boleh ditelan dari banyak sumber seperti Kafka, Flume, Twitter, dll, dan boleh diproses menggunakan algoritma kompleks seperti fungsi peringkat tinggi seperti peta, mengurangkan, bergabung dan tetingkap. Akhir sekali, data yang diproses boleh ditolak ke sistem fail, pangkalan data, dan papan dash langsung. Dataset Tersebar Berdiri (RDD) adalah struktur data asas Spark. Ia adalah koleksi objek yang diedarkan secara kekal. Setiap dataset dalam RDD dibahagikan kepada partition logik, yang boleh dikira pada nod yang berlainan kumpulan.

Integrasi dengan Spark

Kafka adalah platform pemesejan dan integrasi yang berpotensi untuk aliran Spark. Kafka bertindak sebagai hub pusat untuk aliran data masa sebenar dan diproses menggunakan algoritma kompleks dalam Spark Streaming. Setelah data diproses, Spark Streaming dapat menerbitkan hasil ke dalam satu lagi topik Kafka atau menyimpan di HDFS, pangkalan data atau papan pemuka. Rajah berikut menggambarkan aliran konseptual.

Integrasi dengan Spark

Sekarang, marilah kita melalui API Kafka-Spark secara terperinci.

API SparkConf

Ia mewakili konfigurasi untuk aplikasi Spark. Digunakan untuk menetapkan pelbagai parameter Spark sebagai pasangan kunci-nilai.

Kelas SparkConf mempunyai kaedah berikut -

  • tetapkan (kunci rentetan, nilai rentetan) - tetapkan pemboleh ubah konfigurasi.

  • keluarkan (kunci rentetan) - alih keluar kekunci dari konfigurasi.

  • setAppName (nama rentetan) - tetapkan nama aplikasi untuk aplikasi anda.

  • dapatkan (kunci rentetan) - dapatkan kunci

API StreamingContext

Ini adalah titik masuk utama untuk fungsi Spark. SparkContext mewakili sambungan ke kluster Spark, dan boleh digunakan untuk membuat RDD, akumulator dan pembolehubah penyiaran pada kelompok. Tandatangan ditakrifkan seperti ditunjukkan di bawah.

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)
  • URL kluster induk untuk menyambung kepada (mis. mesos: // host: port, percikan: // host: port, setempat [4]).

  • appName - nama untuk tugas anda, untuk dipaparkan pada UI web kluster

  • batchDuration - selang masa di mana data aliran akan dibahagikan kepada kelompok

public StreamingContext(SparkConf conf, Duration batchDuration)

Buat StreamingContext dengan menyediakan konfigurasi yang diperlukan untuk SparkContext baru.

  • conf - Spark parameter

  • batchDuration - selang masa di mana data aliran akan dibahagikan kepada kelompok

KafkaUtils API

API KafkaUtils digunakan untuk menyambungkan kluster Kafka kepada Streaming Spark. API ini mempunyai signifi-cant kaedah membuat tandatanganStream ditakrifkan seperti di bawah.

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

Kaedah yang ditunjukkan di atas digunakan untuk Buat aliran input yang menarik mesej dari Kafka Brokers.

  • ssc - objek StreamingContext.

  • zkQuorum - Kuorum Zookeeper .

  • groupId - Id kumpulan untuk pengguna ini.

  • topik - mengembalikan peta topik untuk dikonsumsi.

  • penyimpananLevel - Tahap penyimpanan untuk digunakan untuk menyimpan objek yang diterima.

KafkaUtils API mempunyai kaedah lain createDirectStream, yang digunakan untuk membuat aliran input yang secara langsung menarik mesej dari Kafka Brokers tanpa menggunakan mana-mana penerima. Aliran ini dapat menjamin bahawa setiap mesej dari Kafka dimasukkan dalam transformasi sekali saja.

Permohonan sampel dilakukan di Scala. Untuk menyusun aplikasi, sila muat turun dan pasang sbt , alat bantu skala (serupa dengan maven). Kod permohonan utama dipaparkan di bawah.

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

Membina Skrip

Integrasi percikan-kafka bergantung pada percikan api, percikan api dan percikan serbuk integrasi Kafka. Buat fail build.sbt baru dan tentukan butiran aplikasi dan pergantungannya. Sbt akan memuatkan balang yang diperlukan semasa menyusun dan membungkus aplikasi.

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

Penyusunan / Pembungkusan

Jalankan arahan berikut untuk menyusun dan membungkus fail balang aplikasi. Kita perlu menghantar fail jar ke dalam konsol percikan untuk menjalankan aplikasi.

sbt package

Hantar ke Spark

Mula pengeluar CLI Kafka (dijelaskan dalam bab sebelumnya), buat topik baru yang dipanggil topik pertama saya dan berikan beberapa mesej contoh seperti yang ditunjukkan di bawah.

Another spark test message

Jalankan arahan berikut untuk menyerahkan aplikasi untuk memancarkan konsol.

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

Output sampel aplikasi ini ditunjukkan di bawah.

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..