Fig(无花果)是一个根据多任务流水线模型开发的运行框架,框架多任务并发使用的java的线程池进行控制,使用队列实现任务间的数据传递
预览版仓库地址:https://oss.sonatype.org/content/repositories/snapshots maven依赖:
<dependency><groupId>com.github.taomus.fig</groupId><artifactId>fig-core</artifactId><version>0.1.0-SNAPSHOT</version><type>module</type></dependency><dependency><groupId>com.github.taomus.fig</groupId><artifactId>fig-spring-plugin</artifactId><version>0.1.0-SNAPSHOT</version><type>module</type></dependency>Xtend代码实例:
packagecom.github.test1.stockimportcom.github.taomus.fig.core.engine.Dataimportcom.github.taomus.fig.core.engine.Figimportcom.github.taomus.fig.core.engine.FigEngineimportcom.github.taomus.fig.core.engine.Taskimportcom.github.test.stock.entity.StockDataimportjava.nio.charset.Charsetimportjava.util.Arraysimportjava.util.Vectorimportjoinery.DataFrameimportorg.apache.commons.lang3.SerializationUtilsimportorg.jsoup.Connectionimportorg.jsoup.Jsoupimportorg.slf4j.LoggerFactoryimportorg.springframework.util.StreamUtilsclassCollectStock1extendsFig{valstaticLOG=LoggerFactory.getLogger(CollectStock1)defstaticvoidmain(String[]args){FigEngine.instance.addModule(CollectStock1)FigEngine.instance.run()FigEngine.instance.taskQueue.put(Data.create("start",null))}defStringgetUrl(Stringcode,Stringdate){if(code.startsWith("0")){return'''https://quotes.money.163.com/service/chddata.html?code=1«code»&start=«date»0101&end=«date»1231'''}else{return'''https://quotes.money.163.com/service/chddata.html?code=0«code»&start=«date»0101&end=«date»1231'''}}@Task("start")defData[]getStockCodes(Datavalue){return#[Data.create("buildData",newStockData("平安银行","000001")),Data.create("buildData",newStockData("万科A","000002"))]}@Task("buildData")defData[]collect(Datavalue){varstockinfo=value.getDataasStockDatavarresults=newVector<Data>();for(date:2000..2020){varinfo=SerializationUtils.<StockData>clone(stockinfo)info.date=String.valueOf(date)results.add(Data.create("stock_history",info))}returnresults}@Task("stock_history")defDatastartHistory(Datavalue){vard=value.getData()asStockDatavarStringurl=getUrl(d.code,d.date)LOG.info(url)varConnectionconnection=Jsoup.connect(url);varConnection.Responseresponse=connection.method(Connection.Method.GET).ignoreContentType(true).timeout(10*1000).execute();vara=StreamUtils.copyToString(response.bodyStream,Charset.forName("UTF-8"))LOG.info(a)vardata=a.split("\n")if(data.size==1){returnnull}varDataFrame<Object>df=newDataFrame("日期","股票代码","名称","收盘价","最高价","最低价","开盘价","前收盘","涨跌额","涨跌幅","换手率","成交量","成交金额","总市值","流通市值","成交笔数");for(index:1..data.length-1){df.append(Arrays.asList(data.get(index).split(',')))}varresults=newVector();varindexs=df.index();for(Objectindex:indexs){varsd=newStockData()sd.date=df.get(index,"日期")asStringsd.code=df.get(index,"股票代码")asStringsd.code=sd.code.substring(1)sd.name=d.namesd.opening=Float.valueOf(df.get(index,"开盘价")asString)sd.ending=Float.valueOf(df.get(index,"收盘价")asString)sd.low=Float.valueOf(df.get(index,"最低价")asString)sd.hig=Float.valueOf(df.get(index,"最高价")asString)results.add(sd)}varresdata=Data.create("calc",results.reverse);resdata.priority=2returnresdata}@Task("calc")defDatacalc(Datavalue){returnnull;}}
评论