Sunday, February 20, 2011

JMS Template and Spring 3.0 Transactions

OK, here is how you do it...

Configure the broker that you are going to embed. It can just as easily run outside the container.

<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
 xmlns:jms="http://www.springframework.org/schema/jms" xmlns:p="http://www.springframework.org/schema/p"
 xsi:schemaLocation="
  http://www.springframework.org/schema/jms 
        http://www.springframework.org/schema/jms/spring-jms.xsd 
  http://www.springframework.org/schema/beans 
  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  http://activemq.apache.org/schema/core 
  http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">
  
  <!--
    Broker definition file. Presumably this is the file that is loaded as
    classpath:activemq-cfg.xml by the BrokerFactoryBean in the deployed context; confirm the file name.
    NOTE(review): the jms and p namespaces are declared but not used by any element in this file.
  -->

  <!-- Allows us to use system properties as variables in this configuration file.
       No placeholders appear in the elements below, so this bean currently has nothing to resolve here. --> 
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> 
  
  
  <!-- The ActiveMQ broker to embed: JMX is switched off (useJmx="false") and
       persistence is switched off (persistent="false"). -->
 <amq:broker id="broker" useJmx="false" persistent="false">
  <!-- Management (JMX) context on connector port 1199.
       NOTE(review): useJmx is false on the broker above, so this context is presumably inactive; confirm. -->
  <amq:managementContext>
   <amq:managementContext connectorPort="1199" jmxDomainName="org.apache.activemq"/>
  </amq:managementContext>
  <!-- Statically configured network connector named "pool" pointing at tcp://localhost:61617.
       NOTE(review): nothing else in this setup listens on 61617; presumably a second broker is expected there. -->
  <amq:networkConnectors>
   <amq:networkConnector name="pool" uri="static:(tcp://localhost:61617)"/>
  </amq:networkConnectors>  
  <!-- TCP transport on localhost:61616; this is the URL the connection factory uses as its brokerURL. -->
  <amq:transportConnectors>
   <amq:transportConnector uri="tcp://localhost:61616" />
  </amq:transportConnectors>  
 </amq:broker>

</beans> 

Then, inside a config file that you will actually deploy as part of the container...
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
 xmlns:jms="http://www.springframework.org/schema/jms" xmlns:p="http://www.springframework.org/schema/p"
 xsi:schemaLocation="
  http://www.springframework.org/schema/jms 
        http://www.springframework.org/schema/jms/spring-jms.xsd 
  http://www.springframework.org/schema/beans 
  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  http://activemq.apache.org/schema/core 
  http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">

 <!-- Creates the broker from the XML definition found at classpath:activemq-cfg.xml
      and starts it (start="true"). -->
 <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
     <property name="config" value="classpath:activemq-cfg.xml" />
     <property name="start" value="true" />
 </bean>
 
 <!-- Pooled connection factory wrapping a plain ActiveMQConnectionFactory that connects to
      tcp://localhost:61616. The pool's stop() method is invoked when the bean is destroyed.
      NOTE(review): there is no depends-on="broker" here, so bean creation order relative to
      the broker is not explicitly constrained; confirm this is acceptable for your startup sequence. -->
 <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
     <property name="connectionFactory">
       <bean class="org.apache.activemq.ActiveMQConnectionFactory">
         <property name="brokerURL">
           <value>tcp://localhost:61616</value>
         </property>
       </bean>
     </property>
   </bean> 
</beans>

