środa, listopada 14, 2012

Prosty test na XA

import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Scanner;

import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;

import com.tibco.tibjms.TibjmsXAConnectionFactory;


/**
 * Simple multi-threaded XA test: a number of worker threads repeatedly open
 * two TIBCO EMS XA connections, enlist one XA session from each connection in
 * an Arjuna JTA transaction, shuttle at most one message in each direction
 * between the queues {@code test.bw1} and {@code test.fw1}, and commit.
 *
 * <p>Every failure is written to the log file {@code C://xa.log}; the workers
 * keep iterating until the user asks them to stop on standard input.
 */
public class Test {

  /** Number of worker threads running transactions concurrently. */
  private static final int WORKER_COUNT = 20;

  /**
   * Loop flag polled by every worker. It is written by the main thread and
   * read by the workers, so it must be {@code volatile}; without it a worker
   * is not guaranteed to ever observe the stop request.
   */
  private static volatile boolean canRun = true;

  /**
   * Configures the XA connection factory, opens the log file and starts the
   * worker threads. Returns as soon as the workers are started; it does not
   * wait for them.
   *
   * @throws Exception if the factory cannot be configured or the log file
   *     cannot be opened
   */
  private final static void runTest() throws Exception {
    final TibjmsXAConnectionFactory xacf = new TibjmsXAConnectionFactory();
    xacf.setConnAttemptCount(60);
    xacf.setConnAttemptDelay(100);
    xacf.setConnAttemptTimeout(1000);
    xacf.setReconnAttemptCount(60);
    xacf.setReconnAttemptDelay(100);
    xacf.setReconnAttemptTimeout(1000);
    xacf.setServerUrl("tcp://node1:7222,tcp://node2:8222");

    // The log stream is shared by all workers and deliberately stays open for
    // the lifetime of the process; every write is followed by a flush.
    final PrintStream ps = new PrintStream(new FileOutputStream("C://xa.log"));

    Thread[] threads = new Thread[WORKER_COUNT];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread("xa-worker-" + i) {
        @Override
        public void run() {
          while (canRun) {
            runOneTransaction(xacf, ps);
          }
        }
      };
    }
    for (Thread t : threads) {
      t.start();
    }
  }

  /**
   * Runs one complete iteration: connect, begin a transaction, move at most
   * one message from {@code test.bw1} to {@code test.fw1} and one from
   * {@code test.fw1} to {@code test.bw1}, commit, disconnect. Any exception
   * is logged and the transaction (if one is still associated with the
   * calling thread) is rolled back; nothing is propagated to the caller.
   *
   * @param xacf factory used to create the two XA connections
   * @param ps log stream that receives stack traces of failures
   */
  private static void runOneTransaction(TibjmsXAConnectionFactory xacf, PrintStream ps) {
    XAConnection xaconn1 = null;
    XAConnection xaconn2 = null;
    TransactionManager tm = null;
    XAResource xares1 = null;
    XAResource xares2 = null;
    try {
      xaconn1 = xacf.createXAConnection();
      xaconn2 = xacf.createXAConnection();
      tm = com.arjuna.ats.jta.TransactionManager.transactionManager();
      tm.setTransactionTimeout(30);
      xaconn1.start();
      xaconn2.start();

      XASession sess1 = xaconn1.createXASession();
      // NOTE(review): the second session originally came from xaconn1 as
      // well, which left xaconn2 opened, started and closed but never used.
      // It is taken from xaconn2 here so that both connections take part.
      XASession sess2 = xaconn2.createXASession();
      xares1 = sess1.getXAResource();
      xares2 = sess2.getXAResource();

      MessageConsumer bw1cons = sess1.createConsumer(sess1.createQueue("test.bw1"));
      MessageConsumer fw1cons = sess1.createConsumer(sess1.createQueue("test.fw1"));
      MessageProducer bw1prod = sess1.createProducer(sess1.createQueue("test.bw1"));
      MessageProducer fw1prod = sess1.createProducer(sess1.createQueue("test.fw1"));

      tm.begin();
      Transaction tx = tm.getTransaction();
      tx.enlistResource(xares1);
      tx.enlistResource(xares2);

      // Each receive waits at most one second; a null result means no
      // message was available and that direction is skipped.
      TextMessage m = (TextMessage) bw1cons.receive(1000);
      if (m != null) {
        fw1prod.send(m);
      }
      m = (TextMessage) fw1cons.receive(1000);
      if (m != null) {
        bw1prod.send(m);
      }

      tx.delistResource(xares1, XAResource.TMSUCCESS);
      tx.delistResource(xares2, XAResource.TMSUCCESS);
      tm.commit();
    } catch (Exception e) {
      logFailure(ps, e);
      rollbackQuietly(tm, xares1, xares2, ps);
    } finally {
      closeQuietly(xaconn1, ps);
      closeQuietly(xaconn2, ps);
    }
  }

  /**
   * Best-effort cleanup of the transaction associated with the calling
   * thread. The resources are delisted with {@code TMFAIL} first, while the
   * transaction still exists, and only then is the transaction rolled back.
   * The rollback goes through the transaction manager rather than the
   * {@code Transaction} object so that the thread is left without an
   * associated transaction and can begin a new one on the next iteration.
   *
   * @param tm transaction manager, or {@code null} if it was never obtained
   * @param xares1 first resource, or {@code null} if it was never obtained
   * @param xares2 second resource, or {@code null} if it was never obtained
   * @param ps log stream that receives stack traces of cleanup failures
   */
  private static void rollbackQuietly(TransactionManager tm, XAResource xares1,
      XAResource xares2, PrintStream ps) {
    if (tm == null) {
      return; // failed before the transaction manager was looked up
    }
    Transaction tr;
    try {
      tr = tm.getTransaction();
    } catch (Exception ex) {
      logFailure(ps, ex);
      return;
    }
    if (tr == null) {
      return; // nothing is associated with this thread any more
    }
    delistQuietly(tr, xares1, ps);
    delistQuietly(tr, xares2, ps);
    try {
      tm.rollback();
    } catch (Exception ex) {
      logFailure(ps, ex);
    }
  }

  /**
   * Delists {@code xares} from {@code tr} with {@code TMFAIL}, logging
   * instead of propagating any failure. Does nothing when {@code xares} is
   * {@code null}.
   */
  private static void delistQuietly(Transaction tr, XAResource xares, PrintStream ps) {
    if (xares == null) {
      return;
    }
    try {
      tr.delistResource(xares, XAResource.TMFAIL);
    } catch (Exception ex) {
      logFailure(ps, ex);
    }
  }

  /**
   * Closes {@code conn} if it was opened, logging instead of propagating any
   * failure.
   */
  private static void closeQuietly(XAConnection conn, PrintStream ps) {
    if (conn == null) {
      return;
    }
    try {
      conn.close();
    } catch (Exception ex) {
      logFailure(ps, ex);
    }
  }

  /** Writes the stack trace of {@code e} to the log and flushes it. */
  private static void logFailure(PrintStream ps, Exception e) {
    e.printStackTrace(ps);
    ps.flush();
  }

  /**
   * Starts the workers and then blocks on standard input until a line
   * containing {@code q} is entered or the input ends; either way the workers
   * are told to stop after their current iteration.
   *
   * @param args unused
   * @throws Exception if the test cannot be started
   */
  public static void main(String[] args) throws Exception {
    System.out.println("Press q to stop.");
    runTest();
    Scanner sc = new Scanner(System.in);
    try {
      // Keep reading: a line without "q" must not end the prompt while the
      // workers are still running.
      while (sc.hasNextLine()) {
        if (sc.nextLine().contains("q")) {
          break;
        }
      }
    } finally {
      canRun = false;
      sc.close();
    }
  }
}

0 komentarzy: