StreamingPro Spark Streaming 框架开源项目

我要开发同款
匿名用户2018年04月29日
33阅读
开发技术JavaScript
所属分类程序开发、其他开发相关
授权协议未知

作品详情

概述

Spark是一个可扩展的可编程框架,用于数据集的大规模分布式处理,称为弹性分布式数据集(ResilientDistributedDatasets,RDD)。

SparkStreaming是SparkAPI核心的扩展,它支持来自各种来源的流处理。

StreamingPro 是一个可扩展、可编程的 SparkStreaming 框架(也包括Spark,Storm),可以轻松地用于构建流式应用。

StreamingPro支持以Spark、Flink等作为底层分布式计算引擎,通过一套统一的配置文件完成批处理、流式计算与Rest服务的开发。特点有:

使用Json描述文件完成流式,批处理的开发,不用写代码。

支持SQLServer,支持XSQL/MLSQL(重点),完成批处理,机器学习,即席查询等功能。

标准化输入输出,支持UDF函数注册,支持自定义模块开发

支持Web化管理Spark应用的启动,监控

如果更细节好处有:

跨版本:StreamingPro可以让你不用任何变更就可以轻易的运行在spark1.6/2.1/2.2上。

新语法:提供了新的DSl查询语法/Json配置语法

程序的管理工具:提供web界面启动/监控Spark程序

功能增强:2.1之后StructuredStreaming不支持kafka0.8/0.9,Structured,此外还有比如sparkstreaming支持offset保存等

简化SparkSQLServer搭建成本:提供rest接口/thrift接口,支持sparksqlserver的负载均衡,自动将driver注册到zookeeper上

探索更多的吧

项目模块说明模块名描述备注streamingpro-commons一些基础工具类 streamingpro-spark-commonSpark有多个版本,所以可以共享一些基础的东西 streamingpro-flinkstreamingpro对flink的支持 streamingpro-sparkstreamingpro对spark1.6.x的支持 streamingpro-spark-2.0streamingpro对spark2.x的支持 streamingpro-apistreamingpro把底层的sparkAPI暴露出来,方便用户灵活处理问题 streamingpro-manager通过该模块,可以很方便的通过web界面启动,管理,监控spark相关的应用 streamingpro-dls自定义connect,load,select,save,train,register等语法,便于用类似sql的方式做批处理任务,机器学习等 相关概念

如果你使用StreamingPro,那么所有的工作都是在编辑一个Json配置文件。通常一个处理流程,会包含三个概念:

多个输入

多个连续/并行的数据处理

多个输出

StreamingPro会通过'compositor'的概念来描述他们,你可以理解为一个处理单元。一个典型的输入compositor如下:

{        "name": "batch.sources",        "params": [          {            "path": "file:///tmp/hdfsfile/abc.txt",            "format": "json",            "outputTable": "test"          },           {              "path": "file:///tmp/parquet/",              "format": "parquet",              "outputTable": "test2"            }        ]}

 

batch.sources 就是一个compositor的名字。这个compositor把一个本地磁盘的文件映射成了一张表,并且告知系统,abc.txt里的内容是json格式的。这样,我们在后续的compositor模块就可以使用这个test表名了。通常,StreamingPro希望整个处理流程,也就是不同的compositor都采用表来进行衔接。

StreamingPro不仅仅能做批处理,还能做流式,流式支持SparkStreaming,StructuredStreaming。依然以输入compositor为例,假设我们使用的是StructuredStreaming,则可以如下配置。

{        "name": "ss.sources",        "params": [          {            "format": "kafka9",            "outputTable": "test",            "kafka.bootstrap.servers": "127.0.0.1:9092",            "topics": "test",            "path": "-"          },          {            "format": "com.databricks.spark.csv",            "outputTable": "sample",            "header": "true",            "path": "/Users/allwefantasy/streamingpro/sample.csv"          }        ]      }

 

第一个表示我们对接的数据源是kafka0.9,我们把Kafka的数据映射成表test。因为我们可能还需要一些元数据,比如ip和城市的映射关系,所以我们还可以配置一些其他的非流式的数据源,我们这里配置了一个smaple.csv文件,并且命名为表sample。

如果你使用的是kafka>=1.0,则topics参数需要换成'subscribe',并且使用时可能需要对内容做下转换,类似:

select CAST(key AS STRING) as k, CAST(value AS STRING) as v from test

 

