个人介绍
我是程序员客栈的【asd】,一名【大数据工程师】; 我毕业于【北京理工大学】,担任过【平安保险】的【大数据工程师】,担任过【达宜家】的【大数据工程师】; 负责过【保险销售监控系统】,【离线分析系统】,【智能用户推荐系统】的开发; 熟练使用【java】,【hbase】,【kafka】,【redis】,【scala】;
如果我能帮上您的忙,请点击“立即预约”或“发布需求”!
工作经历
2019-10-05 -2023-10-07平安保险大数据开发
1 对数据的收集、处理及存储,完成数据指标的统计、多维分析和展现。 2 参与公司数据仓库架构设计、建模和 ETL 开发 3 参与大数据平台开发方案的制定对数据平台的建设、治理及优化
教育经历
2014-10-09 - 2018-10-06北京理工大学大数据科学本科
技能
实时数仓的分层方式一般遵守传统数据仓库模型,分为了 ODS 操作数据集、DWD 明细层和 DWS 汇总层以及应用层。但实时数仓模型的处理的方式却和传统数仓有所差别,如明细层和汇总层的数据一般会放在 Kafka 上,维度数据一般考虑到性能问题则会放在 HBase存储上,即席查询则可以使用presto 完成。 ODS 数据来源是官方网站,app,*公众号 我们将从以上方式获取到的数据,通过datax同步数据到STA层(数据登台区)然后用shell脚本的方式在脚本里写sql将数据做ETL存放在DM(基础数据层)并做了数仓建模以及创建维度的工作 接着通过要完成的指标将数据做聚合并形成宽表存入DW(汇总数据层),这一层中主要分为两层DWS(轻度汇总层)和DWT(累计汇总层)这两层是对数据做一个汇总,dws是统计出指标一天的度量值的和,dwt是统计出周期度量值的总和,比如3天5天7天30天,我们在dwt层中做一些同比、环比的指标以及一系列指标最终存放在APP层,最后可以将指标需求同步到展现层也就是用于仪表盘以及自助查询(个人不是负责这一块,不是特别了解)至此项目流程结束 # 日志数采集: 日志采集:方案一: 自定义springboot程序接收nginx反向代理过来的日志数据,一边发Kafka ods主题,一边落盘 优势:传输效率高 方案二: openresty (nginx+lua脚本)实现日志落盘,flume采集日志文件发Kafka ods主题,优势: 1.在Flume这里能实现不完整json过滤; 2.在Flume这里就可以实现日志数据的分流,具有一定灵活性 业务数据采集: 业务数据采集:方案一: canal 监控mysgl的binlog日志,获取增加、修改、删除的数据,封装成json发到kafka ods主题数据量:采集的数据比maxwell 数据重 同步历史数据: canal做不到 方案二:maxwell 监控mysql的binlog日志,获取增加、修改、删除的数据,封装成json发到kafka ods主题数据量:采集的数据更轻量化 同步历史数据: bootstrap功能,可以把开启binlog之前的数据也同步到oDs DWD and DIM 日志: 我们前面采集的日志数据已经保存到Kafka中,作为日志数据的ODS层,从kafka的ODS层读取的日志数据分为3类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回Kafka不同主题中,作为日志DWD层。 实时销售转化率=你的潜在顾客转化成成交顾客的数量/潜在顾客的数量 接下来的话我们的日志数据是通过SpringBoot程序直接落盘到kafka中的,业务数据是通过flinkCDC将数据从MySQL落地到kafka,分别存入kafka不同的主题,我们将这些原始数据定义为原始数据层,然后再通过flink将这些数据在数据明细层做处理。那在日志数据这一侧,我主要是做了两件事,一个是判断潜在顾客的数量以及潜在顾客转换为成交顾客的数量,(目的:准确的判断床位使用率),另一个是将数据按照类型进行分流。 判断判断实际占用的判断潜在顾客的数量以及潜在顾客转换为成交顾客的数量这一块的话,因为在原始的json串里它是有字段来记录这个用户是否完成下单,但是如果这个用户因为一些原因,完成了下单但是没有完成付款,就会被认定是交易完成,那就会造成不准确的数据,对我们后续的指标统计是会造成误差的,所以说这里我当时是用状态进行判断的(valuestate,选择它是因为我只要判断状态里有没有值就行了),我当时的思路是首先咱们肯定得根据用户的id进行分组,然后呢再使用map算子,new一个RichMapFunction(RichFuction相较于MapFuction,RichMapFunction还提供open, close, getRuntimeContext 和setRuntimeContext方法,这些功能可用于参数化函数(传递参数),创建和完成本地状态,访问广播变量以及访问运行时信息以及有关迭代中的信息),接着就实现它里面的方法呗,有open方法对状态(时间)进行初始化,map方法进行真正判断,这里判断的话,就是获取数据中的字段,1是已经成交用户,0是潜在用户,如果字段为1的话,说明他已经完成了下单,然后再获取状态值,如果状态不为空的话,那说明他没有付款完成,那这种用户其实就是我们要去纠正的,那如果字段为1并且状态也是空的话,那就说明他确实没有付费,这时候咱们就要把他第一次的状态记录下来另一种情况就是,字段为0,那一定就是没下单,这种不会存在误差。那么这是我负责的判断成交用户,判断完之后,我要 通过测输出流将原始数据进行一个拆分 最开始获取数据后,有脏数据属于正常现象,所以在对数据进行json格式解析的时候,会把脏数据输入到测输出流。 业务: 业务数据的变化,我们可以通过Maxwell采集到,但是MaxWell是把全部数据统一写入一个Topic中, 这些数据包括事实数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从Kafka的业务数据ODS层读取数据,经过处理后,将维度数据保存到Hbase,将事实数据写回Kafka作为业务数据的DWD层。 实现动态分流 我们是将上面的内容放到某一个地方,集中配置。这样的配置不适合写在配置文件中,因为业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以使用的是mysql数据库存储,周期性的同步 事实数据保存到Kafka的主题中 维度数据保存到Hbase的表中 根据数据对象为单位进行分流 DWD的实时计算核心就是数据分流,其次是状态识别。在开发过程中我们实践了几个灵活度较强算子,比如RichMapFunction, ProcessFunction, RichSinkFunction。 那这几个我们什么时候会用到呢?如何选择? Function 可转换结构 可过滤数据 侧输出 open方法 可以使用状态 输出至 MapFunction Yes No No No No 下游算子 FilterFunction No Yes No No No 下游算子 RichMapFunction Yes No No Yes Yes 下游算子 RichFilterFunction No Yes No Yes Yes 下游算子 ProcessFunction Yes Yes Yes Yes Yes 下游算子 SinkFunction Yes Yes No No No 外部 RichSinkFunction Yes Yes No Yes Yes 外部 从对比表中能明显看出,Rich系列能功能强大,ProcessFunction功能更强大,但是相对的越全面的算子使用起来也更加繁琐。 DWM 我们在之前通过分流等手段,把数据分拆成了独立的kafka topic。 因为实时计算与离线不同,实时计算的开发和运维成本都是非常高的,所以就必要象离线数仓一样,建一个大而全的中间层。 如果没有必要大而全,这时候就需要大体规划一下要实时计算出的指标需求了。把这些指标以主题宽表的形式输出就是我们的DWS层。 使用wait/notify方法实现线程间的通信 然后利用CountDownLatch api 去实现异步查询的功能 DWS ink-CDC监控mysql到ODS层,从 ODS层中获得, /* val v10 = dataStream.keyBy("bkmc") .timeWindow(Time.seconds(5)) .allowedLateness(Time.seconds(2)) .sideOutputLateData(new OutputTag[gp]("late")) .minBy("zxj") v10.getSideOutput(new OutputTag[gp]("late")).print("late")*/ 用户的支付意愿(ods_user_pay), 续保的难易程度(ods_user_ny), 保险员工的推荐方式(ods_yg_tuijian), 客户对保险的满意程度(ods_bx_pleased), 保险员工的对老客户的维护程度,(ods_bx_user) 销售评测表(10%)(ods_yg_test)(1~10), 用户支付情况(10%)(dwd_teaching_)(1~10) zh因子是基于公司调研部评估(10%)(1~10)与反馈明细(10%) 客户对保险代理人的满意程度(ods_bx_userder) 使用Flink流处理(ods_feedback)用户的续保率, 就是ods层客户是否续费/所维护的客户 * 用户量(用户表 员工表 保险订单表),和 员工考勤率就是全勤分(ods_all_attendance_rate)5分 (出勤表) 保险支付率就是客户支付的订单信息比客户浏览总, 跟DIM层的保险代理人信息表进行关联,形成dwd层宽表,包含了代理人满意度,代理人信息,客户信息,续保率等等,分别对代理人信息,和用户信息,使用flinksql 计算员工表kpi 90分以上比上所维护客户 * 10 (客户表 员工表 订单详情表),绩效考核表客户对保险员工的保险销售就是客户评分之和比上客户评分个数 * 5 (评价表) 四张表进行join每一个的评分等级为1~10级,由相应的的人员给出评分等级 客户的续费率就是从dwd层拉取出用户表 员工表 和ods的订单详情表 以客户id进行join 获取到每个用户所维护的客户以及订单的完成情况 求出客户是否续费与所维护客户的总数的比值在乘于转化值kpi就是从dwd层获取 客户表 员工表 和订单详情表 3张表 订单表与客户表join在与员工表join 成一张绩效考核表用表中分数在90分以上的人数比上保险员工所带学员在乘于转化值客户对员工的评价是从dwd层获取评价表 使用客户评分之和比上客户评分个数 乘以转化值出勤率 只有全勤才能拿5分转化分,支付率就是员工维护客户支付的订单与总维护的客户的比值乘以转化值trk 最终计算出每位员工的(trk)在95分以上(S) 90-95(A) 80-90(B) 80以下(C) ,将结果在DWM层汇总,存入ck中 ads 把Clickhouse中的数据根据可视化需要进行筛选聚合。 4 项目中遇到的一些技术难点 问题简介 公司线上一个Flink作业的State Size随时间逐渐增大,运行一段时间后出现报OutOfMemory异常。 作业代码是之前同事写的,所以,我了尽可能少分析少修改之前代码的原则,直接从问题现象本身挖掘尽可能多的有效信息。 1. 问题表象分析 从Flink web ui上观察作业的checkpoint历史信息,每隔一段时间抽取出来作业所有算子的checkpoint metrics信息: 可以看到,除了第一的state比较稳定外,其他operator算子的state size始终是单调递增的,没有任何收敛的趋势。 分析程序,第一个算子是addSource(source),数据源是Kafka消息队列,所以只记录offset之类的消费信息,这种state需要的空间复杂度为常数,所以保持474字节不变。 循序渐进 (1)由于这个Flink作业只用了一个TaskManager,所以,我们只需要观察这个TaskManager的JVM进程即可。从Flink ui上记录TaskManager所在的物理节点 (2)从yarn的All Applications ui上查看这个Flink作业的yarn作业ID,端口号默认是8088 (3)到TaskManager所在节点,用yarn作业ID获取TaskManager的进程号 (4)jmap -dump打印堆dump文件 dump文件可能比较大,建议用gzip,它是个功能很强大的压缩命令,特别是我们可以设置 -1 ~ -9 来指定它的压缩级别,数据越大压缩比率越大,耗时也就越长,推荐使用 -6~7。 问题定位 用jvisualvm分析dump。 查看堆内存对象占用空间情况,找到top3的实例对象,发现是程序中的某类实例类,如下,是LoadAddrOutPut和LoadEmpNameOutPut。 再查看这些实体类对象的成员变量值的eventtime,分析都是哪些事件时间的实体类对象没有被释放。双击LoadAddrOutPut即可进入该类实例对象的查看界面: 发现存在很多天之前的事件,而程序中的window窗口大小为1天。 至此,初步结论是:window窗口中本应过期的数据没有释放。那么,再从程序中查看有valuestate的StateTtlConfig,但是却没有设置清除策略! 问题解决 Flink的过期数据的清理。 1.默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过 StateTtlConfig 配置关闭后台清理:可以按照如下所示配置更细粒度的后台清理策略。当前的实现中 HeapStateBackend 依赖增量数据清理RocksDBStateBackend 利用压缩过滤器进行后台清理 2.全量快照时进行清理 另外,你可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。该策略可以通过 StateTtlConfig 配置进行配置:这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。注意: 这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭,比如在从 savepoint 恢复时 3. 增量数据清理 另外可以选择增量式清理状态数据,在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。每次触发增量清理时,从迭代器中选择已经过期的数进行清理。该策略有两个参数。第一个是每次清理时检查状态的条目数,在每个状态访问时触发。第二个参数表示是否在处理每条记录时触发清理。Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理 4. 在 RocksDB 压缩时清理 如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 你可以通过 StateTtlConfig. newBuilder(...). cleanupInRocksdbCompactFilter (long queryTimeAfterNumEntries) 方法指定处理状态的条数。时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。 还可以通过配置开启 RocksDB 过滤器的 debug 日志: log4j. logger. org. rocksdb. FlinkCompactionFilter = DEBUG
乍眼一看,是不是觉得和离线数仓的架构图,相差无几?其实二者差别还是很多的: 与离线数仓相比,实时数仓的层次更少一些 从目前建设离线数仓的经验来看,数仓的数据明细层内容会非常丰富,处理明细数据外一般还会包含轻度汇总层的概念,另外离线数仓中应用层数据在数仓内部,但实时数仓中,app 应用层数据已经落入应用系统的存储介质中,可以把该层与数仓的表分离。 应用层少建设的好处:实时处理数据的时候,每建一个层次,数据必然会产生一定的延迟。 汇总层少建的好处:在汇总统计的时候,往往为了容忍一部分数据的延迟,可能会人为的制造一些延迟来保证数据的准确。举例,在统计跨天相关的订单事件中的数据时,可能会等到 00:00:05 或者 00:00:10 再统计,确保 00:00 前的数据已经全部接受到位了,再进行统计。所以,汇总层的层次太多的话,就会更大的加重人为造成的数据延迟。 与离线数仓相比,实时数仓的数据源存储不同。在建设离线数仓的时候,目前公司整个离线数仓都是建立在 Hive 表之上。但是,在建设实时数仓的时候,同一份表,会使用不同的方式进行存储。比如常见的情况下,明细数据或者汇总数据都会存在 Kafka 里面,但是像商家、直播等维度信息需要借助 Hbase,MySQL 或者其他 KV 存储等数据库来进行存储。 离线数仓建设的数据域也更丰富些,因为离线数仓的应用和分析场景比实时数仓丰富,所以对于基础数据建设的覆盖度要求比实时数仓要高。结合直直播电商的业务场景看,交易、营销、流量、内容这几个数据域的实时应用场景往往最多,因此建设优先级也往往是最高的。 技术架构图
项目名称:农作物大数据平台管理 项目介绍: 1、项目背景 现代农业种植是通过控制全国各地的基地状况,实现全球自动化管理,;农产品的信息管理则是通过采用物联网技术实现现代农业种植和农产品信息管理,并通过全球定位系统统计农作物所在基地概况进行信息通信,用以实现智能识别、监控、跟踪和管理的一套系统。 2、基本业务概述 3、开发模式 本项目采用前后端分离的开发模式,前端用到了基于Vue的Element-Admin UI框架,后端采用了.Net Core后台开发框架和EF Core+异步泛型仓储的开发模式 4、技术选型 前端项目技术: Vue-Element-Admin、Vue-Router、Axios、Echarts 后端项目技术: .Net Core 、EF Core 、Jwt权限验证、仓储模式、Autofac 项目技术亮点分析: 本项目采用前后端分离模式,分为多层降低了耦合度,提高了代码的可维护性,后台基于.Net Core设计Api接口框架,使用用Ef Core中CodeFirst模式实现与数据库的交互,并使用异步泛型仓储提高对数据库访问的维护,在完成接口编写后加入了JWT权限验证,对于JWT来说是一种标准化的数据传输规范,可以实现跨平台,并通过CorsFilter设置全局跨域配置实现浏览器请求访问,更加安全的调用接口,并使用Autofac作为依赖注入容器,降低系统耦合性,也加入了Nlog日志记录用户操作 (三)开发中遇到的技术难点及解决办法 1 、进行依赖注入时,手动注入过于繁琐,加入了Autofac动态注入 2、在进行跨域请求时,配置了全局跨域 3、进行Swagger调试时,请求数据首字母会自动变为小写更改之后加入了全局配置Json序列化处理 4、Vue 封装的Axios请求数据时,Post请求和Put请求参数传值错误利用data传值获取参数 5、后台获取前台数据时,用Request.Query获取参数值,并用FromFrom在Action方法传入参数之后添加frombody属性,参数将以表单形式提交 6、关于‘415‘错误前台请求接口时,请求的ContentType需为:application/json 难点一: 根据调整账户营销费用产生的数据进行treeTable显示 解决办法:调整费用产生Id跟父Id对比进行构造树形结构list,之后递归循环子节点 难点二:文件的上传重名 解决办法:使用uuid的方式解决重名问题:String name = fileItem.getName();//获取上传文件的名称 name=UUID.randomUUID().toString(); 难点三:页面刷新后恢复checkbox选中状态 解决办法:用checkbox对应的Id值作为value遍历选中的checkbox,存入value值,最后取出value值反选checkbox (四)围绕项目亮点“提炼面试问题”(技术点) 问题一:如果在一个B/S结构的系统中需要传递变量值请列举几种实现方式进行实现。 答案:URL提交方式、表单提交方式,以及状态保持方式(Cookie 、Session、Application) 问题二:前端js实现大文件G级的分块上传与分段下载 答案: 分块上传,用up6已经将文件进行了分块,并且对每个分块数据进行了标识,这些标识包括文件块的索引,大小,偏移,文件MD5,文件块MD5(需要开启)等信息,服务端在接收这些信息进行处理,比如将块数据保存到分布式存储系统中 分段下载 利用了多线程,在同一时间段内通过多个线程发起下载请求,将需要下载的数据分割成多个部分,每一个线程只负责下载其中一个部分,然后将下载后的数据组装成完整的数据文件 问题三:怎样能提高EF的效率? 答案:合理使用EF的加载方式,禁用延迟加载,减少数据库的访问次数,使用贪懒加载,显示加载、按需加载,使用批量删除和修改 问题四:Token的作用集原理? 答案: Token,就是令牌,最大的特点就是随机性,不可预测。一般黑客或软件无法猜测出来。那么,Token有什么作用?又是什么原理呢?Token一般用在两个地方: 1)防止表单重复提交、2)anti csrf攻击(跨站点请求伪造)。两者在原理上都是通过session token来实现的。当客户端请求页面时,服务器会生成一个随机数Token,并且将Token放置到session当中,然后将Token发给客户端(一般通过构造hidden表单)。下次客户端提交请求时,Token会随着表单一起提交到服务器端。 原理:用户在前端请求登录—>在后台验证通过后根基后台用户信息和过期时间生成token-->请求返回时将token带给前端并保存在前端—>以后前端请求时需要带上token-->后台接受token并验证,如果一致返回true否则false。 问题五:什么是WebService?UDDI? 答案:Web Service便是基于网络的、分布式的模块化组件,它执行特定的任务,遵 守具体的技术规范,这些规范使得Web Service能与其他兼容的组件进行互操作。 UDDI 的目的是为电子商务建立标准;UDDI是一套基于Web的、分布式的、为 Web Service提供的、信息注册中心的实现标准规范,同时也包含一组使企业能将 自身提供的Web Service注册,以使别的企业能够发现的访问协议的实现标准。