RxGo Go 语言的 Reactive 扩展开源项目

我要开发同款
匿名用户2017年02月04日
33阅读
所属分类Google Go、程序开发、常用工具包
授权协议MIT

作品详情

RxGo是 Go语言的Reactive扩展。

安装

go get -u github.com/jochasinga/rxgo

用法

watcher := observer.Observer{    // Register a handler function for every next available item.    NextHandler: func(item interface{}) {        fmt.Printf("Processing: %v\n", item)    },    // Register a handler for any emitted error.    ErrHandler: func(err error) {        fmt.Printf("Encountered error: %v\n", err)    },    // Register a handler when a stream is completed.    DoneHandler: func() {        fmt.Println("Done!")    },}it, _ := iterable.New([]interface{}{1, 2, 3, 4, errors.New("bang"), 5})source := observable.From(it)sub := source.Subscribe(watcher)// wait for the async operation<-sub

以上将:

将切片中每个数字的格式字符串print为4。

print错误“bang”

重要的是要记住,只有一个OnError或OnDone可以在stream中调用。如果stream中有错误,处理停止,OnDone将永远不会被调用,反之亦然。

概念是将所有“sideeffects”分组到这些处理程序中,让一个Observer或任何EventHandler处理它们。

package mainimport (    "fmt"    "time"    "github.com/jochasinga/rx"    "github.com/jochasinga/rx/handlers")func main() {    score := 9    onNext := handlers.NextFunc(func(item interface{}) {        if num, ok := item.(int); ok {            score += num        }    })    onDone := handlers.DoneFunc(func() {        score *= 2    })    watcher := observer.New(onNext, onDone)    // Create an `Observable` from a single item and subscribe to the observer.    sub := observable.Just(1).Subscribe(watcher)    <-sub    fmt.Println(score) // 20}
声明:本文仅代表作者观点,不代表本站立场。如果侵犯到您的合法权益,请联系我们删除侵权资源!如果遇到资源链接失效,请您通过评论或工单的方式通知管理员。未经允许,不得转载,本站所有资源文章禁止商业使用运营!
下载安装【程序员客栈】APP
实时对接需求、及时收发消息、丰富的开放项目需求、随时随地查看项目状态

评论