Fig(无花果)是一个基于多任务流水线模型开发的运行框架。框架的多任务并发由 Java 线程池进行控制,并使用队列实现任务间的数据传递。
预览版仓库地址:https://oss.sonatype.org/content/repositories/snapshots Maven 依赖:
<dependency><groupId>com.github.taomus.fig</groupId><artifactId>fig-core</artifactId><version>0.1.0-SNAPSHOT</version><type>module</type></dependency>
<dependency><groupId>com.github.taomus.fig</groupId><artifactId>fig-spring-plugin</artifactId><version>0.1.0-SNAPSHOT</version><type>module</type></dependency>
Xtend 代码实例:
package com.github.test1.stock

import com.github.taomus.fig.core.engine.Data
import com.github.taomus.fig.core.engine.Fig
import com.github.taomus.fig.core.engine.FigEngine
import com.github.taomus.fig.core.engine.Task
import com.github.test.stock.entity.StockData
import java.nio.charset.Charset
import java.util.Arrays
import java.util.Vector
import joinery.DataFrame
import org.apache.commons.lang3.SerializationUtils
import org.jsoup.Connection
import org.jsoup.Jsoup
import org.slf4j.LoggerFactory
import org.springframework.util.StreamUtils

/**
 * Example Fig module that downloads historical stock quotes as CSV text and
 * converts each row into a {@code StockData} record.
 *
 * The methods annotated with {@code @Task} form a chain: each one returns
 * {@code Data} objects created under the name of the next task
 * ("start" -> "buildData" -> "stock_history" -> "calc").
 * NOTE(review): presumably the engine routes each Data to the @Task with the
 * matching name; confirm against the Fig engine documentation.
 */
class CollectStock1 extends Fig {

	val static LOG = LoggerFactory.getLogger(CollectStock1)

	/**
	 * Registers this module with the engine, starts the engine, and then puts
	 * the initial "start" item on the engine's task queue.
	 */
	def static void main(String[] args) {
		FigEngine.instance.addModule(CollectStock1)
		FigEngine.instance.run()
		FigEngine.instance.taskQueue.put(Data.create("start", null))
	}

	/**
	 * Builds the download URL covering one calendar year for one stock.
	 *
	 * @param code stock code; it is prefixed with "1" when it starts with "0",
	 *             otherwise with "0"
	 * @param date four-digit year; the range requested is «date»0101 to «date»1231
	 * @return the quote-download URL
	 */
	def String getUrl(String code, String date) {
		// Only the one-digit prefix differs between the two cases, so select it
		// once instead of repeating the whole template.
		val marketPrefix = if (code.startsWith("0")) "1" else "0"
		return '''https://quotes.money.163.com/service/chddata.html?code=«marketPrefix»«code»&start=«date»0101&end=«date»1231'''
	}

	/**
	 * Entry task: emits one "buildData" item per stock to collect.
	 *
	 * @param value the triggering item (its payload is not read)
	 * @return one "buildData" item for each hard-coded stock
	 */
	@Task("start")
	def Data[] getStockCodes(Data value) {
		return #[
			Data.create("buildData", new StockData("平安银行", "000001")),
			Data.create("buildData", new StockData("万科A", "000002"))
		]
	}

	/**
	 * Fans one stock out into one "stock_history" item per year from 2000 to
	 * 2020 inclusive.
	 *
	 * @param value item whose payload is the {@code StockData} to expand
	 * @return one "stock_history" item per year, each carrying its own copy of
	 *         the stock with {@code date} set to that year
	 */
	@Task("buildData")
	def Data[] collect(Data value) {
		var stockInfo = value.getData as StockData
		var results = new Vector<Data>()
		for (date : 2000 .. 2020) {
			// Clone so every emitted item owns an independent StockData instance.
			var info = SerializationUtils.<StockData>clone(stockInfo)
			info.date = String.valueOf(date)
			results.add(Data.create("stock_history", info))
		}
		return results
	}

	/**
	 * Downloads one year of quotes for one stock and converts the rows into
	 * {@code StockData} records.
	 *
	 * @param value item whose payload is a {@code StockData} with {@code code},
	 *              {@code date} (the year) and {@code name} set
	 * @return a "calc" item with priority 2 carrying the parsed records in
	 *         reverse row order, or {@code null} when the response contains
	 *         only a single line (no data rows)
	 */
	@Task("stock_history")
	def Data startHistory(Data value) {
		var d = value.getData() as StockData
		var String url = getUrl(d.code, d.date)
		LOG.info(url)
		var Connection connection = Jsoup.connect(url)
		var Connection.Response response = connection.method(Connection.Method.GET)
			.ignoreContentType(true)
			.timeout(10 * 1000)
			.execute()
		var body = StreamUtils.copyToString(response.bodyStream, Charset.forName("UTF-8"))
		LOG.info(body)
		var data = body.split("\n")
		if (data.size == 1) {
			// Only the header line came back: nothing to parse.
			return null
		}
		var DataFrame<Object> df = new DataFrame<Object>("日期", "股票代码", "名称", "收盘价", "最高价", "最低价", "开盘价",
			"前收盘", "涨跌额", "涨跌幅", "换手率", "成交量", "成交金额", "总市值", "流通市值", "成交笔数")
		// Row 0 is the header line, so data rows start at index 1.
		for (index : 1 .. data.length - 1) {
			df.append(Arrays.asList(data.get(index).split(',')))
		}
		var results = new Vector<StockData>()
		var indexs = df.index()
		for (Object index : indexs) {
			var sd = new StockData()
			sd.date = df.get(index, "日期") as String
			sd.code = df.get(index, "股票代码") as String
			// Drop the first character of the code column value.
			sd.code = sd.code.substring(1)
			// The name is taken from the request payload, not from the response.
			sd.name = d.name
			sd.opening = Float.valueOf(df.get(index, "开盘价") as String)
			sd.ending = Float.valueOf(df.get(index, "收盘价") as String)
			sd.low = Float.valueOf(df.get(index, "最低价") as String)
			// NOTE(review): property name kept exactly as it appears in the
			// source text; confirm it matches the field declared on StockData.
			sd.hig = Float.valueOf(df.get(index, "最高价") as String)
			results.add(sd)
		}
		var resdata = Data.create("calc", results.reverse)
		resdata.priority = 2
		return resdata
	}

	/**
	 * Placeholder for the calculation stage; currently emits nothing.
	 *
	 * @param value item carrying the parsed records (not read)
	 * @return always {@code null}
	 */
	@Task("calc")
	def Data calc(Data value) {
		return null
	}
}
评论