Configure the broker that you are going to embed. The same broker configuration can just as easily be run outside the container.
<!-- Spring/XBean definition of an embedded ActiveMQ broker. -->
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">

    <!-- Lets ${...} placeholders in this file be filled in from system properties. -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

    <!--
        The embedded broker itself: messages are not persisted and JMX is switched off.
        NOTE(review): with useJmx="false" the managementContext settings below
        presumably have no effect; confirm before relying on port 1199.
    -->
    <amq:broker id="broker" useJmx="false" persistent="false">

        <amq:managementContext>
            <amq:managementContext connectorPort="1199"
                                   jmxDomainName="org.apache.activemq"/>
        </amq:managementContext>

        <!-- Static network bridge to a second broker expected on port 61617. -->
        <amq:networkConnectors>
            <amq:networkConnector name="pool"
                                  uri="static:(tcp://localhost:61617)"/>
        </amq:networkConnectors>

        <!-- Clients connect to this broker over TCP on port 61616. -->
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:61616"/>
        </amq:transportConnectors>

    </amq:broker>

</beans>
Then inside a config file that you will actually deploy as part of the container...
<!-- Container-deployed context: boots the broker and exposes a pooled JMS connection factory. -->
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">

    <!-- Builds the broker from the XBean file on the classpath and starts it. -->
    <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
        <property name="config">
            <value>classpath:activemq-cfg.xml</value>
        </property>
        <property name="start">
            <value>true</value>
        </property>
    </bean>

    <!--
        Pooled connection factory wrapping a plain ActiveMQ factory that talks to
        tcp://localhost:61616. The pool's stop() method is invoked when the
        context is destroyed.
    -->
    <bean id="jmsFactory"
          class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://localhost:61616"/>
            </bean>
        </property>
    </bean>

</beans>
Then define your config for the consumer and listener.
<!-- Destination, JmsTemplate (producer side) and listener container (consumer side). -->
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">

    <!-- Queue shared by the producer template and the listener container. -->
    <amq:queue id="expiredOfferUpdateDestination"
               physicalName="com.sightlyinc.ratecred.offer.update"/>

    <!--
        Template used for sending; it defaults to the queue above.
        The jmsFactory bean is not defined in this file.
    -->
    <bean id="expiredOfferUpdateJmsTemplate"
          class="org.springframework.jms.core.JmsTemplate">
        <property name="defaultDestination" ref="expiredOfferUpdateDestination"/>
        <property name="connectionFactory">
            <ref bean="jmsFactory"/>
        </property>
    </bean>

    <!--
        Message-driven POJO that receives the queue's messages.
        ApplicationSessionFactory and jacksonMapper are not defined in this file.
    -->
    <bean id="expiredOfferUpdateMessageListener"
          class="com.sightlyinc.ratecred.admin.jms.UpdateAwardOfferMessageListener">
        <property name="sessionFactory" ref="ApplicationSessionFactory"/>
        <property name="mapper" ref="jacksonMapper"/>
    </bean>

    <!--
        Listener container that delivers messages from the queue to the POJO,
        using transacted sessions and the ApplicationTransactionManager bean
        (not defined in this file).
    -->
    <bean id="expiredOfferUpdateContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory">
            <ref bean="jmsFactory"/>
        </property>
        <property name="destination">
            <ref bean="expiredOfferUpdateDestination"/>
        </property>
        <property name="messageListener">
            <ref bean="expiredOfferUpdateMessageListener"/>
        </property>
        <property name="sessionTransacted">
            <value>true</value>
        </property>
        <property name="transactionManager">
            <ref bean="ApplicationTransactionManager"/>
        </property>
    </bean>

</beans>
And then the implementations for the producer and the listener.
Producer
package com.sightlyinc.ratecred.admin.jms; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.log4j.Logger; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import com.sightlyinc.ratecred.model.Award; import com.sightlyinc.ratecred.model.AwardType; import com.sightlyinc.ratecred.model.Rater; @Component("updateAwardOfferMessageProducer") public class UpdateAwardOfferMessageProducer { static Logger logger = Logger.getLogger(UpdateAwardOfferMessageProducer.class); @Autowired @Qualifier("expiredOfferUpdateJmsTemplate") private JmsTemplate expiredOfferUpdateJmsTemplate; @Autowired @Qualifier("jacksonMapper") private ObjectMapper jacksonMapper; public void generateMessage(Award award, AwardType awardType, Rater r) throws JMSException, JsonGenerationException, JsonMappingException, IOException { logger.debug("generating message"); MapawardToSaveData = new HashMap (); awardToSaveData.put("raterId", r.getId()); awardToSaveData.put("award", award); ByteArrayOutputStream baos = new ByteArrayOutputStream(); jacksonMapper.writeValue(baos, awardToSaveData); final String text = baos.toString(); expiredOfferUpdateJmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage message = session.createTextMessage(text); return message; } }); } }
Listener
package com.sightlyinc.ratecred.admin.jms; import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.Map; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; import org.hibernate.FlushMode; import org.hibernate.Session; import org.hibernate.SessionFactory; import org.springframework.orm.hibernate3.SessionFactoryUtils; import org.springframework.orm.hibernate3.SessionHolder; import org.springframework.transaction.support.TransactionSynchronizationManager; import com.noi.utility.spring.service.BLServiceException; import com.sightlyinc.ratecred.model.Award; import com.sightlyinc.ratecred.model.AwardType; import com.sightlyinc.ratecred.model.Rater; public class UpdateAwardOfferMessageListener implements MessageListener { static Logger logger = Logger.getLogger(UpdateAwardOfferMessageListener.class); private SessionFactory sessionFactory; private ObjectMapper mapper; /** * Implementation of <code>MessageListener</code>. 
*/ public void onMessage(Message message) { try { logger.debug("==== onMessage"); if (message instanceof TextMessage) { TextMessage tm = (TextMessage)message; //deserialize the json String msgText = tm.getText(); logger.debug(msgText); Map<String,Object> userData = mapper.readValue( new ByteArrayInputStream( msgText.getBytes()), Map.class); Long awardTypePk = new Long(userData.get("awardTypeId").toString()); Long raterPk = new Long(userData.get("raterId").toString()); AwardType awardType = awardManagerService.findAwardTypeByPrimaryKey(awardTypePk); Map<String,Object> awardModel = (Map<String,Object>)userData.get("award"); } } catch (JMSException e) { logger.error(e.getMessage(), e); } catch (IOException e) { logger.error(e.getMessage(), e); } catch (BLServiceException e) { logger.error(e.getMessage(), e); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } finally { } } public void setSessionFactory(SessionFactory sessionFactory) { this.sessionFactory = sessionFactory; } public void setMapper(ObjectMapper mapper) { this.mapper = mapper; } }
No comments:
Post a Comment