java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutabl 川长思鸟来 2022-03-17 06:52 989阅读 0赞 19/02/22 19:06:55 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest 19/02/22 19:06:55 INFO VerifiableProperties: Property group.id is overridden to 19/02/22 19:06:55 INFO VerifiableProperties: Property zookeeper.connect is overridden to Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.refArrayOps(\[Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps; at org.apache.spark.streaming.kafka.KafkaCluster$SimpleConsumerConfig.<init>(KafkaCluster.scala:382) at org.apache.spark.streaming.kafka.KafkaCluster$SimpleConsumerConfig$.apply(KafkaCluster.scala:418) at org.apache.spark.streaming.kafka.KafkaCluster.config(KafkaCluster.scala:46) at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:123) at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112) at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) at kafka.Kafka\_Spark$.createStream(Kafka\_Spark.scala:65) at kafka.Kafka\_Spark$.main(Kafka\_Spark.scala:30) at kafka.Kafka\_Spark.main(Kafka\_Spark.scala) 19/02/22 19:06:55 INFO SparkContext: Invoking stop() from shutdown hook ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0pIQ19iaW5nZQ_size_16_color_FFFFFF_t_70][] package kafka import java.sql.Date import java.text.SimpleDateFormat import java.util.Properties import kafka.serializer.StringDecoder import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.{Duration, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils import com.alibaba.fastjson.JSON import kafka.producer.ProducerConfig import kafka.producer.Producer import kafka.producer.KeyedMessage import org.apache.spark.{SparkConf, SparkContext} //1.打开zk,kafka。2.启动kafka-connect(source部分)3.运行此文件 object Kafka_Spark { def main(args: Array[String]) { // val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafka-spark-demo") // val scc = new StreamingContext(sparkConf, Duration(5000))//new一个spark-streaming的上下文 val conf = new SparkConf().setAppName("Kafkaspark").setMaster("local[*]") val scc = new StreamingContext(conf, Duration(5000)) // scc.checkpoint(".") // 暂时用不到 val topics = Set("test") //我们需要消费的kafka数据的topic val kafkaParam = Map( "metadata.broker.list" -> "master:9092", // kafka的broker list地址 "auto.offset.reset" -> "smallest"//这个参数可以让streaming消费topic的时候从头开始消费 ) val stream: InputDStream[(String, String)] = createStream(scc, kafkaParam, topics)//建立流,读数据,传入上下文,kafka配置,主题名字 val wordCount = stream.map(l => (json_an(l._2), 1)).reduceByKey(_ + _) //对数据进行处理 wordCount.print()//输出到控制台看看结果 //发送数据(对外部服务器连接必须要用这种方式,不然会报错:任务无法序列化) wordCount.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => //配置说明 val producerProperties = new Properties() producerProperties.put("serializer.class", "kafka.serializer.StringEncoder") producerProperties.put("metadata.broker.list", "master:9092") producerProperties.put("request.required.acks", "1") val config: ProducerConfig = new ProducerConfig(producerProperties) //与kafka进行连接。此处用的是kafka自家的Producer,用spark的kafkaproducer也可以,但传送的方式不同 val producer = new Producer[String,String](config) partitionOfRecords.foreach(record => //发送数据,在这里key简单的用了相同的。实际情况应该用别的 producer.send(new KeyedMessage("cunchu","key",record.toString())) ) } } scc.start() // 真正启动程序 scc.awaitTermination() //阻塞等待 } /** * 创建一个从kafka获取数据的流. * * @param scc spark streaming上下文 * @param kafkaParam kafka相关配置 * @param topics 需要消费的topic集合 * @return */ def createStream(scc: StreamingContext, kafkaParam: Map[String, String], topics: Set[String]) = { KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](scc, kafkaParam, topics) } //处理时间 def formatData(line: String) = { val date = new SimpleDateFormat("yyyy/MM/dd H:mm") val d = new SimpleDateFormat("yyyy/MM/dd") val dateFormated = date.parse(line) val dateFormated3 = date.parse(line.split(" ")(0) + " 0:0") val dateFormated2 = date.format(dateFormated) val dateFormated4 = date.format(dateFormated3) val dateFf = date.parse(dateFormated2).getTime val dateFf2 = date.parse(dateFormated4).getTime val r = dateFf - dateFf2 val hash = r / 300000 val final_date = new Date(hash.toInt * 300000 + dateFf2) date.format(final_date) } //字符串处理。在这里是提取时间 def json_an(str: String) = { if (str.length < 10) { 1 } else { val json = JSON.parseObject(str) val main_v = json.get("payload") if (main_v.toString.split(",").length == 7) { formatData(main_v.toString.split(",")(2)) } else { "NAN" } } } } [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0pIQ19iaW5nZQ_size_16_color_FFFFFF_t_70]: /images/20220317/a39eca8d1de245f8be67a6e85412cc72.png
还没有评论,来说两句吧...