Riko is a Python stream-processing engine modeled after Yahoo! Pipes. Written in pure Python, it analyzes and processes streams of structured data. It provides both synchronous and asynchronous APIs, supports parallel processing of RSS feeds, and also ships with a command-line interface.
Features:
Reads csv/xml/json/html files.
Creates text and data streams via modular pipes.
Parses, processes, and extracts RSS/Atom feeds.
Creates powerful mashup APIs and maps.
Supports parallel processing.
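The "modular pipes" idea above can be sketched in plain Python generators. This is only an illustration of the concept, not riko's actual API; the names `fetch` and `strip` are hypothetical:

```python
def fetch(lines):
    # Source pipe: emits items one at a time (a "stream").
    for line in lines:
        yield {'content': line}

def strip(stream):
    # Transform pipe: consumes one stream and emits another,
    # stripping surrounding whitespace from each item.
    for item in stream:
        yield {'content': item['content'].strip()}

# Pipes compose by chaining: the output stream of one
# becomes the input stream of the next.
stream = strip(fetch(['  hello ', ' world']))
print(next(stream))  # {'content': 'hello'}
```

Because each stage is a generator, items flow through the chain lazily, one at a time, which is what makes such pipelines cheap to compose.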
Example usage:
>>> ### Create a SyncPipe flow ###
>>> #
>>> # `SyncPipe` is a convenience class that creates chainable flows
>>> # and allows for parallel processing.
>>> from riko.collections.sync import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> #   1. the `detag` option will strip all html tags from the result
>>> #   2. fetch the text contained inside the 'body' tag of the hackernews
>>> #      homepage
>>> #   3. replace newlines with spaces and assign the result to 'content'
>>> #   4. tokenize the resulting text using whitespace as the delimiter
>>> #   5. count the number of times each token appears
>>> #   6. obtain the raw stream
>>> #   7. extract the first word and its count
>>> #   8. extract the second word and its count
>>> #   9. extract the third word and its count
>>> url = 'https://news.ycombinator.com/'
>>> fetch_conf = {
...     'url': url, 'start': '<body>', 'end': '</body>', 'detag': True}  # 1
>>>
>>> replace_conf = {
...     'rule': [
...         {'find': '\r\n', 'replace': ' '},
...         {'find': '\n', 'replace': ' '}]}
>>>
>>> flow = (
...     SyncPipe('fetchpage', conf=fetch_conf)                    # 2
...         .strreplace(conf=replace_conf, assign='content')      # 3
...         .stringtokenizer(conf={'delimiter': ' '}, emit=True)  # 4
...         .count(conf={'count_key': 'content'}))                # 5
>>>
>>> stream = flow.output  # 6
>>> next(stream)  # 7
{"'sad": 1}
>>> next(stream)  # 8
{'(': 28}
>>> next(stream)  # 9
{'(1999)': 1}
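Steps 3-5 of the flow above (replace newlines, tokenize on whitespace, count tokens) have a straightforward plain-Python equivalent, shown here only to clarify what those pipes compute; the function name is hypothetical and this is not riko code:

```python
from collections import Counter

def tokenize_and_count(text):
    """Plain-Python sketch of steps 3-5 above: replace newlines
    with spaces, split on single spaces, and count how many
    times each token appears."""
    content = text.replace('\r\n', ' ').replace('\n', ' ')
    tokens = content.split(' ')
    return Counter(tokens)

counts = tokenize_and_count("to be\nor not to be")
print(counts['to'])  # 2
```

The riko version does the same work, but as lazily evaluated pipes over a stream fetched from the network rather than over an in-memory string.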