Then define the config for the destination, the JmsTemplate used by the producer, and the listener with its container.
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
 xmlns:jms="http://www.springframework.org/schema/jms" xmlns:p="http://www.springframework.org/schema/p"
 xsi:schemaLocation="
  http://www.springframework.org/schema/jms 
        http://www.springframework.org/schema/jms/spring-jms.xsd 
  http://www.springframework.org/schema/beans 
  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  http://activemq.apache.org/schema/core 
  http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">

 <!-- Queue shared by the producer's JmsTemplate and the listener container below. -->
 <amq:queue id="expiredOfferUpdateDestination" physicalName="com.sightlyinc.ratecred.offer.update" />
 
  <!-- Spring JMS Template used for sending: it sends to the queue above by default and
       obtains connections from the jmsFactory bean (not defined in this file). -->
 <bean id="expiredOfferUpdateJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="defaultDestination">
   <ref bean="expiredOfferUpdateDestination" />
  </property>
  <property name="connectionFactory" ref="jmsFactory"/>
 </bean>

 <!-- This is the Message Driven POJO (MDP). It is injected with the ApplicationSessionFactory and
      jacksonMapper beans, neither of which is defined in this file. -->
 <bean id="expiredOfferUpdateMessageListener"
  class="com.sightlyinc.ratecred.admin.jms.UpdateAwardOfferMessageListener">
  <property name="sessionFactory"><ref bean="ApplicationSessionFactory"/></property>
  <property name="mapper"><ref bean="jacksonMapper"/></property>
 </bean>
 
 <!-- And this is the message listener container: it consumes from the queue above and hands each
      message to the MDP. sessionTransacted is true and an external transaction manager
      (ApplicationTransactionManager, defined elsewhere) is supplied, presumably so that message
      receipt and the listener's work commit or roll back together. Confirm the exact semantics
      against the DefaultMessageListenerContainer documentation for your transaction manager type. -->
 <bean id="expiredOfferUpdateContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
     <property name="connectionFactory" ref="jmsFactory"/>
     <property name="destination" ref="expiredOfferUpdateDestination"/>
     <property name="messageListener" ref="expiredOfferUpdateMessageListener" />
     <property name="sessionTransacted" value="true"/>
     <property name="transactionManager" ref="ApplicationTransactionManager"/>     
 </bean> 
</beans>

And then the implementations for the producer and the listener.

Producer
package com.sightlyinc.ratecred.admin.jms;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import com.sightlyinc.ratecred.model.Award;
import com.sightlyinc.ratecred.model.AwardType;
import com.sightlyinc.ratecred.model.Rater;

/**
 * Sends award-offer update requests as JSON {@link TextMessage}s through the
 * injected {@code expiredOfferUpdateJmsTemplate}.
 *
 * <p>The JSON payload is an object with these keys:
 * <ul>
 *   <li>{@code raterId} - the value of {@code r.getId()}</li>
 *   <li>{@code awardTypeId} - the value of {@code awardType.getId()}, written only
 *       when {@code awardType} is not {@code null}</li>
 *   <li>{@code award} - the {@link Award} as serialized by the injected mapper</li>
 * </ul>
 */
@Component("updateAwardOfferMessageProducer")
public class UpdateAwardOfferMessageProducer {

    static Logger logger = Logger.getLogger(UpdateAwardOfferMessageProducer.class);

    @Autowired
    @Qualifier("expiredOfferUpdateJmsTemplate")
    private JmsTemplate expiredOfferUpdateJmsTemplate;

    @Autowired
    @Qualifier("jacksonMapper")
    private ObjectMapper jacksonMapper;

    /**
     * Serializes the given data to JSON and sends it as a text message using the
     * template (and therefore to whatever destination the template is configured with).
     *
     * @param award the award to include under the {@code award} key
     * @param awardType the award type whose id is sent under {@code awardTypeId};
     *        may be {@code null}, in which case the key is omitted
     * @param r the rater whose id is sent under {@code raterId}; must not be {@code null}
     * @throws JMSException declared for callers; kept for interface compatibility
     * @throws JsonGenerationException if the mapper fails while generating JSON
     * @throws JsonMappingException if the mapper cannot map the payload to JSON
     * @throws IOException on any other serialization I/O failure
     */
    public void generateMessage(Award award, AwardType awardType, Rater r)
            throws JMSException, JsonGenerationException, JsonMappingException, IOException {
        logger.debug("generating message");

        Map<String, Object> awardToSaveData = new HashMap<String, Object>();
        awardToSaveData.put("raterId", r.getId());
        // The listener reads "awardTypeId" from the payload, so it has to be written here.
        // NOTE(review): assumes AwardType exposes getId() the same way Rater does; confirm.
        if (awardType != null) {
            awardToSaveData.put("awardTypeId", awardType.getId());
        }
        awardToSaveData.put("award", award);

        // Serialize straight to a String: going through a byte stream and decoding it with
        // the platform default charset can corrupt non-ASCII characters.
        final String text = jacksonMapper.writeValueAsString(awardToSaveData);

        expiredOfferUpdateJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(text);
            }
        });
    }
}

