前言
消息队列,这个东西在web开发中经常听到,主要解决应用耦合,异步消息,流量削峰等问题。是大型分布式系统不可缺少的中间件。
NSQ是Go语言编写的一个开源的实时分布式的内存消息队列,其性能十分优异。NSQ有以下优势:
- NSQ提倡分布式和分散的拓扑,没有单点故障,支持容错和高可用性,并提供可靠的消息交付保证
- NSQ支持横向扩展,没有任何集中式代理。
- NSQ易于配置和部署,并且内置了管理界面。
nsq内部组件
nsqd
nsqd是一个守护进程,它接收、排队并向客户端发送消息,如果是在搭配
nsqlookupd
使用的模式下需要还指定nsqlookupd
地址,命令为:-lookupd-tcp-address=<nsqlookup IP>
nsqlookup
nsqlookupd是维护所有nsqd状态、提供服务发现的守护进程。
nsqadmin
这是一个实时监控集群状态、执行各种管理任务的Web管理平台,需要搭配nsqlookup食用
docker配置nsq
拉取镜像
docker pull nsqio/nsq
启动nsqlookup服务
docker run -d --name nsqlookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
进入容器获取分配到的docker ip地址
docker exec -it <容器id> /bin/sh # 进入容器 ifconfig | grep addr # 获取docker主机ip
启动nsqd服务节点
docker run -d --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=172.17.0.2 --lookupd-tcp-address=172.17.0.2:4160
注解: lookupd-tcp-address为docker ip加端口号
此时已经启动了工作所需组件,不过由于他内置了admin管理界面,所以也是需要单独启一个容器的
启动admin管理界面
docker run -d --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=172.17.0.2:4161
访问
http://127.0.0.1:4171/
即可看到这个管理界面:如果你在这个管理界面的Counter看到一则错误,那多半是TCP绑定的4151出现了错误,这个问题貌似还有点难搞,可能是防火墙出现了拦截,也或者是其他的什么问题,为了简单点,直接将其以上的三个容器用compose进行统一的管理
docker-compose.yml
:version: '3' services: nsqlookupd: container_name: nsqlookupd image: nsqio/nsq command: /nsqlookupd ports: - "4160:4160" - "4161:4161" nsqd: container_name: nsqd image: nsqio/nsq command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 depends_on: - nsqlookupd ports: - "4150:4150" - "4151:4151" nsqadmin: container_name: nsqadmin image: nsqio/nsq command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 depends_on: - nsqlookupd ports: - "4171:4171"
直接绑定nsqlookupd的地址,这样就不会出现错误了。
go操纵NSQ
安装一个客户端依赖包
go get -u "github.com/nsqio/go-nsq"
初始化一下生产者,为了模拟使用场景使用了无限循环加命令行输入的方式,也可以直接定义一个字符串,直接调用producer.Publish
方法
var producer *nsq.Producer
// 初始化生产者
func initProducer(str string) (err error) {
config := nsq.NewConfig()
producer, err = nsq.NewProducer(str, config)
if err != nil {
fmt.Printf("create producer failed, err:%v\n", err)
return err
}
return nil
}
func main() {
nsqAddress := "127.0.0.1:4150"
err := initProducer(nsqAddress)
if err != nil {
fmt.Printf("init producer failed, err:%v\n", err)
return
}
message := "hello salmon"
err = producer.Publish("salmon_chan", []byte(message))
if err != nil {
fmt.Printf("publish msg to nsq failed, err:%v\n", err)
}
}
然后打开管理界面会看到:
ok,没有什么问题。这个/counter
页面显示了处理的消息数量,因为我们没有接入消费者,所以处理的消息数量为0。
然后我们来创建消费者来模拟
// MyHandler 是一个消费者类型
type MyHandler struct {
Title string
}
// HandleMessage 是需要实现的处理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
return
}
// 初始化消费者
func initConsumer(topic string, channel string, address string) (err error) {
config := nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second
c, err := nsq.NewConsumer(topic, channel, config)
if err != nil {
fmt.Printf("create consumer failed, err:%v\n", err)
return
}
consumer := &MyHandler{
Title: "salmon_chan",
}
c.AddHandler(consumer)
//if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD
if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询
fmt.Println("salmon error: ", err)
return err
}
return nil
}
func main() {
err := initConsumer("salmon_chan", "first", "127.0.0.1:4161")
if err != nil {
fmt.Printf("init consumer failed, err:%v\n", err)
return
}
c := make(chan os.Signal) // 定义一个信号的通道
signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c
<-c // 阻塞
}
这里补充一则错误,在初始化消费者的时候,不出意外的话,你会看到这种意外
2022/04/09 18:54:30 INF 1 [salmon_chan/first] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=salmon_chan
2022/04/09 18:54:30 INF 1 [salmon_chan/first] (ee3e0329bac6:4150) connecting to nsqd
2022/04/09 18:54:31 ERR 1 [salmon_chan/first] (ee3e0329bac6:4150) error connecting to nsqd - dial tcp: i/o timeout
直接连nsqd将地址改为nsqd的4150端口是没有问题的,但是使用nsqlookup进行一些查找的时候,会有以上错误。
我在其issuess上看到评论,然后才发现…官方提供的compose是有坑的。改良之后的配置,只需要在nsqd新增一项配置:

这个地址需要说明下哈,由于我们是使用docker启动,因此返回的 nsqd 所在的 host 直接就是 nsqd 。然而我的本地又没有 DNS 解析, 所以,如果使用nsqlookupd当作地址的话,也是无法解析出来的。所以我们直接填写我们的公网或者局域网ip就好了。
之后,再启动消费者可以看到

ok,关于go-nsq
的更多内容请阅读go-nsq的官方文档。
- Post link: https://www.godhearing.cn/docker-jian-dan-shi-yong-nsq-xiao-xi-dui-lie/
- Copyright Notice: All articles in this blog are licensed under unless otherwise stated.