ActiveMQ采用plugin方式扩展方法,下面是如何使用plugin方式进行自定义的登录方式。 准备创建mysql数据库保存用户,密码,权限等信息。 多台MQ服务器加入自定义的plugin插件,通过这个plugin访问mysql服务器进行登录与授权操作。 一、plugin基本结构与配置 具体代码如下: public class AuthPlugin implements BrokerPlugin { @Override public Broker installPlugin(Broker broker) throws Exception { return new AuthBroker(broker); } } public class AuthBroker extends AbstractAuthenticationBroker { private static Log log = LogFactory.getLog(AuthBroker.class); public AuthBroker(Broker next) { super(next); } @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { log.debug("addConnection"); SecurityContext securityContext = context.getSecurityContext(); if (securityContext == null) { securityContext = authenticate(info.getUserName(), info.getPassword(), null); context.setSecurityContext(securityContext); securityContexts.add(securityContext); } try { super.addConnection(context, info); } catch (Exception e) { securityContexts.remove(securityContext); context.setSecurityContext(null); throw e; } } @Override public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException { SecurityContext securityContext = null; if("admin".equals(username)&&"1234".equals(password)){ securityContext = new SecurityContext(username) { @Override public Set<Principal> getPrincipals() { Set<Principal> groups = new HashSet<Principal>(); groups.add(new GroupPrincipal("users")); return groups; } }; }else{ throw new SecurityException("验证失败"); } return securityContext; } }
安装插件的步骤如下: 1.将代码导出jar 2.将jar包拷贝到activemq目录下的lib目录下 3.修改activemq\conf\activemq.xml 在broker中加入 <broker xmlns="http://activemq./schema/core" brokerName="localhost" dataDirectory="${activemq.data}"> //... <plugins> <bean xmlns="http://www./schema/beans" id="jhPlugin" class="com.test.activemq.plugin.AuthPlugin"/> </plugins> </broker>
4.修改activemq/conf/log4j.properties。日志文件目录在activemq/data/activemq.log log4j.rootLogger=INFO, console, logfile
log4j.logger.org.apache.activemq.spring=WARN
log4j.logger.org.apache.activemq.web.handler=WARN
log4j.logger.org.springframework=WARN
log4j.logger.org.apache.xbean=WARN
log4j.logger.org.apache.camel=INFO
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.com.test.activemq=DEBUG 5.重启activemq服务。 二、集成mysql 1.将mysql-connector-java-5.1.20.jar复制到activemq\lib目录下 2.数据库操作采用spring-jdbc的方式,需要将spring-jdbc-4.1.9.RELEASE.jar复制到activemq\lib\optional目录下(spring-jdbc的版本应与lib\optional其他的spring相同) 3.修改activemq\conf\activemq.xml文件 <beans> //... <!-- mysql数据库数据源--> <bean id="mySqlDataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="${jdbc.driverClassName}" /> <property name="url" value="${jdbc.url}" /> <property name="username" value="${jdbc.username}" /> <property name="password" value="${jdbc.password}" /> </bean> <!-- 增加jdbcTemplate--> <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate" abstract="false" lazy-init="false" autowire="default" > <property name="dataSource"> <ref bean="mySqlDataSource" /> </property> </bean> </beans> 4.修改activemq\conf\activemq.xml文件的PropertyPlaceholderConfigurer <beans> //... <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>file:${activemq.conf}/credentials.properties</value> <value>file:${activemq.conf}/db.properties</value> </list> </property> </bean> </beans> 5.增加activemq\conf\activemq.xml文件的<broker> <beans> <broker> <plugins> <bean xmlns="http://www./schema/beans" id="testPlugin" class="com.test.activemq.plugin.AuthPlugin"> <constructor-arg> <ref bean="jdbcTemplate"/> </constructor-arg> </bean> </plugins> </broker> </beans> 6.AuthPlugin的代码如下: public class AuthPlugin implements BrokerPlugin { JdbcTemplate jdbcTemplate;//注入了spring-jdbc public AuthPlugin(JdbcTemplate jdbcTemplate) { this.jdbcTemplate=jdbcTemplate; } @Override public Broker installPlugin(Broker broker) throws Exception { return new AuthBroker(broker,jdbcTemplate); } } 7.AuthBroker的代码如下: public class AuthBroker extends AbstractAuthenticationBroker { private static Log log = LogFactory.getLog(AuthBroker.class); private JdbcTemplate jdbcTemplate; public AuthBroker(Broker next,JdbcTemplate jdbcTemplate) { super(next); this.jdbcTemplate=jdbcTemplate; } /** * <p> * 创建连接的时候拦截 * </p> */ @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { log.debug("addConnection"); SecurityContext securityContext = context.getSecurityContext(); if (securityContext == null) { securityContext = authenticate(info.getUserName(), info.getPassword(), null); context.setSecurityContext(securityContext); securityContexts.add(securityContext); } try { super.addConnection(context, info); } catch (Exception e) { securityContexts.remove(securityContext); context.setSecurityContext(null); throw e; } } /** * 得到用户信息 * <p>Title: getUser</p> * @param username * @return */ private User getUser(String username){ String sql="select * from jh_user where username=? limit 1"; User user=jdbcTemplate.queryForObject(sql,new Object[]{username},new BeanPropertyRowMapper<User>(User.class)); return user; } /** * 认证 * <p>Title: authenticate</p> */ @Override public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException { SecurityContext securityContext = null; User user=getUser(username); //验证用户信息 if(user!=null&&user.getPwd().equals(password)){ securityContext = new SecurityContext(username) { @Override public Set<Principal> getPrincipals() { Set<Principal> groups = new HashSet<Principal>(); groups.add(new GroupPrincipal("users"));//默认加入了users的组 return groups; } }; }else{ throw new SecurityException("验证失败"); } return securityContext; } } 8.在activemq\conf\目录下加入db.properties jdbc.driverClassName=com.mysql.jdbc.Driver |
|