分享

ActiveMq-拦截创建消息队列

 WindySky 2018-03-21

ActiveMQ拦截客户端创建/接收消息队列

http://www.cnblogs.com/huangzhex/p/6358214.html

1.创建插件

复制代码
public class AuthPlugin implements BrokerPlugin{
    private String mqName;//本MQ服务器名称
    private JdbcTemplate jdbcTemplate;//数据库操作类
    
    public AuthPlugin(JdbcTemplate jdbcTemplate,String mqName) {
        this.jdbcTemplate=jdbcTemplate;
        this.mqName=mqName;
    }
    
    @Override
    public Broker installPlugin(Broker broker) throws Exception {
        return new AuthBroker(broker,jdbcTemplate,mqName);
    }
}
复制代码

2.修改apache-activemq\conf\activemq.xml

复制代码
<!--broker节点下-->
<plugins>
    <bean xmlns="http://www./schema/beans" id="ehlPlugin" class="com.ehl.plugin.AuthPlugin">
        <constructor-arg index="0">
            <ref bean="jdbcTemplate"/>
        </constructor-arg>
        <constructor-arg index="1" value="MQName"/><!--本消息队列的名称-->
    </bean>
</plugins>
复制代码

3.创建插件类

复制代码
public class AuthBroker extends AbstractAuthenticationBroker{
    private static Log log = LogFactory.getLog(AuthBroker.class);
   //用户 对应的权限
   private Map<String, Map<String,ViewProjectMqQueuesCom>> powers=new HashMap<String,Map<String,ViewProjectMqQueuesCom>>();//权限

    private static final String ACTIVEMQ_ADVISORY_PRODUCER_QUEUE="ActiveMQ.Advisory.Producer.Queue.";//消息生产者前缀
    private static final String ACTIVEMQ_ADVISORY_CONSUMER_QUEUE="ActiveMQ.Advisory.Consumer.Queue.";//消息消费者前缀
    private JdbcTemplate jdbcTemplate;//数据库操作
    private String mqName;//MQ服务器名称

    public AuthBroker(Broker next,JdbcTemplate jdbcTemplate,String mqName) {
        super(next);
        this.jdbcTemplate=jdbcTemplate;
        this.mqName=mqName;
    }
        /**
     * 连接拦截器
     */
    @Override
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        log.info("用户["+info.getUserName()+"]请求连接["+mqName+"]!");
        SecurityContext securityContext = context.getSecurityContext();
        if (securityContext == null) {
            securityContext = authenticate(info.getUserName(), info.getPassword(), null);
            context.setSecurityContext(securityContext);
            securityContexts.add(securityContext);
        }
        
