Configure the broker that you are going to embed. The same configuration can just as easily be run outside the container.
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">

    <!-- Makes system properties usable as placeholder variables in this configuration file. -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

    <!-- The ActiveMQ broker to embed: message persistence is off and JMX is off. -->
    <amq:broker id="broker" useJmx="false" persistent="false">

        <!--
            Management context on connector port 1199 under the org.apache.activemq domain.
            NOTE(review): the broker above sets useJmx to false; confirm whether this
            management context is meant to be active.
        -->
        <amq:managementContext>
            <amq:managementContext connectorPort="1199" jmxDomainName="org.apache.activemq"/>
        </amq:managementContext>

        <!-- Network connector named "pool" with a static peer at tcp://localhost:61617. -->
        <amq:networkConnectors>
            <amq:networkConnector name="pool" uri="static:(tcp://localhost:61617)"/>
        </amq:networkConnectors>

        <!-- Clients connect to this broker over TCP on localhost:61616. -->
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:61616"/>
        </amq:transportConnectors>

    </amq:broker>

</beans>
Then, inside a config file that you will actually deploy as part of the container, load that broker configuration and define the connection factory:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">

    <!--
        Builds the embedded broker from the classpath resource activemq-cfg.xml and
        starts it (start=true).
    -->
    <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
        <property name="config" value="classpath:activemq-cfg.xml"/>
        <property name="start" value="true"/>
    </bean>

    <!--
        Pooled JMS connection factory pointing at tcp://localhost:61616; the pool is
        stopped via destroy-method when the context shuts down.

        depends-on="broker" makes the ordering explicit: Spring initializes the broker
        bean before this factory, and destroys this factory before the broker, so the
        pool is never torn down against a broker that has already gone away.
    -->
    <bean id="jmsFactory"
          class="org.apache.activemq.pool.PooledConnectionFactory"
          depends-on="broker"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
    </bean>

</beans>
Then define the config for the destination, the JmsTemplate used by the producer, and the message listener with its listener container.
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">

    <!-- Queue that carries the offer update messages. -->
    <amq:queue id="expiredOfferUpdateDestination"
               physicalName="com.sightlyinc.ratecred.offer.update"/>

    <!-- JmsTemplate whose default destination is the queue above. -->
    <bean id="expiredOfferUpdateJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination">
            <ref bean="expiredOfferUpdateDestination"/>
        </property>
    </bean>

    <!--
        The message-driven POJO. ApplicationSessionFactory and jacksonMapper are
        referenced by name and are expected to be defined elsewhere in the context.
    -->
    <bean id="expiredOfferUpdateMessageListener"
          class="com.sightlyinc.ratecred.admin.jms.UpdateAwardOfferMessageListener">
        <property name="sessionFactory">
            <ref bean="ApplicationSessionFactory"/>
        </property>
        <property name="mapper">
            <ref bean="jacksonMapper"/>
        </property>
    </bean>

    <!--
        Listener container: delivers messages from the queue to the POJO above, using a
        transacted session and the ApplicationTransactionManager bean (expected to be
        defined elsewhere in the context).
    -->
    <bean id="expiredOfferUpdateContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="destination" ref="expiredOfferUpdateDestination"/>
        <property name="messageListener" ref="expiredOfferUpdateMessageListener"/>
        <property name="sessionTransacted" value="true"/>
        <property name="transactionManager" ref="ApplicationTransactionManager"/>
    </bean>

</beans>
Finally, here are the implementations of the producer and the listener.
Producer
package com.sightlyinc.ratecred.admin.jms;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import com.sightlyinc.ratecred.model.Award;
import com.sightlyinc.ratecred.model.AwardType;
import com.sightlyinc.ratecred.model.Rater;
/**
 * Sends award offer update messages as JSON text through the
 * {@code expiredOfferUpdateJmsTemplate}.
 *
 * <p>The JSON payload is an object with the fields {@code raterId}, {@code awardTypeId}
 * (when an award type is supplied) and {@code award}.
 */
@Component("updateAwardOfferMessageProducer")
public class UpdateAwardOfferMessageProducer {

    static Logger logger = Logger.getLogger(UpdateAwardOfferMessageProducer.class);

    @Autowired
    @Qualifier("expiredOfferUpdateJmsTemplate")
    private JmsTemplate expiredOfferUpdateJmsTemplate;

    @Autowired
    @Qualifier("jacksonMapper")
    private ObjectMapper jacksonMapper;

