Redisのpub/subを軽く触る
Pub/Subとは
Cloud Pub/Sub とは Pub/Subとは言えばGCPのCloud Pub/Subで有名なメッセージングサービスです。一言でいうと、「Publisher」が送信したメッセージを、「Subscriber」達が受け取ることができるものだと考えてください。
引用:https://cloud.google.com/pubsub/docs/overview
PublisherとSubscriberは両方複数登録することができるので、一対多、多対多の非同期メッセージングを実現することができます。 また、PublisherとSubscriberで処理速度や負荷の違いが有るときに緩衝材として機能させることができます。
Redis-Pub/Sub
KVSで有名なRedisにもPub/Sub機能があります。今回は、RedisのPub/Subを使ってメッセージのやりとりを行なっていきます。言語はGo言語を使います。
実践
今回はPublisher、Subscriberが共に一つだけの場合を考えます。 go-redis/redisを使うと、以下のようにPublisher、Subscriberを構造体として表せます。
//Publisherの構造体.redisのクライアントやPub/Subに関するデータをもつ type Publisher struct{ redis *redis.Client channel string pubsub *redis.PubSub } func NewPublisher(channel string) *Publisher { client := NewRedis() return &Publisher{ redis:client, channel:channel, pubsub:client.Subscribe(channel), } } //SubScriber用のチャネルを生成 func (p Publisher) SubChannel() <-chan *redis.Message{ _, err := p.pubsub.Receive() if err != nil { panic(err) } return p.pubsub.Channel() } func (p Publisher)Close() error { err := p.pubsub.Close() return err } //メッセージの送信 func (p Publisher) Publish(message string) error { err := p.redis.Publish(p.channel,message).Err() return err }
//SubScriberはメッセージを受信するチャネルを持つ type Subscriber struct { ch <-chan *redis.Message } func (s Subscriber)RecieveMessage() { for msg:= range s.ch{ fmt.Println("recieve: ",msg) }
Publisherは、Redisのクライアントをもち、Pub/Sub通信を行う部屋の名前とredisのPubSub用の構造体を持っています。PublishメソッドによってPub/Subにメッセージを送信します。 Subscriberに関しては、Go言語には「チャネル」という非同期通信をするのに便利な機能があり、今回はメッセージの受信に受信専用のチャネルを使用しています。
実際のメッセージのやりとり
実際にPublisherとSubscriberを準備し、メッセージのやりとりを行います
publisher := NewPublisher("channel1") subscriber := Subscriber{ ch: publisher.SubChannel(), } go func() { for i := 0; i < 10; i++ { time.Sleep(1 * time.Second) //時間の送信 err := publisher.Publish(time.Now().String()) if err != nil { log.Fatal(err) } } publisher.Close() }() // messageの受信 for msg := range subscriber.ch { fmt.Println(msg.Payload) }
Publisherは10秒間の間、その時の時間を送信します。SubScriberは最後のfor文でチャネルがCloseされるまでの間、チャネルから送信されたデータを読み込みます。 そうすると以下のような結果が得られます。
2019-03-15 18:43:01.059707 +0900 JST m=+1.013542296 2019-03-15 18:43:02.066162 +0900 JST m=+2.019999777 2019-03-15 18:43:03.073749 +0900 JST m=+3.027588299 2019-03-15 18:43:04.080888 +0900 JST m=+4.034729674 2019-03-15 18:43:05.087476 +0900 JST m=+5.041319857 2019-03-15 18:43:06.092702 +0900 JST m=+6.046548033 2019-03-15 18:43:07.096567 +0900 JST m=+7.050414716 2019-03-15 18:43:08.10268 +0900 JST m=+8.056530136 2019-03-15 18:43:09.109732 +0900 JST m=+9.063583712 2019-03-15 18:43:10.115859 +0900 JST m=+10.069712490
単純な例ですが、これでPub/Sub通信をすることができました。
最後に
最近ではマイクロサービス化が進んでいるので、様々なサービスの間で通信が行われることが想定されますが、そういったサービス間の通信の一つとしてPub/Sub通信は非常に便利な機能なので使う機会があればぜひ使っていきましょう