前言

消息队列,这个东西在web开发中经常听到,主要解决应用耦合,异步消息,流量削峰等问题。是大型分布式系统不可缺少的中间件。

NSQ是Go语言编写的一个开源的实时分布式的内存消息队列,其性能十分优异。NSQ有以下优势:

  1. NSQ提倡分布式和分散的拓扑,没有单点故障,支持容错和高可用性,并提供可靠的消息交付保证
  2. NSQ支持横向扩展,没有任何集中式代理。
  3. NSQ易于配置和部署,并且内置了管理界面。

nsq内部组件

  • nsqd

    nsqd是一个守护进程,它接收、排队并向客户端发送消息,如果是在搭配nsqlookupd使用的模式下需要还指定nsqlookupd地址,命令为:-lookupd-tcp-address=<nsqlookup IP>

  • nsqlookup

    nsqlookupd是维护所有nsqd状态、提供服务发现的守护进程。

  • nsqadmin

    这是一个实时监控集群状态、执行各种管理任务的Web管理平台,需要搭配nsqlookup食用

docker配置nsq

  1. 拉取镜像

    docker pull nsqio/nsq
  2. 启动nsqlookup服务

    docker run -d --name nsqlookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
  3. 进入容器获取分配到的docker ip地址

    docker exec -it <容器id> /bin/sh    # 进入容器
    ifconfig | grep addr    # 获取docker主机ip
  4. 启动nsqd服务节点

    docker run -d --name nsqd -p 4150:4150 -p 4151:4151  nsqio/nsq /nsqd  --broadcast-address=172.17.0.2  --lookupd-tcp-address=172.17.0.2:4160

    注解: lookupd-tcp-address为docker ip加端口号

    此时已经启动了工作所需组件,不过由于他内置了admin管理界面,所以也是需要单独启一个容器的

  5. 启动admin管理界面

    docker run -d --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=172.17.0.2:4161

    访问http://127.0.0.1:4171/即可看到这个管理界面:

    如果你在这个管理界面的Counter看到一则错误,那多半是TCP绑定的4151出现了错误,这个问题貌似还有点难搞,可能是防火墙出现了拦截,也或者是其他的什么问题,为了简单点,直接将其以上的三个容器用compose进行统一的管理

    docker-compose.yml:

    version: '3'
    services:
      nsqlookupd:
        container_name: nsqlookupd
        image: nsqio/nsq
        command: /nsqlookupd
        ports:
          - "4160:4160"
          - "4161:4161"
      nsqd:
        container_name: nsqd
        image: nsqio/nsq
        command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
        depends_on:
          - nsqlookupd
        ports:
          - "4150:4150"
          - "4151:4151"
      nsqadmin:
        container_name: nsqadmin
        image: nsqio/nsq
        command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
        depends_on:
          - nsqlookupd
        ports:
          - "4171:4171"

    直接绑定nsqlookupd的地址,这样就不会出现错误了。

go操纵NSQ

安装一个客户端依赖包

go get -u "github.com/nsqio/go-nsq"

初始化一下生产者,为了模拟使用场景使用了无限循环加命令行输入的方式,也可以直接定义一个字符串,直接调用producer.Publish方法

var producer *nsq.Producer

// 初始化生产者
func initProducer(str string) (err error) {
	config := nsq.NewConfig()
	producer, err = nsq.NewProducer(str, config)
	if err != nil {
		fmt.Printf("create producer failed, err:%v\n", err)
		return err
	}
	return nil
}

func main() {
	nsqAddress := "127.0.0.1:4150"
	err := initProducer(nsqAddress)
	if err != nil {
		fmt.Printf("init producer failed, err:%v\n", err)
		return
	}
    message := "hello salmon"
        err = producer.Publish("salmon_chan", []byte(message))
        if err != nil {
            fmt.Printf("publish msg to nsq failed, err:%v\n", err)
        }
}

然后打开管理界面会看到:

ok,没有什么问题。这个/counter页面显示了处理的消息数量,因为我们没有接入消费者,所以处理的消息数量为0。

然后我们来创建消费者来模拟

// MyHandler 是一个消费者类型
type MyHandler struct {
	Title string
}

// HandleMessage 是需要实现的处理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
	fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
	return
}

// 初始化消费者
func initConsumer(topic string, channel string, address string) (err error) {
	config := nsq.NewConfig()
	config.LookupdPollInterval = 15 * time.Second
	c, err := nsq.NewConsumer(topic, channel, config)
	if err != nil {
		fmt.Printf("create consumer failed, err:%v\n", err)
		return
	}
	consumer := &MyHandler{
		Title: "salmon_chan",
	}
	c.AddHandler(consumer)
	//if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD
	if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询
		fmt.Println("salmon error: ", err)
		return err
	}
	return nil
}

func main() {
	err := initConsumer("salmon_chan", "first", "127.0.0.1:4161")
	if err != nil {
		fmt.Printf("init consumer failed, err:%v\n", err)
		return
	}
	c := make(chan os.Signal)        // 定义一个信号的通道
	signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c
	<-c                              // 阻塞
}

这里补充一则错误,在初始化消费者的时候,不出意外的话,你会看到这种意外

2022/04/09 18:54:30 INF    1 [salmon_chan/first] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=salmon_chan
2022/04/09 18:54:30 INF    1 [salmon_chan/first] (ee3e0329bac6:4150) connecting to nsqd
2022/04/09 18:54:31 ERR    1 [salmon_chan/first] (ee3e0329bac6:4150) error connecting to nsqd - dial tcp: i/o timeout

直接连nsqd将地址改为nsqd的4150端口是没有问题的,但是使用nsqlookup进行一些查找的时候,会有以上错误。

我在其issuess上看到评论,然后才发现…官方提供的compose是有坑的。改良之后的配置,只需要在nsqd新增一项配置:

这个地址需要说明下哈,由于我们是使用docker启动,因此返回的 nsqd 所在的 host 直接就是 nsqd 。然而我的本地又没有 DNS 解析, 所以,如果使用nsqlookupd当作地址的话,也是无法解析出来的。所以我们直接填写我们的公网或者局域网ip就好了。

之后,再启动消费者可以看到

ok,关于go-nsq的更多内容请阅读go-nsq的官方文档