8BTCCI: 12395.47 +0.12% 8BTCVI: 6683.08 +3.99% 24H成交额: ¥4559.95亿 -16.28% 总市值: ¥17040.74亿 +0.56%
golang-event 在以太坊中的使用

golang-event 在以太坊中的使用

姜家志 发布在 技术指南 33799

go-ethereum中go-event库的使用

github.com/ethereum/go-ethereum/event 包实现了一个事件发布订阅的库,使用接口主要是 event.Feed 类型,以前还有 event.TypeMux 类型,看代码注释,说过时了,目前主要使用 Feed 类型。

package main
 import (
 "fmt"
 "sync"
 "github.com/ethereum/go-ethereum/event"
 )
 func main() {
 type someEvent struct{ I int }
 var feed event.Feed
 var wg sync.WaitGroup
 ch := make(chan someEvent)
 sub := feed.Subscribe(ch)
 wg.Add(1)
 go func() {
 defer wg.Done()
 for event := range ch {
 fmt.Printf("Received: %#v\n", event.I)
 }
 sub.Unsubscribe()
 fmt.Println("done")
 }()
 feed.Send(someEvent{5})
 feed.Send(someEvent{10})
 feed.Send(someEvent{7})
 feed.Send(someEvent{14})
 close(ch)
 wg.Wait()
 }
通过调用 event.Feed 类型的Subscrible方法订阅事件通知,需要使用者提前指定接收事件的 channel,Subscribe 返回 Subscription 对象,是一个接口类型:
type Subscription interface {
 Err() // returns the error channel
 Unsubscribe()
 // cancels sending of events, closing the error channel
 }
Err() 返回获取error 的channel,调用Unsubscribe()取消事件订阅。事件的发布者调用 Send() 方法,发送事件。

可以使用同一个channel实例,多次调用Feed 的Subscrible()方法:

package main
 import (
 "fmt"
 "sync"
 "github.com/ethereum/go-ethereum/event"
 )
 func main() {
 var (
 feed event.Feed
 recv sync.WaitGroup
 sender sync.WaitGroup
 )
 ch := make(chan int)
 feed.Subscribe(ch)
 feed.Subscribe(ch)
 feed.Subscribe(ch)
 expectSends := func(value, n int) {
 defer sender.Done()
 if nsent := feed.Send(value); nsent != n {
 fmt.Printf("send delivered %d times, want %d\n", nsent, n)
 }
 }
 expectRecv := func(wantValue, n int) {
 defer recv.Done()
 for v := range ch {
 if v != wantValue {
 fmt.Printf("received %d, want %d\n", v, wantValue)
 } else {
 fmt.Printf("recv v = %d\n", v)
 }
 }
 }
 sender.Add(3)
 for i := 0; i < 3; i++ {
 go expectSends(1, 3)
 }
 go func() {
 sender.Wait()
 close(ch)
 }()
 recv.Add(1)
 go expectRecv(1, 3)
 recv.Wait()
 }
这个例子中, 有三个订阅者, 有三个发送者, 每个发送者发送三次1, 同一个channel ch 里面被推送了9个1。

ethereum event 库还提供了一些高级别的方便接口, 比如event.NewSubscription函数,接收一个函数类型,作为数据的生产者, producer本身在后台一个单独的goroutine内执行, 后台goroutine往用户的channel 发送数据:

package main
 import (
 "fmt"
 "github.com/ethereum/go-ethereum/event"
 )
 func main() {
 ch := make(chan int)
 sub := event.NewSubscription(
 func(quit for i := 0; i < 10; i++ {
 select {
 case ch case <-quit:
 fmt.Println("unsubscribed")
 return nil
 }
 }
 return nil
 })
 for i := range ch {
 fmt.Println(i)
 if i == 4 {
 sub.Unsubscribe()
 break
 }
 }
 }
库也提供了 event.SubscriptionScope 类型用于追踪多个订阅者,提供集中的取消订阅功能:
package main
 import (
 "fmt"
 "sync"
 "github.com/ethereum/go-ethereum/event"
 )
 // This example demonstrates how SubscriptionScope can be used to control the lifetime of
 // subscriptions.
 // Our example program consists of two servers, each of which performs a calculation when
 // requested. The servers also allow subscribing to results of all computations.
 type divServer struct{ results event.Feed }
 type mulServer struct{ results event.Feed }
 func (s *divServer) do(a, b int) int {
 r := a / b
 s.results.Send(r)
 return r
 }
 func (s *mulServer) do(a, b int) int {
 r := a * b
 s.results.Send(r)
 return r
 }
 // The servers are contained in an App. The app controls the servers and exposes them
 // through its API.
 type App struct {
 divServer
 mulServer
 scope event.SubscriptionScope
 }
 func (s *App) Calc(op byte, a, b int) int {
 switch op {
 case '/':
 return s.divServer.do(a, b)
 case '*':
 return s.mulServer.do(a, b)
 default:
 panic("invalid op")
 }
 }
 // The app's SubscribeResults method starts sending calculation results to the given
 // channel. Subscriptions created through this method are tied to the lifetime of the App
 // because they are registered in the scope.
 func (s *App) SubscribeResults(op byte, ch chan switch op {
 case '/':
 return s.scope.Track(s.divServer.results.Subscribe(ch))
 case '*':
 return s.scope.Track(s.mulServer.results.Subscribe(ch))
 default:
 panic("invalid op")
 }
 }
 // Stop stops the App, closing all subscriptions created through SubscribeResults.
 func (s *App) Stop() {
 s.scope.Close()
 }
 func main() {
 var (
 app App
 wg sync.WaitGroup
 divs = make(chan int)
 muls = make(chan int)
 )
 divsub := app.SubscribeResults('/', divs)
 mulsub := app.SubscribeResults('*', muls)
 wg.Add(1)
 go func() {
 defer wg.Done()
 defer fmt.Println("subscriber exited")
 for {
 select {
 case result := <-divs:
 fmt.Println("division happened:", result)
 case result := <-muls:
 fmt.Println("multiplication happened:", result)
 case divErr := <-divsub.Err():
 fmt.Println("divsub.Err() :", divErr)
 return
 case mulErr := <-mulsub.Err():
 fmt.Println("mulsub.Err() :", mulErr)
 return
 }
 }
 }()
 app.Calc('/', 22, 11)
 app.Calc('*', 3, 4)
 app.Stop()
 wg.Wait()
 }
SubscriptionScope的Close() 方法接收 Track 方法的返回值 ,Track 方法负责追踪订阅者。

评论(1)
登录 账号发表你的看法,还没有账号?立即免费 注册
  • 链圈观察员 2018-03-18
    golang-event 在以太坊中的使用_巴比特_服务于区块链创新者http://t.cn/RnGhxzx ​