RxGo是 Go语言的Reactive扩展。
安装
go get -u github.com/jochasinga/rxgo用法
watcher := observer.Observer{ // Register a handler function for every next available item. NextHandler: func(item interface{}) { fmt.Printf("Processing: %v\n", item) }, // Register a handler for any emitted error. ErrHandler: func(err error) { fmt.Printf("Encountered error: %v\n", err) }, // Register a handler when a stream is completed. DoneHandler: func() { fmt.Println("Done!") },}it, _ := iterable.New([]interface{}{1, 2, 3, 4, errors.New("bang"), 5})source := observable.From(it)sub := source.Subscribe(watcher)// wait for the async operation<-sub以上将:
将切片中每个数字的格式字符串print为4。
print错误“bang”
重要的是要记住,只有一个OnError或OnDone可以在stream中调用。如果stream中有错误,处理停止,OnDone将永远不会被调用,反之亦然。
概念是将所有“sideeffects”分组到这些处理程序中,让一个Observer或任何EventHandler处理它们。
package mainimport ( "fmt" "time" "github.com/jochasinga/rx" "github.com/jochasinga/rx/handlers")func main() { score := 9 onNext := handlers.NextFunc(func(item interface{}) { if num, ok := item.(int); ok { score += num } }) onDone := handlers.DoneFunc(func() { score *= 2 }) watcher := observer.New(onNext, onDone) // Create an `Observable` from a single item and subscribe to the observer. sub := observable.Just(1).Subscribe(watcher) <-sub fmt.Println(score) // 20}
评论