启动时,你需要把-streaming.platform设置为 ss。

如果我们的输入输出都是Hive的话,可能就不需要batch.sources/batch.outputs等组件了,通常一个batch.sql就够了。比如:

"without-sources-job": {    "desc": "-",    "strategy": "spark",    "algorithm": [],    "ref": [],    "compositor": [      {        "name": "batch.sql",        "params": [          {            "sql": "select * from hiveTable",            "outputTableName": "puarquetTable"          }        ]      },      {        "name": "batch.outputs",        "params": [          {            "format": "parquet",            "inputTableName": "puarquetTable",            "path": "/tmp/wow",            "mode": "Overwrite"          }        ]      }    ],    "configParams": {    }  }

 

在批处理里,batch.sources/batch.outputs都是可有可无的,但是对于流式程序,stream.sources/stream.outputs/ss.sources/ss.outputs则是必须的。

StreamingPro的一些参数PropertyNameDefaultMeaningstreaming.name(none)required等价于spark.app.namestreaming.master(none)required等价于spark.masterstreaming.duration10secondssparkstreaming周期,默认单位为秒streaming.resttrue/false,defaultisfalse是否提供http接口streaming.spark.servicetrue/false,defaultisfalse开启该选项时,streaming.platform必须为spark.该选项会保证spark实例不会退出streaming.platformspark/spark_streaming/ss/flink,defaultisspark基于什么平台跑streaming.checkpoint(none)sparkstreamingcheckpoint目录streaming.kafka.offsetPath(none)kafka的偏移量保存目录。如果没有设置,会保存在内存中streaming.driver.port9003配置streaming.rest使用,streaming.rest为true,你可以设置一个http端口streaming.spark.hadoop.*(none)hadoopconfiguration,eg.-streaming.spark.hadoop.fs.defaultFShdfs://name:8020streaming.job.file.path(none)配置文件路径,默认从hdfs加载streaming.jobs(none)json配置文件里的job名称,按逗号分隔。如果没有配置该参数,默认运行所有jobstreaming.zk.servers(none)如果把spark作为一个server,那么streamingpro会把driver地址注册到zookeeper上streaming.zk.conf_root_dir(none)配置streaming.zk.servers使用streaming.enableHiveSupportfalse是否支持Hivestreaming.thriftfalse是否thriftserverstreaming.sql.source.[name].[参数](none)batch/ss/stream.sources中,你可以替换里面的任何一个参数streaming.sql.out.[name].[参数](none)batch/ss/stream.outputs中,你可以替换里面的任何一个参数streaming.sql.params.[param-name](none)batch/ss/stream.sql中,你是可以写表达式的,比如select*from:table,之后你可以通过命令行传递该table参数

后面三个参数值得进一步说明:

假设我们定义了两个数据源,firstSource,secondSource,描述如下:

{        "name": "batch.sources",        "params": [          {            "name":"firstSource",            "path": "file:///tmp/sample_article.txt",            "format": "com.databricks.spark.csv",            "outputTable": "article",            "header":true          },          {              "name":"secondSource",              "path": "file:///tmp/sample_article2.txt",              "format": "com.databricks.spark.csv",              "outputTable": "article2",              "header":true            }        ]      }

 

我们希望path不是固定的,而是启动时候决定的,这个时候,我们可以在启动脚本中使用-streaming.sql.source.[name].[参数]来完成这个需求。比如:

-streaming.sql.source.firstSource.path  file:///tmp/wow.txt

 

这个时候,streamingpro启动的时候会动态将path替换成你要的。包括outputTable等都是可以替换的。

有时候我们需要定时执行一个任务,而sql语句也是动态变化的,具体如下:

{        "name": "batch.sql",        "params": [          {            "sql": "select * from test where hp_time=:today",            "outputTableName": "finalOutputTable"          }        ]      },

 

这个时候我们在启动streamingpro的时候,通过参数:

-streaming.sql.params.today  "2017"

 

动态替换sql语句里的:today

声明:本文仅代表作者观点,不代表本站立场。如果侵犯到您的合法权益,请联系我们删除侵权资源!如果遇到资源链接失效,请您通过评论或工单的方式通知管理员。未经允许,不得转载,本站所有资源文章禁止商业使用运营!
下载安装【程序员客栈】APP
实时对接需求、及时收发消息、丰富的开放项目需求、随时随地查看项目状态

评论