Redisのpub/subを軽く触る

Pub/Subとは

Cloud Pub/Sub とは Pub/Subとは言えばGCPのCloud Pub/Subで有名なメッセージングサービスです。一言でいうと、「Publisher」が送信したメッセージを、「Subscriber」達が受け取ることができるものだと考えてください。 f:id:tomiokasyogo:20190315200033p:plain

引用:https://cloud.google.com/pubsub/docs/overview

PublisherとSubscriberは両方複数登録することができるので、一対多、多対多の非同期メッセージングを実現することができます。 また、PublisherとSubscriberで処理速度や負荷の違いが有るときに緩衝材として機能させることができます。

Redis-Pub/Sub

KVSで有名なRedisにもPub/Sub機能があります。今回は、RedisのPub/Subを使ってメッセージのやりとりを行なっていきます。言語はGo言語を使います。

実践

今回はPublisher、Subscriberが共に一つだけの場合を考えます。 go-redis/redisを使うと、以下のようにPublisher、Subscriberを構造体として表せます。

//Publisherの構造体.redisのクライアントやPub/Subに関するデータをもつ
type Publisher struct{
    redis *redis.Client
    channel string
    pubsub *redis.PubSub
}

func NewPublisher(channel string) *Publisher {
    client := NewRedis()
    return &Publisher{
        redis:client,
        channel:channel,
        pubsub:client.Subscribe(channel),
    }
}

//SubScriber用のチャネルを生成
func (p Publisher) SubChannel() <-chan *redis.Message{
    _, err := p.pubsub.Receive()
    if err != nil {
        panic(err)
    }

    return p.pubsub.Channel()
}

func (p Publisher)Close() error {
    err := p.pubsub.Close()
    return err
}

//メッセージの送信
func (p Publisher) Publish(message string) error {
    err := p.redis.Publish(p.channel,message).Err()
    return err
}
//SubScriberはメッセージを受信するチャネルを持つ
type Subscriber struct {
    ch <-chan *redis.Message
}

func (s Subscriber)RecieveMessage() {
    for msg:= range s.ch{
        fmt.Println("recieve: ",msg)
    }

Publisherは、Redisのクライアントをもち、Pub/Sub通信を行う部屋の名前とredisのPubSub用の構造体を持っています。PublishメソッドによってPub/Subにメッセージを送信します。 Subscriberに関しては、Go言語には「チャネル」という非同期通信をするのに便利な機能があり、今回はメッセージの受信に受信専用のチャネルを使用しています。

実際のメッセージのやりとり

実際にPublisherとSubscriberを準備し、メッセージのやりとりを行います

   publisher := NewPublisher("channel1")


    subscriber := Subscriber{
        ch: publisher.SubChannel(),
    }

    go func() {
        for i := 0; i < 10; i++ {
            time.Sleep(1 * time.Second)
             //時間の送信
            err := publisher.Publish(time.Now().String())
            if err != nil {
                log.Fatal(err)
            }
        }
        publisher.Close()
    }()

    // messageの受信
    for msg := range subscriber.ch {
        fmt.Println(msg.Payload)
    }

Publisherは10秒間の間、その時の時間を送信します。SubScriberは最後のfor文でチャネルがCloseされるまでの間、チャネルから送信されたデータを読み込みます。 そうすると以下のような結果が得られます。

2019-03-15 18:43:01.059707 +0900 JST m=+1.013542296
2019-03-15 18:43:02.066162 +0900 JST m=+2.019999777
2019-03-15 18:43:03.073749 +0900 JST m=+3.027588299
2019-03-15 18:43:04.080888 +0900 JST m=+4.034729674
2019-03-15 18:43:05.087476 +0900 JST m=+5.041319857
2019-03-15 18:43:06.092702 +0900 JST m=+6.046548033
2019-03-15 18:43:07.096567 +0900 JST m=+7.050414716
2019-03-15 18:43:08.10268 +0900 JST m=+8.056530136
2019-03-15 18:43:09.109732 +0900 JST m=+9.063583712
2019-03-15 18:43:10.115859 +0900 JST m=+10.069712490

単純な例ですが、これでPub/Sub通信をすることができました。

最後に

最近ではマイクロサービス化が進んでいるので、様々なサービスの間で通信が行われることが想定されますが、そういったサービス間の通信の一つとしてPub/Sub通信は非常に便利な機能なので使う機会があればぜひ使っていきましょう