    /**
     * Serializes the given data to JSON and sends it as a {@link TextMessage} to the
     * template's default destination.
     *
     * @param award     the award to include in the payload under {@code award}
     * @param awardType the award type whose id is sent under {@code awardTypeId};
     *                  when {@code null} the field is omitted
     * @param r         the rater whose id is sent under {@code raterId}
     * @throws JsonGenerationException if the JSON cannot be generated
     * @throws JsonMappingException    if the payload cannot be mapped to JSON
     * @throws IOException             on any other serialization failure
     * @throws JMSException            declared for callers; kept for compatibility
     */
    public void generateMessage(Award award, AwardType awardType, Rater r)
            throws JMSException, JsonGenerationException, JsonMappingException, IOException {
        logger.debug("generating message");

        Map<String, Object> awardToSaveData = new HashMap<String, Object>();
        awardToSaveData.put("raterId", r.getId());
        // The listener for this destination reads "awardTypeId" from the payload, so the
        // id has to travel with the message. Omitted for a null award type so that
        // callers that passed null before keep working.
        if (awardType != null) {
            awardToSaveData.put("awardTypeId", awardType.getId());
        }
        awardToSaveData.put("award", award);

        // Serialize straight to a String: going through a byte stream and then
        // ByteArrayOutputStream.toString() would decode Jackson's UTF-8 output with the
        // platform default charset and corrupt non-ASCII characters.
        final String text = jacksonMapper.writeValueAsString(awardToSaveData);

        expiredOfferUpdateJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(text);
            }
        });
    }
}
Listener
package com.sightlyinc.ratecred.admin.jms;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.hibernate.FlushMode;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.springframework.orm.hibernate3.SessionFactoryUtils;
import org.springframework.orm.hibernate3.SessionHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import com.noi.utility.spring.service.BLServiceException;
import com.sightlyinc.ratecred.model.Award;
import com.sightlyinc.ratecred.model.AwardType;
import com.sightlyinc.ratecred.model.Rater;
/**
 * JMS {@link MessageListener} that receives award offer update messages sent as JSON
 * text and extracts the award type id, the rater id and the award data from them.
 *
 * <p>NOTE(review): {@code awardManagerService} is used in {@link #onMessage(Message)} but
 * is not declared in the visible part of this class; it has to be provided (field plus
 * injection) for the class to compile.
 */
public class UpdateAwardOfferMessageListener implements MessageListener {

    static Logger logger = Logger.getLogger(UpdateAwardOfferMessageListener.class);

    private SessionFactory sessionFactory;
    private ObjectMapper mapper;

    /**
     * Implementation of <code>MessageListener</code>.
     *
     * <p>Only {@link TextMessage}s are processed; other message types are logged and
     * skipped. JMS, I/O and service exceptions are logged and not rethrown. A payload
     * that lacks a required id field fails with an {@link IllegalArgumentException}.
     *
     * @param message the incoming JMS message
     */
    public void onMessage(Message message) {
        logger.debug("==== onMessage");

        if (!(message instanceof TextMessage)) {
            logger.warn("ignoring non-text JMS message of type: "
                    + (message == null ? "null" : message.getClass().getName()));
            return;
        }

        try {
            String msgText = ((TextMessage) message).getText();
            logger.debug(msgText);

            // Parse the String directly; converting it to bytes with the platform
            // default charset first can break non-ASCII content.
            @SuppressWarnings("unchecked") // JSON objects are bound to Map<String,Object>
            Map<String, Object> userData = mapper.readValue(msgText, Map.class);
            if (userData == null) {
                throw new IllegalArgumentException("message payload is not a JSON object: " + msgText);
            }

            Long awardTypePk = requiredLong(userData, "awardTypeId");
            Long raterPk = requiredLong(userData, "raterId");

            AwardType awardType = awardManagerService.findAwardTypeByPrimaryKey(awardTypePk);

            @SuppressWarnings("unchecked") // nested JSON object, bound as a Map by Jackson
            Map<String, Object> awardModel = (Map<String, Object>) userData.get("award");

            // NOTE(review): raterPk, awardType and awardModel are not used any further in
            // the visible code.
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        } catch (BLServiceException e) {
            logger.error(e.getMessage(), e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Reads a mandatory numeric field from the decoded payload.
     *
     * @param data decoded JSON payload
     * @param key  name of the field to read
     * @return the field value as a {@code Long}
     * @throws IllegalArgumentException if the field is absent or null
     * @throws NumberFormatException    if the field is not a valid long
     */
    private static Long requiredLong(Map<String, Object> data, String key) {
        Object raw = data.get(key);
        if (raw == null) {
            throw new IllegalArgumentException(
                    "message payload is missing required field '" + key + "'");
        }
        return Long.valueOf(raw.toString());
    }

    public void setSessionFactory(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    public void setMapper(ObjectMapper mapper) {
        this.mapper = mapper;
    }
}