发现了一个Nginx的LUA脚本:lua-resty-rabbitmqstomp,可以让Nginx通过LUA脚本访问RabbitMQ消息队列,这个脚本是通过stomp协议连接RabbitMQ的stomp适配器,来pub/sub消息的 关于RabbitMQ-STOMP安装使用相关内容可以参见:RabbitMQ STOMP Adapter 关于Nginx-LUA模块安装使用参见:LAMP架构演进到LAMPGC,再演进到LNMLGC 关于STOMP协议相关资料参见这里: nginx配置: location /rabbitmq {
lua_code_cache off;
content_by_lua_file conf/rabbitmq.lua;
} rabbitmq.lua源码如下,该脚本简单的实现pub一个消息然后再sub该消息,set_timeout(60000)表示sub消息如果没有消息时等待60S超时 local strlen = string.len
local json = require 'json'
local rabbitmq = require 'rabbitmqstomp'
local mq, err = rabbitmq:new()
if not mq then
return
end
mq:set_timeout(60000)
local ok, err = mq:connect {
host = '127.0.0.1',
port = 61613,
username = 'guest',
password = 'guest',
vhost = '/'
}
if not ok then
return
end
ngx.log(ngx.INFO, 'Connect: ' .. 'OK')
local msg = {key='value1', key2='value2'}
local headers = {}
headers['destination'] = '/queue/my_queue'
headers['receipt'] = 'msg#1'
headers['app-id'] = 'luaresty'
headers['persistent'] = 'true'
headers['content-type'] = 'application/json'
local ok, err = mq:send(json.encode(msg), headers)
if not ok then
return
end
ngx.log(ngx.INFO, 'Published: ' .. json.encode(msg))
local headers = {}
headers['destination'] = '/queue/my_queue'
headers['persistent'] = 'true'
headers['id'] = '123'
local ok, err = mq:subscribe(headers)
if not ok then
return
end
local data, err = mq:receive()
if not data then
return
end
ngx.log(ngx.INFO, 'Consumed: ' .. data)
ngx.header.content_type = 'text/plain';
ngx.say(data);
local headers = {}
headers['persistent'] = 'true'
headers['id'] = '123'
local ok, err = mq:unsubscribe(headers)
local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
return
end
具体扩展一下,可以实现推送功能:发送方调用send将消息pub到RabbitMQ,接收方设置一个超时调用subscribe订阅消息(就类似于long polling)
|
|
来自: waitingnothing > 《网关》