资源预览内容
第1页 / 共4页
第2页 / 共4页
第3页 / 共4页
第4页 / 共4页
亲,该文档总共4页全部预览完了,如果喜欢就下载吧!
资源描述
Spark 框架集成应用之 Helloworld前言:经过几天的学习和探索,终于把 Spark 框架的几个模块集成起来了,从 SparkStream 读取 kafka 消息流,并解析消息构建图,同时结合 Pregel shortpath 算法计算最短路径,最 后将计算结果保存至数据库。虽然是 helloworld 级别的程序,但麻雀 虽小,五脏俱全,当然,千里之行始于足下!今天把这几天来的学习心得整理一下,分享给所有 spark 的爱好者,当然也许有很多的 不足,就算是抛砖引玉吧!一、 概述该实验是基于一个简化版,微缩版的智能交通场景来实现的,通过构建一张道路路况图,结合实时的路况信息,找出 A 路口到 B 路口所有交通路口之间的最短路径。首先假设各个交通路口为图的顶点,各个道路的路长和拥堵状况为图的边的属性。假设静态的图顶点数据存在磁盘文件中,动态的路况通过接收 kafka 实时消息流的方式实现。最后,将计算的结果存入数据库。总体的框架图如下:上图中,该实验暂时只实现接收消息并计算最短路径和持久化到数据库的功能。 (其他外围的可考虑在下个实验中实现)二、 运行环境及准备工作1、 安装 Scala2、 安装 Spark具体安装过程可以参考 spark 官方网站或网上资料,但有一点需要注意的是,该实验中 Graphx 只支持 spark-1.0.2 版本,暂不支持 spark-1.1.0 版本。 3、 安装 Kafka参考官方网站4、 安装 Mysql参考网络资料三、 编码与代码说明由于这只是一个小实验,因此采用一刀切的方法,从消息读取,解析,存储等都在一个 scala object 中实现。1、 读取顶点数据2、 读取边数据3、 解析 Json 边数据4、 构建图val sparkHome = System.getenv(SPARK_HOME)val vertexs : RDD(VertexId,(String) = sc.textFile(sparkHome + /test/vdata.txt).map(line = line.split(s+).map( v = (v(0).toLong : VertexId,( v(1).toString : String)val Array(zkQuorum, group, topics, numThreads) = argsval topicpMap = topics.split(,).map(_,numThreads.toInt).toMapval lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap,StorageLevel.MEMORY_ONLY).map(_._2)val eo = lines.flatMap(s = s.split(,).map(n = n.split(:).map(e = e(1).toString.split(,).map(t = t.split(:).map(l = EdgeObj(e(0).toString.replace(, ),l(0).toString,l(1).toString.replace(, ).toDouble)eo.foreachRDD rd = val edges :RDDEdgeDouble = sc.parallelize(rd.flatMap(ea = ea.map(e = Edge(e.node1.toLong,e.node2.toLong,e.length).toArray)这里 edges 必须为ParallelCollectionRDD,因此必须parallerlize 化val graph = Graph(vertexs, edges, defaultVertex)graph.persist(StorageLevels.MEMORY_AND_DISK)因为下面还需要用到 graph,因此建议将它persist 起来5、 计算最短路径6、 保存数据库vertexs.collect.map case(k,v) = k.foreachid = val sourceId = id.toLongval initialGraph = graph.mapVertices(id, _) = if (id.toLong = sourceId) 0.0 else Double.PositiveInfinity)val sssp = initialGraph.pregel(Double.PositiveInfinity)(id, dist, newDist) = math.min(dist, newDist), / Vertex Programtriplet = / Send Messageif (triplet.srcAttr + triplet.attr math.min(a,b) / Merge Message)println(shortpath:+sssp.vertices.collect.mkString(n)val conn = getConnection()val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE)sssp.vertices.collect.foreach t =if(t._2.toDouble = Double.PositiveInfinity )insert(sourceId,t._1 ,999999D ,conn)elseinsert(sourceId,t._1 ,t._2 ,conn)def insert(start : Long, target : Long , length : Double , conn : Connection) : Unit = val prep = conn.prepareStatement(INSERT INTO path (start, target,length) VALUES (?, ?, ?) )prep.setLong(1, start)prep.setLong(2, target)prep.setDouble(3, length)prep.executeUpdategetConnect 根据不同的数据库创建连接,可参考 scala 数据库的连接方式Mysql 的数据库连接:四、 开发中遇到的问题与解决方法曾经在几个地方栽了跟头,折腾了很久,现将其罗列出来,希望大家可以少走一些弯路。1、 RDD 的 foreach 和 foreachRDD 操作不会改变外部变量的值如以下代码中 inner foreach 中 target 有值,但 foreach 外 target 的值则为空。解决方法:在 map 之前加上 collect(虽 collect 需慎用,因为很耗时,但有时也不得不用)另外参考官方的说明:if your application does not have any output operation, or has output operations like dstream.foreachRDD() without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it.2、 Build graph 时,edges,vertexs RDD 必须为 ParallelCollectionRDD,否则如 MappedRDD 虽编译时不会报错,但运行时会出错。def getConnection() : Connection = val dbc = jdbc:mysql:/localhost:3306/test?user=root&password=rootclassOfcom.mysql.jdbc.Driverval conn = DriverManager.getConnection(dbc)connval sparkHome = System.getenv(SPARK_HOME)val vertexs : RDD(VertexId,(String) = sc.textFile(sparkHome + /test/vdata.txt).map(line = line.split(s+).map( v = (v(0).toLong : VertexId,( v(1).toString : String)val target = new mutable.HashMapString,Long()vertexs.foreachv = target += (v._2.toString - v._1.toLong )target.map(t = =inner foreach= + t._1 +: + t._2 ).foreach(println)target.map(t = =outof foreach= + t._1 +: + t._2 ).foreach(println)vertexs.collect.foreachv =
收藏 下载该资源
网站客服QQ:2055934822
金锄头文库版权所有
经营许可证:蜀ICP备13022795号 | 川公网安备 51140202000112号