目的1.对业务场景抽象,熟练Spark编码2.增加自定义累加器,自定义UDF3.Spark优化方式
项目数据处理架构
模块介绍卡扣流量分析SparkCore
卡扣车流量转化率SparkCore
各区域车流量最高top5的道路统计SparkSQL
稽查布控,道路实时拥堵统计SparkStreaming
hive表monitor_flow_action表–date日期天–monitor_id卡口号–camera_id摄像头编号–car车牌–action_time某个摄像头拍摄时间s–speed通过卡扣的速度–road_id道路id–area_id区域ID
monitor_camera_info表某一个卡扣对应的摄像头编号–monitor_id:卡扣编号–camera_id:摄像头编号
数据加载hive中1).创建表,加载数据loaddataData2File
hive-fcreateHiveTab.sql
2).集群中提交spark作业,使用代码生成到hiveData2Hive
大数据开发流程数据调研(对底层的数据的表结构进行调研,分析和研究)需求分析(与PM讨论需求,画原型图axure)基于讨论出来的结果做出技术方案(某个难点用什么技术,数据库选型)具体实施功能点根据使用者(平台使用者)指定的某些条件,筛选出指定的一批卡扣信息(比如根据区域、时间筛选)
检测卡扣状态,对于筛选出来的所有的卡口(不代表一个摄像头)信息统计•卡口正常数•异常数•camera的正常数•camera的异常数•camera的详细信息(monitor_id:camera_id)
车流量最多的TonN卡扣号•获取每一个卡扣的详细信息(Top5)
随机抽取N个车辆信息,对这些数据可以进行多维度分析(因为随机抽取出来的N个车辆信息可以很权威的代表整个区域的车辆)
计算出经常高速通过的TopN卡口(查看哪些卡扣经常被高速通过,高速,中速,正常,低速根据三个速度段进行四次排序,高速通过的车辆数相同就比较中速通过的车辆数,以此来推)
跟车分析
需求分析按条件筛选卡扣信息•可以指定不同的条件,时间范围、区域范围、卡扣号等可以灵活的分析不同区域的卡扣信息
监测卡扣状态•对符合条件的卡扣信息,可以动态的检查每一个卡扣的状态,查看卡扣是否正常工作,也可以查看摄像头
车流量最多的TonN卡扣•查看哪些卡扣的车流量最高,为什么会出现这么高的车流量。分析原因,例如今天出城的车辆非常多,啥原因,今天进城的车辆非常多,啥原因?要造反?这个功能点里面也会拿到具体的车辆的信息,分析一下是京牌车造成的还是外地车牌?
在符合条件的卡扣信息中随机抽取N个车辆信息•随机抽取N辆车的信息,可以权威的代表整个区域的车辆,这时候可以分析这些车的轨迹,看一下在不同的时间点车辆的流动方向。以便于道路的规划。
计算出经常高速通过的TopN卡口•统计出是否存在飙车现象,或者经常进行超速行驶,可以在此处安装违章拍摄设备
跟车分析•计算出所有车是否被跟踪过,然后将结果存储在MySQL中,以便后期进行查询
项目分析monitor_flow_action监控数据表
monitor_camera_info卡扣与摄像头基本关系表
1.卡扣监控统计:正常的卡扣个数,异常的卡扣个数,正常的摄像头个数,异常的摄像头个数,异常的摄像头详细信息正常卡扣个数:monitor_camera_info基本关系表中卡扣与摄像头的关系与在monitor_flow_action监控数据表中,卡扣与摄像头的关系完全对应上0001:11111,22222000111111xxx000122222xxxRDD思路-正常的卡扣数为例:monitor_flow_action表->RDD<Monitor_id,Camera_id>->RDD<Monitor_id,[camera_ids]>-RDD<Monitor_id,camera_ids>monitor_camera_info表->RDD<Monitor_id,Camera_id>->RDD<Monitor_id,[camera_ids]>异常的卡扣个数:1.monitor_camera_info基本关系表中卡扣与摄像头的关系,在监控的数据表中一条都没有对应。2.monitor_camera_info基本关系表中卡扣与摄像头的关系,在监控的数据表中部分数据有对应。正常的摄像头个数:异常的摄像头个数:异常的摄像头详细信息:0001:11111,22222,33333~0004:76789,27449,87911,61106,45624,37726,09506~0001:70037,23828,34361,92206,76657,26608~0003:36687,99260,49613,97165~0006:82302,11645,73565,36440~0002:60478,07738,53139,75127,16494,48312~0008:34144,27504,83395,62222,49656,18640~0007:19179,72906,55656,60720,74161,85939,51743,40565,13972,79216,35128,27369,84616,09553~0000:67157,85327,08658,57407,64297,15568,31898,36621~0005:09761,12853,91031,33015,52841,15425,45548,36528注意:求个数:累加器实现(并行分布式)
异常的摄像头信息,用累加器实现,无非拼的是字符串
更新累加器与take使用时,take算子可以触发多个job执行,可以造成累加器重复计算。
./spark-submit--masterspark://node1:7077,node2:7077--jars../lib/fastjson-1.2.11.jar,../lib/mysql-connector-java-5.1.6.jar--classMonitorFlowAnalyze../lib/Test.jar1~0001:13846,54785,51995,64341,45994,32228,82054,87746~0003:38780,08844,03281,07183,50318,87000,16722,11604,26508,45523,46380~0007:61833,19140,38387~0005:63920,23464,37389,01219,96765,24844,32101,24141~~0004:60778,35444,35403,68811,73819,81893~0006:09621,67028,96375,60036,91237,53743,10305~0002:24694,01172,25945,79625,83215,72235,26855~0008:24630,40432,96808,78708,28294~0000:68070,12865,49505,26035,36931,38053,918682.通过车辆数最多的topN卡扣3.统计topN卡扣下经过的所有车辆详细信息4.车辆通过速度相对比较快的topN卡扣车速:120=<speed高速90<=speed<120中速60<=speed<90正常0<speed<60低速
5.卡扣“0001”下所有车辆轨迹1.过滤日期范围内卡扣“0001”下有哪些车辆?2.过滤日期范围内这些车辆经过卡扣的时间,按照时间升序排序6.车辆碰撞01,02中这一天同时出现的车辆01:(car,row)02:(car,row)(car,row).join(car,row)01:car02:carcar02.intersection(car02)7.随机抽取车辆在一天中要随机抽取100辆车,抽取的车辆可以权威代表当天交通运行情况。假如这天一共有10000辆车,要随机抽取100辆车:sample(true,0.1,seed)00~01100100/10000*100=101~02100102~03100104~05200205~06200206~07300308~09500509~10200210~11200211~12300312~13500513~147007。。。。。。8.卡扣流量转换率卡扣流量转换率,是指车辆运行连续卡口的转换率如:一车辆经过卡口的轨迹为:0001,0002,0004,0001,0002,0003,0001,0002。0001,0002:卡扣0001到卡扣0002的转换率为:经过卡扣0001,又经过卡扣0002的次数/经过卡扣0001的次数,针对上面的例子,经过卡口0001的次数为:3次,经过卡口0001,又经过卡口0002的次数为:3次,那么卡扣0001到卡扣0002的转换率为:3/3=100%,同理:0001,0002,0003代表卡扣0001,0002到卡扣0003的转换率。0001,0002,0003,0004代表卡扣0001,0002,0003到卡扣00004的转换率。MonitorOneStepConvertRateAnalyze.java
一辆车的轨迹:0001->0002->0003->0001->0002->0004->0005->00010001,0002----卡扣0001到卡扣0002的车流量转化率:通过卡扣0001又通过卡扣0002的次数/通过卡扣0001的次数2/30001,0002,0003----卡扣0001,0002到0003的车辆转换率:通过卡扣0001,0002,0003的次数/通过卡扣0001,00020001,0002,0003,0004-----卡扣0001,0002,0003到0004的车辆转换率:通过卡扣0001,0002,0003,0004的次数/通过卡扣0001,0002,00030001,0002,0003,0004,0005-----卡扣0001,0002,0003,0004到0005的车辆转换率:通过卡扣0001,0002,0003,0004,0005的次数/通过卡扣0001,0002,0003,0004的次数手动输入卡扣号:0001,0002,0003,0004,0005求:0001,00020001,0002,00030001,0002,0003,00040001,0002,0003,0004,0005
粤A11111:("0001",100)("0001,0002",30)("0001,0002,0003",10)粤B22222:("0001",200)("0001,0002",100)("0001,0002,0003",70)("0001,0002,0003,0004",10)
9.实时道路拥堵情况计算一段时间内卡扣下通过的车辆的平均速度。这段时间不能太短,也不能太长。就计算当前时间的前五分钟当前卡扣下通过所有车辆的平均速度。每隔5s计算一次当前卡扣过去5分钟所有车辆的平均速度。SparkStreaming窗口函数windowlenth:5minslideinterval:5s10.动态改变广播变量`transform``foreachRDD`11.统计每个区域中车辆最多的前3道路道路车辆:道路中的每个卡扣经过的车辆累加天河区元岗路10001=30,0002=50,0003=100,0004=20200天河区元岗路20005=50,0006=100150天河区元岗路3100越秀区xxx1200越秀区xxx2150越秀区xxx3100SparkSQLHive表--t1:monitor_idcarroad_idarea_id-----areaIdarea_nameroad_idmonitor_idcar------tmp_car_flow_basicsql:selectarea_name,road_id,count(car)ascar_count,UDAF(monitor_id)asmonitor_infosfromt1groupbyarea_name,road_id----tmp_area_road_flow_count开窗函数:row_number()over(partitionbyxxxorderbyxxx)rankselectarea_name,road_id,car_count,monitor_infos,row_number()over(partitionbyarea_nameorderbycar_countdesc)rankfromtmp_area_road_flow_count----tmpselectarea_name,road_id,car_count,monitor_infosfromtmpwhererank<=3-----------------------------------------------------------------------总sql:selectarea_name,road_id,car_count,monitor_infosfrom(selectarea_name,road_id,car_count,monitor_infos,row_number()over(partitionbyarea_idorderbycarCountdesc)rankfrom(selectarea_name,road_id,count(car)ascar_count,UDAF(monitor_id)asmonitor_infosfromt1groupbyarea_name,road_id)t2)t3whererank<=3===================================================================================================================sql:selectprefix_area_name_road_id,count(car)ascar_count,UDAF(monitor_id)asmonitor_infosfromt1groupbyprefix_area_name_road_id----tmp_area_road_flow_count
selectarea_name,road_id,car_count,monitor_infos,row_number()over(partitionbyarea_nameorderbycar_countdesc)rankfromtmp_area_road_flow_count----tmpselectarea_name,road_id,car_count,monitor_infosfromtmpwhererank<=3-----------------------------------------------------------------------总sql:selectarea_name,road_id,car_count,monitor_infosfrom(selectarea_name,road_id,car_count,monitor_infos,row_number()over(partitionbyarea_idorderbycarCountdesc)rankfrom(selectarea_name,road_id,count(car)ascar_count,UDAF(monitor_id)asmonitor_infosfromt1groupbyarea_name,road_id)t2)t3whererank<=3车辆轨迹统计卡扣0001下所有车辆的轨迹--take(20)
各区域车流量最高topN的道路统计1.会将小于spark.sql.autoBroadcastJoinThreshold值(默认为10M)的表广播到executor节点,不走shuffle过程,更加高效。sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold","20971520");//单位:字节2.在Hive中执行sql文件:hive–fsql.sql3.提交命令:--masterspark://node1:7077,node2:7077--jars../lib/mysql-connector-java-5.1.6.jar,../lib/fastjson-1.2.11.jar--driver-class-path../lib/mysql-connector-java-5.1.6.jar:../lib/fastjson-1.2.11.jar../lib/Test.jar4
3.缉查布控,道路实时拥堵统计动态改变广播变量的值:可以通过transform和foreachRDD
屏蔽过多黄色警告,忽略java类方法的参数与注释;File->Settings->Editor->Inspections->java->javadoc:参数不一致的屏蔽:DeclarationhasproblemsinJavadocrefere红色改成waring黄色参数没有注释:DanglingJavadoccomment去掉勾选DeclarationhasJavadocproblems去掉勾选
评论