Listener
package com.sightlyinc.ratecred.admin.jms;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.hibernate.FlushMode;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.springframework.orm.hibernate3.SessionFactoryUtils;
import org.springframework.orm.hibernate3.SessionHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import com.noi.utility.spring.service.BLServiceException;
import com.sightlyinc.ratecred.model.Award;
import com.sightlyinc.ratecred.model.AwardType;
import com.sightlyinc.ratecred.model.Rater;

/**
 * JMS listener that reads award-offer update requests sent as JSON text messages.
 *
 * <p>The JSON object is expected to carry {@code awardTypeId}, {@code raterId} and
 * {@code award} keys.
 *
 * <p>NOTE(review): {@code awardManagerService} is referenced in {@link #onMessage(Message)}
 * but is not declared anywhere in the visible class; a field (and a way to inject it)
 * must be added before this compiles.
 */
public class UpdateAwardOfferMessageListener implements MessageListener {
    static Logger logger = Logger.getLogger(UpdateAwardOfferMessageListener.class);
    private SessionFactory sessionFactory;
    private ObjectMapper mapper;

    /**
     * Implementation of <code>MessageListener</code>.
     *
     * <p>Non-text messages, empty payloads, and payloads without numeric
     * {@code awardTypeId}/{@code raterId} values are logged and skipped. Checked
     * failures while reading or processing the message are logged, not rethrown.
     *
     * @param message the incoming JMS message
     */
    @SuppressWarnings("unchecked") // Map.class yields a raw Map; JSON object keys are always strings
    public void onMessage(Message message) {
        logger.debug("==== onMessage");
        if (!(message instanceof TextMessage)) {
            logger.warn("ignoring message that is not a TextMessage");
            return;
        }
        try {
            String msgText = ((TextMessage) message).getText();
            logger.debug(msgText);
            if (msgText == null) {
                logger.error("ignoring text message with no body");
                return;
            }

            // Parse the String directly; converting it to bytes with the platform
            // default charset first can corrupt non-ASCII characters.
            Map<String, Object> userData = mapper.readValue(msgText, Map.class);
            if (userData == null) {
                logger.error("ignoring message whose JSON body is not an object: " + msgText);
                return;
            }

            Long awardTypePk = readId(userData, "awardTypeId");
            Long raterPk = readId(userData, "raterId");
            if (awardTypePk == null || raterPk == null) {
                logger.error("ignoring message without numeric awardTypeId/raterId: " + msgText);
                return;
            }

            // NOTE(review): raterPk, awardType and awardModel are not used any further in the
            // visible code; the processing that consumes them presumably belongs here.
            AwardType awardType = awardManagerService.findAwardTypeByPrimaryKey(awardTypePk);
            Map<String, Object> awardModel = (Map<String, Object>) userData.get("award");

        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        } catch (BLServiceException e) {
            logger.error(e.getMessage(), e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Reads {@code key} from {@code data} and converts its string form to a {@code Long}.
     *
     * @return the parsed id, or {@code null} when the key is absent or not a valid long
     */
    private static Long readId(Map<String, Object> data, String key) {
        Object raw = data.get(key);
        if (raw == null) {
            return null;
        }
        try {
            return Long.valueOf(raw.toString());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Sets the Hibernate session factory (stored; not otherwise used in the visible code). */
    public void setSessionFactory(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    /** Sets the Jackson mapper used to parse incoming message bodies. */
    public void setMapper(ObjectMapper mapper) {
        this.mapper = mapper;
    }
}