Destination destination, String s,
ServerSessionPool serversessionpool, int i) throws JMSException {
//AQjmsError.throwEx(102);
return _createConnectionConsumer(destination, null, s, serversessionpool, i);
}
private ConnectionConsumer _createConnectionConsumer(final Destination destination,
final String sub, final String s, final ServerSessionPool spool, final int max) {
return new ConnectionConsumer() {
private boolean closed = false;
private final LinkedBlockingQueue<Long> trigger = new LinkedBlockingQueue<Long>();
private AtomicLong counter = new AtomicLong(0);
@SuppressWarnings("unused")
private Thread bootstrapThread = bootstrapConsumers();
@Override
public ServerSessionPool getServerSessionPool() throws JMSException {
return spool;
}
@Override
public void close() throws JMSException {
closed = true;
}
public Thread bootstrapConsumers() {
Thread coordinator = new Thread() {
public void run() {
setName("ConsumerCoordinator "+Thread.currentThread().getId());
long lastCnt = 0;
while (!closed) {
Long value = null;
try {
value = trigger.poll(1, TimeUnit.SECONDS);
}
catch (InterruptedException e) {}
if ((value!=null && counter.get() < max) && !closed) {
Thread th = new Thread() {
public void run() {
runConsumer();
}
};
th.setDaemon(true);
th.start();
}
long currentCnt = counter.get();
if (currentCnt!=lastCnt) {
System.out.println("Server session pool #"+spool.hashCode()+" has got active "+counter.get()+" session(s), max is "+max);
lastCnt = currentCnt;
}
}
}
};
coordinator.setDaemon(true);
coordinator.start();
trigger.add(System.currentTimeMillis());
return coordinator;
}
public void runConsumer() {
Thread.currentThread().setName("Connection consumer "+Thread.currentThread().getId());
counter.incrementAndGet();
ServerSession ss = null;
Session sess = null;
MessageListener appSrvMessageListener = null;
while (!closed) {
try {
if (ss==null) {
ss = spool.getServerSession(); /* app srv will block */
trigger.put(System.currentTimeMillis());
sess = ss.getSession();
appSrvMessageListener = sess.getMessageListener();
}
if (sess==null) {
sess = ss.getSession();
}
sess.setMessageListener(null);
MessageConsumer mc = (sub!=null) ?
sess.createDurableSubscriber((Topic) destination, sub, s, false) :
sess.createConsumer(destination, s);
while (!closed) {
LinkedList
int limit = 1; /* XA: one transaction per message */
for (int i=0; i < limit; i++) {
Message m = null;
try {
m = (i==limit-1) ? mc.receive(1000) : mc.receiveNoWait();
if (m!=null)
list.add(m);
}
catch (Exception e) {
System.out.println("Error while trying to receive message from JMS server, destination="+destination+", subscription="+sub+", selector="+s+": "+ e);
try {
sess.close();
}
catch (Exception ee) {}
sess = null;
break;
}
}
if (sess==null)
break;
if (!list.isEmpty()) {
synchronized (ss) {
for (Message m : list)
appSrvMessageListener.onMessage(m);
/*ss.start();*/
}
}
}
}
catch (Exception exception) {
System.out.println("Cannot consume JMS connection (but will retry), destination="+destination+", subscription="+sub+", selector="+s+": "+ exception);
try {
if (sess!=null)
sess.close();
}
catch (Exception e) {}
}
}
counter.decrementAndGet();
Logger.debug("EMS Connection Consumer closed (permanently), destination="+destination+", subscription="+sub+", selector="+s);
Logger.debug("Connection #"+hashCode()+" consumer counter is "+counter.get());
}
};
}
0 komentarze:
Prześlij komentarz