        try {
            super.addConnection(context, info);
        } catch (Exception e) {
            securityContexts.remove(securityContext);
            context.setSecurityContext(null);
            throw e;
        }
    }
        /**
     * 认证
     * <p>Title: authenticate</p>
     */
    @Override
    public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException {
        SecurityContext securityContext = null;
        Com com=getCom(username,password);
        //验证用户信息
        if(com!=null&&com.getId()!=null){
             securityContext = new SecurityContext(username) {
                    @Override
                    public Set<Principal> getPrincipals() {
                        Set<Principal> groups = new HashSet<Principal>();
                        groups.add(new GroupPrincipal("users"));//默认加入了users的组
                        return groups;
                    }
                };
//                log.info("用户:"+username+"验证成功!");
        }else{
            log.error("用户:"+username+"验证失败!");
            throw new SecurityException("验证失败");
        }
        return securityContext;
    }
        /**
     * 添加一个目标
     * <p>Title: addDestination</p>
     * @see org.apache.activemq.broker.BrokerFilter#addDestination(org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.ActiveMQDestination, boolean)
     */
    @Override
    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
        boolean destStats = destination.getPhysicalName().regionMatches(true, 0, ACTIVEMQ_ADVISORY_PRODUCER_QUEUE, 0,ACTIVEMQ_ADVISORY_PRODUCER_QUEUE.length());
        //发送消息者
        if(destStats){
            if(context.getSecurityContext()!=null){
                //判断不是默认用户
                if(!context.getSecurityContext().getUserName().equals(SecurityContext.BROKER_SECURITY_CONTEXT.getUserName())){
                    String queuesName=destination.getPhysicalName().replace(ACTIVEMQ_ADVISORY_PRODUCER_QUEUE, "");//得到消息队列名
                    if(powers.containsKey(context.getSecurityContext().getUserName())){//判断该用户是否有权限
                        Map<String,ViewProjectMqQueuesCom> map=powers.get(context.getSecurityContext().getUserName());
                        if(map!=null&&map.containsKey(queuesName)){//判断是否有发送的权限
                            if(map.get(queuesName).getBindId()!=null&&map.get(queuesName).getComQueuesType()!=null){
                                if(map.get(queuesName).getComQueuesType().intValue()==QueuesComType.BOTH.getValue().intValue()||map.get(queuesName).getComQueuesType().intValue()==QueuesComType.SEND.getValue().intValue()){
                                    return super.addDestination(context, destination, createIfTemporary);
                                }
                            }
                        }
                        throw new Exception("["+mqName+"-"+context.getUserName()+"]对消息队列["+queuesName+"]没有发送消息的权限");
                    }else{
                        throw new Exception("请登录后再操作!");
                    }
                }
            }
        }else{
            boolean consumerStats = destination.getPhysicalName().regionMatches(true, 0, ACTIVEMQ_ADVISORY_CONSUMER_QUEUE, 0,ACTIVEMQ_ADVISORY_CONSUMER_QUEUE.length());
            //消息接收者
            if(consumerStats){
                if(context.getSecurityContext()!=null){
                    //判断不是默认用户
                    if(!context.getSecurityContext().getUserName().equals(SecurityContext.BROKER_SECURITY_CONTEXT.getUserName())){
                        String queuesName=destination.getPhysicalName().replace(ACTIVEMQ_ADVISORY_CONSUMER_QUEUE, "");//得到消息队列名称
                        if(powers.containsKey(context.getSecurityContext().getUserName())){//判断用户是否有对应的权限
                            Map<String,ViewProjectMqQueuesCom> map=powers.get(context.getSecurityContext().getUserName());
                            if(map!=null&&map.containsKey(queuesName)){
                                if(map.get(queuesName).getBindId()!=null&&map.get(queuesName).getComQueuesType()!=null){
                                    if(map.get(queuesName).getComQueuesType().intValue()==QueuesComType.BOTH.getValue().intValue()||map.get(queuesName).getComQueuesType().intValue()==QueuesComType.RECEIVE.getValue().intValue()){
                                        return super.addDestination(context, destination, createIfTemporary);
                                    }
                                }
                            }
                            throw new Exception("["+mqName+"-"+context.getUserName()+"]对消息队列["+queuesName+"]没有获取消息的权限");
                        }else{
                            throw new Exception("请登录后再操作!");
                        }
                    }
                }
            }
        }
        return super.addDestination(context, destination, createIfTemporary);
    }
        /**
     * 监控发送消息
     * <p>Title: send</p>
     * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange, org.apache.activemq.command.Message)
     */
    @Override
    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
        String userName=producerExchange.getConnectionContext().getUserName();
        ActiveMQDestination msgDest = messageSend.getDestination();
        String physicalName = msgDest.getPhysicalName();
    }
/**
     * 监控消息接收者
     * <p>Title: acknowledge</p>
     * @see org.apache.activemq.broker.BrokerFilter#acknowledge(org.apache.activemq.broker.ConsumerBrokerExchange, org.apache.activemq.command.MessageAck)
     */
    @Override
    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
        String userName=consumerExchange.getConnectionContext().getUserName();
        String queues=ack.getDestination().getPhysicalName();
    }
}
复制代码

 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约