czwartek, grudnia 18, 2014

How to copy EMS datastore for interDC transport? Ought to be _fast_.

#ifndef _GNU_SOURCE
#define _GNU_SOURCE 
#endif
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <malloc.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <dlfcn.h>
#include <execinfo.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/mman.h>
#include <sys/sendfile.h>
#include <errno.h>
#include <time.h>
#include <sys/syscall.h>
#include <inttypes.h>
#ifdef AIO_VIA_SYSCALL
#include <linux/aio_abi.h>
#else
#include <libaio.h>
#endif
#include <list>

#define SYSOUTF(msg, params...) do { char buff[1024]; memset(buff, 0, 1024); snprintf(buff, 1024, msg, params); write(1,buff,strlen(buff)); } while(0)
#define SYSOUT(msg) write(1, msg, strlen(msg))

#define ONE_MB (1024*1024)
#define ONE_GB (1024L*ONE_MB)

#define STRATEGY_MMAP_WRITE  0
#define STRATEGY_MMAP_BOTH   1
#define STRATEGY_SERIAL      2
#define STRATEGY_SENDFILE    3
#define STRATEGY_AIO         4

static const char* STRATEGY_DESC[] = { "MMAP+WRITE", "MMAPx2+MEMCPY", "READ+WRITE", "SENDFILE", "AIO" };

static struct timespec __ts, __ts_res;
static clockid_t __clock = CLOCK_REALTIME;

static int strategy = STRATEGY_MMAP_WRITE;
static int fd_in = -1, fd_out = -1, err_in = 0, err_out = 0;
static void *addr = NULL, *addr2 = NULL;
static long size_in = 0, blk_size = 0;
static int warn_mem_params_not_optimal = 1;

#define TIMER_START() do { timer_start(&__ts,__clock); } while(0)
#define TIMER_STOP() do { timer_stop(&__ts,__clock); clock_getres(__clock,&__ts_res); } while(0)
#define PRINT_WALL_TIME() do { long msec = __ts.tv_nsec/1000000L; SYSOUTF("[INFO] WALL TIME: %ldsec %ldmsec %ldusec +- %ldnsec\n",(long)__ts.tv_sec,msec,(__ts.tv_nsec - msec*1000000L)/1000L,__ts_res.tv_nsec); } while(0)

static inline int timer_start(struct timespec *ts, clockid_t clid) {
    int res = clock_gettime(clid,ts);
 if (res!=0) {
  int e = errno;
  SYSOUTF("Failed timer_start due to '%s'\r\n", strerror(e));
 }
    return res;
}

static inline int timer_stop(struct timespec *ts, clockid_t clid) {
    struct timespec ts1;
    int res = clock_gettime(clid,&ts1);
    if (res!=0) {
        int e = errno;
        SYSOUTF("Failed timer_stop due to '%s'\r\n", strerror(e));
  return res;
    }
    ts->tv_sec = ts1.tv_sec - ts->tv_sec;
    ts->tv_nsec = ts1.tv_nsec - ts->tv_nsec;
    if (ts->tv_nsec < 0) {
  ts->tv_nsec += 1000000000L;
        ts->tv_sec--;
    }
    return 0;
}

long get_rmem_limit() {
    struct rlimit rlimit;
    int res = getrlimit(RLIMIT_MEMLOCK, &rlimit);
    if (res == 0) {
  return (long)rlimit.rlim_max;
    }
    return -1L;
}

int open_source_file(char *path) {
    int res = -1;
    int extra_flags = strategy == STRATEGY_AIO ? O_DIRECT : 0;
    do {
  res = open(path, O_LARGEFILE | O_RDONLY | extra_flags);
    }
    while (res == -1 && errno == EINTR);    
    return res;
}

int open_dest_file(char *path) {
    int res = -1;
    int extra_flags = strategy == STRATEGY_AIO ? O_DIRECT : 0;
    do {
  res = open(path, O_CREAT | O_LARGEFILE | O_RDWR | extra_flags, 0666);
    }
    while (res == -1 && errno == EINTR);
    return res;
}

long get_file_size(int fd, long *block_size) {
    struct stat stat;
    TIMER_START();
    int res = fstat(fd, &stat);
    if (res == 0) {
  if (block_size != NULL)
   *block_size = stat.st_blksize;
  return stat.st_size;
    }
    return -1L;
}

int allocate_file(int fd, long size) {
    return fallocate(fd, 0, 0, size);
}

void preload_file_advise(int fd, long size, off_t base_offset) {
 int res = posix_fadvise(fd, base_offset, size, POSIX_FADV_WILLNEED | POSIX_FADV_SEQUENTIAL);
 if (res != 0) {
  int e = errno;
  SYSOUTF("[WARN] POSIX fadvise returned error '%s'\r\n", strerror(e));
 }
}

void* mmap_file(int fd, long size, int mode, off_t base_offset = 0) {
    preload_file_advise(fd, size, base_offset);
    int PERF_FLAGS[] = { MAP_HUGETLB | MAP_LOCKED, MAP_HUGETLB, MAP_LOCKED, 0 };
    void *addr = NULL;
    int i = 0;
    for (; i < 4; i++) {
  addr = mmap(NULL, size, mode == 0 ? PROT_READ : PROT_WRITE, MAP_SHARED | MAP_POPULATE | PERF_FLAGS[i], fd, base_offset);
  if (addr!= (void*)-1)
   break;
    }
    if (addr == (void*)-1)
  return NULL;
    if (i!=0 && warn_mem_params_not_optimal) {
  SYSOUT("[WARN] Memory access is not optimal (no huge TLB & pages locked into RAM)\r\n");
  warn_mem_params_not_optimal = 0;
    }
    int res = posix_madvise(addr, size, POSIX_MADV_WILLNEED | POSIX_MADV_SEQUENTIAL);
    if (res != 0) {
  int e = errno;
  SYSOUTF("[WARN] POSIX madvise returned error '%s'\r\n", strerror(e));
    }
    res = mlock(addr, size);
    if (res != 0) {
     int e = errno;
     SYSOUTF("[WARN] POSIX mlock returned error '%s'\r\n", strerror(e));
    }
    return addr;
}

int write_file(int fd, void *buff, long len, long base_off = 0) {
    long to_be_written = len;
    long off = 0;
    do {
  long res = pwrite(fd, buff, to_be_written, off + base_off);
  if (res > 0) {
   to_be_written -= res;
   off += res;
   if (to_be_written == 0L)
    break;
  }
  else if (res < 0 && errno != EINTR)
   break;
 }
    while (errno == EINTR);
    TIMER_STOP();
    if (errno == 0)
  fdatasync(fd); 
    return to_be_written == 0 ? 0 : errno;
}    

#define CLEANUP_EXIT(code) { if (addr) munmap(addr, size_in); if (addr2) munmap(addr2, size_in); if (fd_in!=-1) close(fd_in); if (fd_out!=-1) close(fd_out); exit(code); } while(0)
#define CLEANUP_MMAP(size) { if (addr) { munmap(addr, size); addr = NULL; } if (addr2) { munmap(addr2, size); addr2 = NULL; } } while(0)

void open_files(char **argv) {
    long mlock_max = get_rmem_limit();
    SYSOUTF("[INFO] RLIMIT_MEMLOCK is %ldKB,\r\n", mlock_max/1024);
    if (mlock_max < 128*1024*1024) {
  SYSOUTF("[WARN] Please adjust memlock in /etc/security/limits.conf to something big like %d (KB), which is 32GB.\r\n", 32*ONE_MB);
    }
    fd_in = open_source_file(argv[1]); err_in = errno;
    if (fd_in == -1) {
  SYSOUTF("[ERROR] Cannot open source file %s due to error '%s'\r\n", argv[1], strerror(err_in));
  CLEANUP_EXIT(-1);
    }
    fd_out = open_dest_file(argv[2]); err_out = errno;
    if (fd_out == -1) {
  SYSOUTF("[ERROR] Cannot open destination file %s due to error '%s'\r\n", argv[2], strerror(err_out));
  CLEANUP_EXIT(-2);
    }    
    size_in = get_file_size(fd_in, &blk_size); err_in = errno;
    if (size_in == -1L) {
  SYSOUTF("[ERROR] Unable determine file size for %s due to error '%s'\r\n", argv[1], strerror(err_in));
  CLEANUP_EXIT(-3);
    }
    SYSOUTF("[INFO] File size for %s is %ld, block size is %ld.\r\n", argv[1], size_in, blk_size);
    if (allocate_file(fd_out, size_in) != 0) {
  err_out = errno;
  SYSOUTF("[ERROR] Cannot allocate file %s due to error '%s'\r\n", argv[2], strerror(err_out));
  CLEANUP_EXIT(-4);
    }
}

#define MMAP_IN  0
#define MMAP_OUT 1

#ifndef MMAP_CHUNK_SIZE
#define MMAP_CHUNK_SIZE  (128*1024*1024)
#endif

void mmap_copy(char **argv) {
 for (long base = 0; base < size_in; base += MMAP_CHUNK_SIZE) {
  long size = size_in - base;
  if (size > MMAP_CHUNK_SIZE)
   size = MMAP_CHUNK_SIZE;
  else if (size <= 0)
   break;
   
  addr = mmap_file(fd_in, size, MMAP_IN, base);
  if (addr == NULL) {
   err_in = errno;
   SYSOUTF("[ERROR] Cannot load file %s into memory due to error '%s'\r\n", argv[1], strerror(err_in));
   CLEANUP_EXIT(-5);
  }
  if (strategy == STRATEGY_MMAP_WRITE) {
   int res = write_file(fd_out, addr, size, base);
   if (res!=0) {
    SYSOUTF("[ERROR] Could not write file %s due to error '%s'\r\n", argv[2], strerror(res));
    CLEANUP_EXIT(-6);
   }
  }
  else if (strategy == STRATEGY_MMAP_BOTH) {
   addr2 = mmap_file(fd_out, size, MMAP_OUT, base);
   if (addr2 == NULL) {
    err_out = errno;
    SYSOUTF("[ERROR] Cannot load file %s into memory due to error '%s'\r\n", argv[2], strerror(err_out));
    CLEANUP_EXIT(-7);
   }
   else {
    memcpy(addr2, addr, size);
    TIMER_STOP();
    if (msync(addr2, size, MS_SYNC) != 0) {
     err_out = errno;
     SYSOUTF("[ERROR] Cannot sync file %s due to error '%s'\r\n", argv[2], strerror(err_out));
     CLEANUP_EXIT(-8);
    }
   }
  }
  CLEANUP_MMAP(size);
 }
 TIMER_STOP();
}

int sendfile_copy() {
 err_out = 0;
 long to_write = size_in;
 long written_once = 0;
 while (to_write > 0) {
  written_once = sendfile(fd_out, fd_in, NULL, to_write);
  if (written_once == -1) {
   err_out = errno;
   if (err_out != EAGAIN) {
    SYSOUTF("[ERROR] Function sendfile is not supported '%s'\r\n", strerror(err_out));
    return -1;
   }
   else
    continue;
  }
  else {
   to_write -= written_once;
  }
 }
 if (to_write != 0) {
  SYSOUTF("[ERROR] Function sendfile written too small data: %ld vs %ld, error=%s\r\n", size_in - to_write, size_in, strerror(err_out));
  return -1;
 }
 TIMER_STOP();
 return 1;    
}    

#ifndef COPY_BLOCK_SIZE
#define COPY_BLOCK_SIZE 4096
#endif

void serial_copy() {
    char buff[COPY_BLOCK_SIZE];
    long in = 0;
    long out = 0;
    long cnt = 0;
    preload_file_advise(fd_in, size_in, 0);
    do {
  in=read(fd_in, (void*)buff, COPY_BLOCK_SIZE);
  if (in>0) {
   out = in;
   do {
    cnt = write(fd_out, buff, out);
    err_out = errno;
    if (cnt < 0 && err_out != EINTR) {
     SYSOUTF("[ERROR] Broken writing to output file '%s'\r\n", strerror(err_out));
     CLEANUP_EXIT(-10);
    }
    else if (cnt > 0) {
     out -= cnt;
    }
   }
   while (out > 0);
  }
  else {
   if (errno == 0) {
    if (in==0)
     continue;
    else
     break;
   }
   if (errno != EINTR) {
    err_in = errno;
    SYSOUTF("[ERROR] Broken reading of input file '%s'\r\n", strerror(err_in));
    CLEANUP_EXIT(-9); 
   }
   else
    continue;
  }
    }
    while (in > 0); 
 TIMER_STOP();
}

#define PG_SIZE 4096
#define AIO_MAX_NR 1024

#ifdef AIO_VIA_SYSCALL
inline int io_setup(unsigned nr, aio_context_t *ctxp) {
 return syscall(__NR_io_setup, nr, ctxp);
}
inline int io_destroy(aio_context_t ctx) {
 return syscall(__NR_io_destroy, ctx);
}
inline int io_submit(aio_context_t ctx, long nr, struct iocb **cbp) {
 return syscall(__NR_io_submit, nr, cbp);
}
inline int io_cancel(aio_context_t ctx, struct iocb *cb, struct io_event *result) {
 return syscall(__NR_io_cancel, cb, result);
}
inline int io_getevents(aio_context_t ctx, long min_nr, long nr, struct io_event *events, struct timespec *timeout) {
 return syscall(__NR_io_getevents, min_nr, nr, events, timeout);
}
inline void io_prep_pread(struct iocb *cb, int fd, void *buff, long size, long offset) {
 cb->aio_fildes = fd;
 cb->aio_lio_opcode = IOCB_CMD_PREAD;
 cb->aio_buf = (uint64_t)buff;
 cb->aio_offset = offset;
 cb->aio_nbytes = size;
}
#else
#define IOCB_CMD_PREAD  0
#define IOCB_CMD_PWRITE  1
#define IOCB_CMD_FSYNC  2
#define IOCB_CMD_FDSYNC  3
#define IOCB_CMD_NOOP  6
#define aio_context_t io_context_t
#endif

void aio_copy() {
 int chunks = (size_in + (PG_SIZE/2) - 1) / PG_SIZE; /* with padding */
 SYSOUTF("[INFO] AIO copy needs %d chunks\r\n", chunks);
 aio_context_t ctx = 0;
 struct iocb* cbs = (struct iocb*)malloc(AIO_MAX_NR * sizeof(struct iocb));
 struct iocb** cbps = (struct iocb**)malloc(AIO_MAX_NR * sizeof(struct iocb*));
 struct iocb** wcbps = (struct iocb**)malloc(AIO_MAX_NR * sizeof(struct iocb*));
 struct io_event* ioevents = (struct io_event*)malloc(AIO_MAX_NR * sizeof(struct io_event));

 if (cbs==NULL || cbps==NULL || wcbps==NULL || ioevents==NULL) {
  int ret = errno;
  SYSOUTF("[ERROR] Broken malloc '%s'\r\n", strerror(ret));
  CLEANUP_EXIT(-12);
 } 
 
 int ret = io_setup(AIO_MAX_NR, &ctx);
 if (ret < 0) {
  ret = errno;
  SYSOUTF("[ERROR] Broken io_setup for %d chunks '%s'\r\n", chunks, strerror(ret));
  CLEANUP_EXIT(-11); 
 }
 void *bigbuff = malloc(AIO_MAX_NR*PG_SIZE);
#define DELETE_MEM do { if (bigbuff!=NULL) free(bigbuff); free(cbs); free(cbps); free(wcbps); free(ioevents); io_destroy(ctx); } while(0)
 if (bigbuff == NULL) {
  ret = errno;
  SYSOUTF("[ERROR] Buffer allocation failed for input file: '%s'\r\n", strerror(ret));
  DELETE_MEM;
  CLEANUP_EXIT(-12); 
 }
 
 int rounds = (chunks + AIO_MAX_NR/2 - 1) / AIO_MAX_NR; /* with padding */
#define AIO_GROUP_TO_OFFSET(i) (long(i) * PG_SIZE * AIO_MAX_NR)
#define AIO_STEP_TO_OFFSET(i) (long(i) * PG_SIZE)
 for (int i=0; i < AIO_MAX_NR; i++) {
  memset(&cbs[i], 0, sizeof(struct iocb));
  cbps[i] = &cbs[i];
 }

 for (int group=0; group < rounds; group++) {
  int step = 0;
  for (; step < AIO_MAX_NR; step++) {
   long curr_off = AIO_GROUP_TO_OFFSET(group) + AIO_STEP_TO_OFFSET(step);
   long nbytes = size_in - curr_off;
   if (nbytes <= 0) /* we are inside padding */
    break;
   if (nbytes > PG_SIZE)
    nbytes = PG_SIZE;
 
   long pbuff = (long)bigbuff + AIO_STEP_TO_OFFSET(step);
   io_prep_pread(&cbs[step], fd_in, (void*)pbuff, nbytes, curr_off);
  }
  if (step == 0)
   break;
  int todo = io_submit(ctx, step, cbps);
  if (todo != step) {
   ret = errno;
   SYSOUTF("[ERROR] Broken io_submit, group=%d, queued=%d; '%s'\r\n", group, todo >= 0 ? todo : -1, todo < 0 ? strerror(ret) : "failure for given index");
   DELETE_MEM;
   CLEANUP_EXIT(-13); 
  }
  
  int wi = 0;
  while (todo > 0) {
   ret = io_getevents(ctx, 1, todo, ioevents, NULL);
   if (ret < 0) {
    ret = errno;
    SYSOUTF("[ERROR] Broken io_getevents '%s'\r\n", strerror(ret));
    DELETE_MEM;
    CLEANUP_EXIT(-14); 
   }
   else if (ret > 0) {
    todo -= ret;
    int to_write = 0;
    for (int i=0; i < ret; i++) {
     struct iocb *iocbp = (struct iocb*) ioevents[i].obj;
     if (iocbp->aio_lio_opcode == IOCB_CMD_PREAD) {
      iocbp->aio_fildes = fd_out;
      iocbp->aio_lio_opcode = IOCB_CMD_PWRITE;
      wcbps[wi++] = iocbp;
      to_write++;
     }
    }
    if (to_write > 0) {
     int e = io_submit(ctx, to_write, &wcbps[wi - to_write]); /* move along linearly */
     if (e != to_write) {
      ret = errno;
      SYSOUTF("[ERROR] Broken io_submit for write '%s'\r\n", e < 0 ? strerror(ret) : "failure for given index");
      DELETE_MEM;
      CLEANUP_EXIT(-15); 
     }  
     else
      todo += to_write;
    }
   }
  }
 }
 DELETE_MEM;
 TIMER_STOP();
}

int main(int argc, char **argv) {

 if (argc < 3) {
  SYSOUTF("Usage: %s SRC_FILE DST_FILE --strategy={mmap,serial,sendfile,aio,default}\r\n", argv[0]);
  return 0;
 }
 if (argc == 4 && strncmp(argv[3], "--strategy=mmap", 15) == 0)
  strategy = STRATEGY_MMAP_BOTH;
 else if (argc == 4 && strncmp(argv[3], "--strategy=serial", 17) == 0)
  strategy = STRATEGY_SERIAL;
 else if (argc == 4 && strncmp(argv[3], "--strategy=sendfile", 19) == 0)
  strategy = STRATEGY_SENDFILE;
 else if (argc == 4 && strncmp(argv[3], "--strategy=aio", 14) == 0)
  strategy = STRATEGY_AIO;
    
 open_files(argv);
 if (strategy == STRATEGY_SERIAL)
  serial_copy();
 else if (strategy == STRATEGY_SENDFILE)
  sendfile_copy();
 else if (strategy == STRATEGY_AIO)
  aio_copy();
 else
  mmap_copy(argv);
    
 SYSOUTF("\r\n[INFO] Success. Strategy was %s\r\n", STRATEGY_DESC[strategy]);
 PRINT_WALL_TIME();
 CLEANUP_EXIT(0);
 return 0;
}

niedziela, grudnia 14, 2014

Jak oglądać telewizję gdy niemowlak śpi?



Transmiter Bluetooth Audio (RCA/Jack) podłączony do tunera albo telewizora + słuchawki BT. Urządzenie można zasilać z portu USB telewizora/tunera lub ładowarki do komórki. Gdy transmiter jest podłączony do tunera należy go włączyć i wyciszyć telewizor za pomocą pilota. W przypadku podłączenia do telewizora należy wpiąć w TV kabel Jack, telewizor wyłączy wtedy głośniki. Na zwykłych słuchawkach bez kodeka aptx (wirtualnie wszystkie dostępne na Allegro słuchawki bezprzewodowe robione przez Chińczyków wystawiających towar na alibaba.com) jest lag do 1s, ale można z tym żyć. Porządne słuchawki zapewniające dobrą synchronizację są co najmniej 2 razy droższe. Rozwiązanie jest bardzo fajne, ale ma minus - jest jednoosobowe. Transmiter FM i dwie sztuki słuchawek z radiem mają sporo gorszą jakość dźwięku.

piątek, grudnia 12, 2014

EMS DS tracer

#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <dlfcn.h>
#include <execinfo.h>
#include <pthread.h>
#include <list>

const char interp[] __attribute__((section(".interp"))) = "/lib64/ld-linux-x86-64.so.2";

typedef int (*open_fn)(const char *path, int flags, ...);
typedef ssize_t (*write_fn)(int fd, const void *buf, size_t count);
typedef ssize_t (*writev_fn)(int fd, const struct iovec *iov, int count);
typedef ssize_t (*pwritev_fn)(int fd, const struct iovec *iov, int count, off_t offset);
typedef int (*ftruncate_fn)(int fd, off_t length);

static open_fn _open = NULL;
static open_fn _open64 = NULL;
static write_fn _write = NULL;
static writev_fn _writev = NULL;
static pwritev_fn _pwritev = NULL;
static ftruncate_fn _ftruncate = NULL;
static void *handle_libc = NULL;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

#define SYSOUTF(msg, params...) do { char buff[1024]; memset(buff, 0, 1024); snprintf(buff, 1024, msg, params); _write(1,buff,strlen(buff)); } while(0)
#define SYSOUT(msg) _write(1, msg, strlen(msg))

void __attribute__((constructor)) __init(void)
{
    if (handle_libc==NULL)
handle_libc = dlopen ("/lib64/libc.so.6", RTLD_NOW);
    if (handle_libc!=NULL) {
_open = (open_fn) dlsym(handle_libc, "open");
_open64 = (open_fn) dlsym(handle_libc, "open64");
_write = (write_fn) dlsym(handle_libc,"write");
_writev = (writev_fn) dlsym(handle_libc,"writev");
_pwritev = (pwritev_fn) dlsym(handle_libc,"pwritev");
_ftruncate = (ftruncate_fn) dlsym(handle_libc, "ftruncate");
SYSOUT("libemscompanion init completed\r\n");
    }
    else {
exit(-1);
    }
}

void __attribute__((destructor)) __fini(void)
{
    if (handle_libc!=NULL)
dlclose(handle_libc);
}

extern "C" {

    int open(const char *path, int flags, ...) {
va_list vargs;
va_start(vargs, flags);
mode_t mode = va_arg(vargs, mode_t);
va_end(vargs);
int ret = _open(path, flags, mode);
SYSOUTF("Open %s = %d\r\n", path, ret);
return ret;
    }
   
    int open64(const char *path, int flags, ...) {
va_list vargs;
va_start(vargs, flags);
mode_t mode = va_arg(vargs, mode_t);
va_end(vargs);
int ret = _open(path, flags, mode);
SYSOUTF("Open %s = %d\r\n", path, ret);
return ret;
    }
   
    ssize_t write(int fd, const void *buf, size_t count) {
off_t curr = lseek(fd, 0, SEEK_CUR);
SYSOUTF("Writing fd = %d, addr = %d, buff = %x, size = %d\r\n", fd, curr, buff, count);
ssize_t ret = _write(fd, buf, count);
return ret;
    }
}

extern "C" void _main(int argc, char **argv) {
    __init();
    SYSOUT("libEMSCompanion 0.0.1\r\n");
    exit(0);
}

EMS File Datastore as a sequential tx log

Writing fd = 9, addr = 24834560, buff = 8c405540, size = 512 (chunk header, const offset, every 1910272 bytes)
Writing fd = 9, addr = 22405120, buff = 8c405710, size = 861696 (full message with headers and body)
Writing fd = 9, addr = 23266816, buff = 8c405710, size = 861696
Writing fd = 9, addr = 26744832, buff = 8c405540, size = 512
Writing fd = 9, addr = 24128512, buff = 8c405710, size = 861696
Writing fd = 9, addr = 24990208, buff = 89c01820, size = 512
Writing fd = 9, addr = 1024, buff = 89c01820, size = 512
Writing fd = 9, addr = 1536, buff = 89c01820, size = 512
Writing fd = 9, addr = 24990720, buff = 89c01820, size = 512 (Client ACK record)
Writing fd = 9, addr = 24991232, buff = 89c01820, size = 512
Writing fd = 9, addr = 24991744, buff = 89c01820, size = 512
Writing fd = 9, addr = 24992256, buff = 89c01820, size = 512
Writing fd = 9, addr = 24992768, buff = 89c01820, size = 512
Writing fd = 9, addr = 24993280, buff = 89c01820, size = 512
Writing fd = 9, addr = 24993792, buff = 89c01820, size = 512
Writing fd = 9, addr = 24994304, buff = 89c01820, size = 512
Writing fd = 9, addr = 24994816, buff = 89c01820, size = 512
Writing fd = 9, addr = 24995328, buff = 89c01820, size = 512
Writing fd = 9, addr = 24995840, buff = 89c01820, size = 512
Writing fd = 9, addr = 24996352, buff = 89c01820, size = 512
Writing fd = 9, addr = 24996864, buff = 89c01820, size = 512
Writing fd = 9, addr = 24997376, buff = 89c01820, size = 512
Writing fd = 9, addr = 24997888, buff = 89c01820, size = 512
Writing fd = 9, addr = 24998400, buff = 89c01820, size = 512
Writing fd = 9, addr = 24998912, buff = 89c01820, size = 512
Writing fd = 9, addr = 24999424, buff = 89c01820, size = 512
Writing fd = 9, addr = 24999936, buff = 89c01820, size = 512
Writing fd = 9, addr = 25000448, buff = 89c01820, size = 512
Writing fd = 9, addr = 25000960, buff = 89c01820, size = 512
Writing fd = 9, addr = 25001472, buff = 89c01820, size = 512
Writing fd = 9, addr = 25001984, buff = 89c01820, size = 512
Writing fd = 9, addr = 25002496, buff = 89c01820, size = 512
Writing fd = 9, addr = 25003008, buff = 89c01820, size = 512
Writing fd = 9, addr = 25003520, buff = 89c01820, size = 512
Writing fd = 9, addr = 25004032, buff = 79ffa800, size = 512
Writing fd = 9, addr = 25004544, buff = 79ffa750, size = 512
Writing fd = 9, addr = 512, buff = 8c405710, size = 861696 (full message with headers and body, space reused)

Full messages and ACK records are laid out sequentially and interwoven. ACK doesn't clean up message header. This design under heavy load may lead to excessive datastore usage and need for frequent compaction maintenance.

JMS headers with almost nothing set take 213 bytes. Minus 28 bytes for JMSMessageID it is 185 bytes on disk.

Executable Linux shared library

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

const char interp[] __attribute__((section(".interp"))) = "/lib64/ld-linux-x86-64.so.2";

void test() {
        printf("test\r\n");
}

extern "C" void _main(int argc, char **argv) {
        printf("main\r\n");
        test();
        exit(0);
}

user@user-Aspire-4530 ~/DEV $ g++ -shared -fPIC -o libtest.so libtest.c -Wl,-e,_main
user@user-Aspire-4530 ~/DEV $ LD_PRELOAD=./libtest.so ls
libtest.c  libtest.o  libtest.so
user@user-Aspire-4530 ~/DEV $ ./libtest.so 
main
test
user@user-Aspire-4530 ~/DEV $ readelf -l /bin/ls | grep interpreter

środa, grudnia 10, 2014

Maurice Moss


Hejterzy PO






Master Chef


Politechnika


poniedziałek, grudnia 08, 2014

Tibco BW Generic XPath Extractor


czwartek, listopada 20, 2014

Beaujolois nouveau

piątek, listopada 14, 2014

Strona głowna Onetu


Proponuję zmiany UX. Oryginalny wygląd u dołu.

wtorek, września 30, 2014

JBossTS recovery: look inside

10:40:44,801 (Thread-18) PeriodicRecovery: background thread Status <== SCANNING
10:40:44,801 (Thread-18) PeriodicRecovery: background thread scanning
10:40:44,802 (Thread-18) Periodic recovery - first pass <Wt, 30 wrz 2014 10:40:44>
10:40:44,802 (Thread-18) InputObjectState::InputObjectState()
10:40:44,802 (Thread-18) StatusModule: first pass
10:40:44,802 (Thread-18) HashedStore.allObjUids(/StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction, InputObjectState Uid   : 0:0:0:0
InputObjectState Type  : null
InputObjectState Size  : 0
InputObjectState Buffer: , 0)
10:40:44,802 (Thread-18) OutputObjectState::OutputObjectState()
10:40:44,890 (Thread-18) processing /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction transactions
10:40:44,890 (Thread-18) found transaction -3f57c7ff:ecce:542a6c31:53b
10:40:44,890 (Thread-18) found transaction -3f57c7ff:ebbb:542a6bef:a98
10:40:44,890 (Thread-18) found transaction -3f57c7ff:ed72:542a6c5e:50c
...

10:40:44,891 (Thread-18) [com.arjuna.ats.internal.txoj.recovery.TORecoveryModule_3] - TORecoveryModule - first pass
10:40:44,891 (Thread-18) InputObjectState::InputObjectState()
10:40:44,891 (Thread-18) FileSystemStore.allTypes(InputObjectState Uid   : 0:0:0:0
InputObjectState Type  : null
InputObjectState Size  : 0
InputObjectState Buffer: )
10:40:44,892 (Thread-18) OutputObjectState::OutputObjectState()
10:40:44,892 (Thread-18)
10:40:44,896 (Thread-18) PERIODIC 1. PASS ON STORE  WITH TYPE /StateManager/AbstractRecord/XAResourceRecord
10:40:44,896 (Thread-18) [com.arjuna.ats.internal.jta.recovery.info.firstpass] Local XARecoveryModule - first pass
10:40:44,896 (Thread-18) InputObjectState::InputObjectState()
10:40:44,897 (Thread-18) FileSystemStore.allObjUids(/StateManager/AbstractRecord/XAResourceRecord, InputObjectState Uid   : 0:0:0:0
InputObjectState Type  : null
InputObjectState Size  : 0
InputObjectState Buffer: , 0)
10:40:44,897 (Thread-18) OutputObjectState::OutputObjectState()
10:45:42,120 (Thread-18) Periodic recovery - second pass <Wt, 30 wrz 2014 10:45:42>
10:45:42,120 (Thread-18) AtomicActionRecoveryModule: Second pass
10:45:42,136 (Thread-18) +---- RESOURCE INITIATED RECOVERY
10:45:42,136 (Thread-18) +--- RES RECOVERER: org.jboss.jms.server.recovery.MessagingXAResourceRecovery@7c9a899d
10:45:42,136 (Thread-18) org.jboss.jms.server.recovery.MessagingXAResourceRecovery@7c9a899d hasMoreResources
10:45:42,136 (Thread-18) org.jboss.jms.server.recovery.MessagingXAResourceRecovery@7c9a899d getXAResource
10:45:42,136 (Thread-18)  +-- CALLING XARECOVERY
10:45:42,136 (Thread-18) xarecovery of org.jboss.jms.server.recovery.MessagingXAResourceWrapper@53673666
10:45:42,136 (Thread-18) Recover java:DefaultEMSProvider
10:45:43,728 (Thread-18) [EMS] XA Recovery #15 sees 58 XIDs: [{formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:efbe:542a6d09:858-3f57c7ff:efbe:542a6d09:9aa}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ecce:542a6c31:53b-3f57c7ff:ecce:542a6c31:693}, {formatID=131075 gtrid_length=30 bqual_length=28 data=1--3f57c7ff:f885:542a6d79:1dad-3f57c7ff:f885:542a6d79:1efb}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ecce:542a6c31:537-3f57c7ff:ecce:542a6c31:688}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ed72:542a6c5e:514-3f57c7ff:ed72:542a6c5e:66d}, {formatID=131075 gtrid_length=30 bqual_length=28 data=1--3f57c7ff:f885:542a6d79:1dd0-3f57c7ff:f885:542a6d79:1f13}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ef42:542a6ce0:26a-3f57c7ff:ef42:542a6ce0:3e1}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ed72:542a6c5e:4f5-3f57c7ff:ed72:542a6c5e:63c}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eb5c:542a6bc7:16b-3f57c7ff:eb5c:542a6bc7:2c3}, {formatID=131075 gtrid_length=30 bqual_length=28 data=1--3f57c7ff:f885:542a6d79:1dbe-3f57c7ff:f885:542a6d79:1f14}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ee22:542a6c8c:333-3f57c7ff:ee22:542a6c8c:48d}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eb5c:542a6bc7:166-3f57c7ff:eb5c:542a6bc7:2b8}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ebbb:542a6bef:aaf-3f57c7ff:ebbb:542a6bef:bfc}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eafe:542a6b9a:160-3f57c7ff:eafe:542a6b9a:2c8}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ef42:542a6ce0:2a1-3f57c7ff:ef42:542a6ce0:3ec}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:efbe:542a6d09:863-3f57c7ff:efbe:542a6d09:9bd}, {formatID=131075 gtrid_length=30 bqual_length=28 data=1--3f57c7ff:f885:542a6d79:1d93-3f57c7ff:f885:542a6d79:1ee5}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eeb2:542a6cb6:47e-3f57c7ff:eeb2:542a6cb6:5cc}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ed72:542a6c5e:52c-3f57c7ff:ed72:542a6c5e:67e}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eb5c:542a6bc7:1ad-3f57c7ff:eb5c:542a6bc7:2ef}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:efbe:542a6d09:879-3f57c7ff:efbe:542a6d09:9cb}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ecce:542a6c31:558-3f57c7ff:ecce:542a6c31:6a9}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ed72:542a6c5e:4ce-3f57c7ff:ed72:542a6c5e:626}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ee22:542a6c8c:304-3f57c7ff:ee22:542a6c8c:456}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ef42:542a6ce0:289-3f57c7ff:ef42:542a6ce0:3d6}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eafe:542a6b9a:183-3f57c7ff:eafe:542a6b9a:2d3}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eb5c:542a6bc7:187-3f57c7ff:eb5c:542a6bc7:2d9}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eafe:542a6b9a:18c-3f57c7ff:eafe:542a6b9a:2e9}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ee22:542a6c8c:310-3f57c7ff:ee22:542a6c8c:461}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ee22:542a6c8c:31a-3f57c7ff:ee22:542a6c8c:471}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ed72:542a6c5e:4df-3f57c7ff:ed72:542a6c5e:647}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ebbb:542a6bef:a98-3f57c7ff:ebbb:542a6bef:be6}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eb5c:542a6bc7:16a-3f57c7ff:eb5c:542a6bc7:2ce}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eb5c:542a6bc7:196-3f57c7ff:eb5c:542a6bc7:2e4}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ed72:542a6c5e:503-3f57c7ff:ed72:542a6c5e:652}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ecce:542a6c31:54c-3f57c7ff:ecce:542a6c31:69e}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ed72:542a6c5e:4e7-3f57c7ff:ed72:542a6c5e:631}, {formatID=131075 gtrid_length=30 bqual_length=28 data=1--3f57c7ff:f885:542a6d79:1dd4-3f57c7ff:f885:542a6d79:1f27}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eafe:542a6b9a:176-3f57c7ff:eafe:542a6b9a:2bd}, {formatID=131075 gtrid_length=30 bqual_length=28 data=1--3f57c7ff:f885:542a6d79:1d9d-3f57c7ff:f885:542a6d79:1ef0}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ee22:542a6c8c:325-3f57c7ff:ee22:542a6c8c:477}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eeb2:542a6cb6:46d-3f57c7ff:eeb2:542a6cb6:5c1}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ed72:542a6c5e:4cd-3f57c7ff:ed72:542a6c5e:61b}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eeb2:542a6cb6:464-3f57c7ff:eeb2:542a6cb6:5b6}, {formatID=131075 gtrid_length=30 bqual_length=28 data=1--3f57c7ff:f885:542a6d79:1db2-3f57c7ff:f885:542a6d79:1f1c}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ef42:542a6ce0:28a-3f57c7ff:ef42:542a6ce0:3cb}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ee22:542a6c8c:33c-3f57c7ff:ee22:542a6c8c:482}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:efbe:542a6d09:86e-3f57c7ff:efbe:542a6d09:9c0}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eafe:542a6b9a:182-3f57c7ff:eafe:542a6b9a:2de}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ed72:542a6c5e:521-3f57c7ff:ed72:542a6c5e:673}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eb5c:542a6bc7:19e-3f57c7ff:eb5c:542a6bc7:305}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ebbb:542a6bef:aa0-3f57c7ff:ebbb:542a6bef:bf9}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ef42:542a6ce0:2ac-3f57c7ff:ef42:542a6ce0:3f7}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:f7d0:542a6d3a:5af-3f57c7ff:f7d0:542a6d3a:700}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eb5c:542a6bc7:192-3f57c7ff:eb5c:542a6bc7:2fa}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:efbe:542a6d09:842-3f57c7ff:efbe:542a6d09:99f}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:ed72:542a6c5e:50c-3f57c7ff:ed72:542a6c5e:65d}, {formatID=131075 gtrid_length=29 bqual_length=27 data=1--3f57c7ff:eeb2:542a6cb6:48d-3f57c7ff:eeb2:542a6cb6:5d7}]
10:45:43,730 (Thread-18) Found 58 xids in doubt

10:45:47,213 (Thread-18) +---- RESOURCE INITIATED RECOVERY completed
10:45:47,213 (Thread-18) [com.arjuna.ats.arjuna.coordinator.RecordList_5] - RecordList::insert(RecordList: -3f57c7ff:ecce:542a6c31:540) : appending /StateManager/AbstractRecord/XAResourceRecord for -3f57c7ff:ecce:542a6c31:691
10:45:47,213 (Thread-18) [com.arjuna.ats.arjuna.coordinator.BasicAction_20] - Unpacked a 171 record
10:45:47,213 (Thread-18) StateManager::StateManager( 0:0:0:0 )
10:45:47,213 (Thread-18) [com.arjuna.ats.arjuna.coordinator.AbstractRecord_1] - AbstractRecord::AbstractRecord () - crash recovery constructor
10:45:47,213 (Thread-18) | RECOVERY XIDS for org.jboss.jms.server.recovery.MessagingXAResourceWrapper@2c8b18f5 contains -1217944454 ? true
10:45:47,213 (Thread-18) +---- RESOURCE INITIATED RECOVERY completed
10:45:47,213 (Thread-18) [com.arjuna.ats.arjuna.coordinator.RecordList_5] - RecordList::insert(RecordList: -3f57c7ff:ecce:542a6c31:540 -3f57c7ff:ecce:542a6c31:691) : appending /StateManager/AbstractRecord/XAResourceRecord for -3f57c7ff:ecce:542a6c31:694
10:45:47,213 (Thread-18) [com.arjuna.ats.arjuna.coordinator.BasicAction_20] - Unpacked a 463 record
10:45:47,214 (Thread-18) [com.arjuna.ats.arjuna.coordinator.BasicAction_22] - HeuristicList - Unpacked heuristic list size of 0
10:45:47,214 (Thread-18) [com.arjuna.ats.arjuna.coordinator.BasicAction_25] - Restored action status of ActionStatus.COMMITTING 6
10:45:47,214 (Thread-18) [com.arjuna.ats.arjuna.coordinator.BasicAction_26] - Restored action type Top-level 0
10:45:47,215 (Thread-18) [com.arjuna.ats.arjuna.coordinator.BasicAction_27] - Restored heuristic decision of TwoPhaseOutcome.PREPARE_OK 0
10:45:47,215 (Thread-18) [com.arjuna.ats.arjuna.recovery.RecoverAtomicAction_1] - RecoverAtomicAction.replayPhase2 recovering -3f57c7ff:ecce:542a6c31:53b ActionStatus is ActionStatus.COMMITTED
10:45:47,215 (Thread-18) BasicAction::phase2Commit() for action-id -3f57c7ff:ecce:542a6c31:53b
10:45:47,215 (Thread-18) BasicAction::doCommit (com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord@6fd5af22)
10:45:47,215 (Thread-18) XAResourceRecord.topLevelCommit for < 131075, 29, 27, 494545511025355995510210258101999910158535250975499514958535198455110253559955102102581019999101585352509754995149585351102 >
10:45:47,215 (Thread-18) [EMS] Recovery session will be created
10:45:47,216 (Thread-18) [EMS] Establishing handler for connection #-17

10:45:48,222 (Thread-18) BasicAction::updateState() for action-id -3f57c7ff:ecce:542a6c31:53b
10:45:48,222 (Thread-18) FileSystemStore.remove_committed(-3f57c7ff:ecce:542a6c31:53b, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction)
10:45:48,222 (Thread-18) ShadowingStore.remove_state(-3f57c7ff:ecce:542a6c31:53b, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction, ObjectStore.OS_ORIGINAL)
10:45:48,222 (Thread-18) HashedStore.genPathName(-3f57c7ff:ecce:542a6c31:53b, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction, ObjectStore.OS_SHADOW)
10:45:48,222 (Thread-18) HashedStore.genPathName(-3f57c7ff:ecce:542a6c31:53b, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction, ObjectStore.OS_ORIGINAL)
10:45:48,222 (Thread-18) [com.arjuna.ats.internal.arjuna.objectstore.ShadowingStore_22] - ShadowingStore.currentState(-3f57c7ff:ecce:542a6c31:53b, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction) - returning ObjectStore.OS_COMMITTED
10:45:48,222 (Thread-18) HashedStore.genPathName(-3f57c7ff:ecce:542a6c31:53b, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction, ObjectStore.OS_ORIGINAL)
10:45:48,222 (Thread-18) FileSystemStore.openAndLock(C:\_dev\jboss-5.1.0.GA\server\default\data/tx-object-store\HashedActionStore\defaultStore\StateManager\BasicAction\TwoPhaseCoordinator\AtomicAction\#102#\-3f57c7ff_ecce_542a6c31_53b, FileLock.F_WRLCK, false)
10:45:48,223 (Thread-18) FileSystemStore.closeAndUnlock(C:\_dev\jboss-5.1.0.GA\server\default\data\tx-object-store\HashedActionStore\defaultStore\StateManager\BasicAction\TwoPhaseCoordinator\AtomicAction\#102#\-3f57c7ff_ecce_542a6c31_53b, null, null)
10:45:48,223 (Thread-18) [com.arjuna.ats.arjuna.recovery.RecoverAtomicAction_3] - RecoverAtomicAction.replayPhase2( -3f57c7ff:ecce:542a6c31:53b )  finished
10:45:48,223 (Thread-18) HashedStore.genPathName(-3f57c7ff:ef42:542a6ce0:26a, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction, ObjectStore.OS_SHADOW)
10:45:48,223 (Thread-18) HashedStore.genPathName(-3f57c7ff:ef42:542a6ce0:26a, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction, ObjectStore.OS_ORIGINAL)
10:45:48,224 (Thread-18) [com.arjuna.ats.internal.arjuna.objectstore.ShadowingStore_22] - ShadowingStore.currentState(-3f57c7ff:ef42:542a6ce0:26a, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction) - returning ObjectStore.OS_COMMITTED
10:45:48,224 (Thread-18) InputObjectState::InputObjectState()
10:45:48,224 (Thread-18) HashedStore.allObjUids(/Recovery/TransactionStatusManager, InputObjectState Uid   : 0:0:0:0
InputObjectState Type  : null
InputObjectState Size  : 0
InputObjectState Buffer: , 0)
10:45:48,224 (Thread-18) OutputObjectState::OutputObjectState()

Transactional MDB on JBoss 5: look inside

10:36:49,975 (Connection consumer 120) BaseTransaction.begin
10:36:49,975 (Connection consumer 120) StateManager::StateManager( 2 )
10:36:49,971 (Connection consumer 115) [com.arjuna.ats.arjuna.coordinator.BasicAction_7] - BasicAction::addChildThread () action -3f57c7ff:eafe:542a6b9a:a2 adding Thread[Connection consumer 115,5,JBoss Pooled Threads] result = true
10:36:49,975 (Connection consumer 120) BasicAction::BasicAction()
10:36:49,976 (Connection consumer 120) BasicAction::Begin() for action-id -3f57c7ff:eafe:542a6b9a:ab
10:36:49,976 (Connection consumer 120) BasicAction::actionInitialise() for action-id -3f57c7ff:eafe:542a6b9a:ab
10:36:49,976 (Connection consumer 120) ActionHierarchy::ActionHierarchy(5)
10:36:49,976 (Connection consumer 120) ActionHierarchy::add(-3f57c7ff:eafe:542a6b9a:ab, 1)
10:36:49,976 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.BasicAction_6] - BasicAction::addChildThread () action -3f57c7ff:eafe:542a6b9a:ab adding Thread[Connection consumer 120,5,JBoss Pooled Threads]
10:36:49,976 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.BasicAction_7] - BasicAction::addChildThread () action -3f57c7ff:eafe:542a6b9a:ab adding Thread[Connection consumer 120,5,JBoss Pooled Threads] result = true
10:36:49,979 (Connection consumer 120) TransactionReaper::create ( 120000 )
10:36:49,979 (Connection consumer 120) TransactionReaper::insert ( BasicAction: -3f57c7ff:eafe:542a6b9a:ab status: ActionStatus.RUNNING, 300 )
10:36:49,979 (Connection consumer 120) ReaperElement::ReaperElement ( BasicAction: -3f57c7ff:eafe:542a6b9a:ab status: ActionStatus.RUNNING, 300 )
10:36:49,979 [org.jboss.ejb3.mdb.inflow.MessageInflowLocalProxy] (Connection consumer 120) MessageEndpoint TestQueue started transaction=TransactionImple < ac, BasicAction: -3f57c7ff:eafe:542a6b9a:ab status: ActionStatus.RUNNING >
10:36:49,979 (Connection consumer 120) TransactionImple.enlistResource ( com.tibco.tibjms.XResource@4f9ee648 )
10:36:49,979 (Connection consumer 120) TransactionImple.getStatus
10:36:49,980 (Connection consumer 115) TransactionReaper::create ( 120000 )
10:36:49,980 (Connection consumer 120) StateManager::StateManager( 1 )
10:36:49,980 (Connection consumer 120) AbstractRecord::AbstractRecord (-3f57c7ff:eafe:542a6b9a:ad, 1)
10:36:49,971 (Thread-15) ReaperThread.run ()
10:36:49,980 (Connection consumer 120) XAResourceRecord.XAResourceRecord ( < 131075, 28, 26, 494545511025355995510210258101971021015853525097549857975897984551102535599551021025810197102101585352509754985797589799 > )
10:36:49,980 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.RecordList_5] - RecordList::insert(RecordList: empty) : appending /StateManager/AbstractRecord/XAResourceRecord for -3f57c7ff:eafe:542a6b9a:ad
10:36:49,980 [org.jboss.ejb3.mdb.inflow.MessageInflowLocalProxy] (Connection consumer 120) MessageEndpoint TestQueue enlisted=com.tibco.tibjms.XResource@4f9ee648
10:36:49,980 [org.jboss.ejb3.mdb.inflow.MessageInflowLocalProxy] (Connection consumer 120) MessageEndpoint TestQueue in use by public abstract void javax.jms.MessageListener.onMessage(javax.jms.Message) Thread[Connection consumer 120,5,JBoss Pooled Threads]
10:36:49,980 [org.jboss.ejb3.mdb.inflow.MessageInflowLocalProxy] (Connection consumer 120) MessageEndpoint TestQueue delivering
10:36:51,262 [org.jboss.ejb3.pool.StrictMaxPool] (Connection consumer 120) Acquired(true) strictMaxSize semaphore, remaining=0

10:36:51,262 INFO  [STDOUT] (Connection consumer 120) Received text: 2014-09-30T10:35:10.304+02:00 on test.fw1, id=ID:EMS-SERVER-DC2.2354542947B3299:101

10:36:52,330 (Connection consumer 120) TransactionImple.registerSynchronization
10:36:52,331 (Connection consumer 120) TransactionImple.enlistResource ( org.jboss.resource.adapter.jdbc.xa.XAManagedConnection@5c675382 )
10:36:52,331 (Connection consumer 120) TransactionImple.getStatus
10:36:52,331 (Connection consumer 120) AbstractRecord::AbstractRecord (-3f57c7ff:eafe:542a6b9a:1d1, 1)
10:36:52,331 (Connection consumer 120) XAResourceRecord.XAResourceRecord ( < 131075, 28, 27, 494545511025355995510210258101971021015853525097549857975897984551102535599551021025810197102101585352509754985797584910048 > )
10:36:52,333 (Connection consumer 120) TransactionImple.enlistResource ( org.jboss.resource.adapter.jms.JmsXAResource@21f7e1c6 )
10:36:52,333 (Connection consumer 120) TransactionImple.getStatus
10:36:52,333 (Connection consumer 120) StateManager::StateManager( 1 )
10:36:52,333 (Connection consumer 120) AbstractRecord::AbstractRecord (-3f57c7ff:eafe:542a6b9a:1d7, 1)
10:36:52,333 (Connection consumer 120) XAResourceRecord.XAResourceRecord ( < 131075, 28, 27, 494545511025355995510210258101971021015853525097549857975897984551102535599551021025810197102101585352509754985797584910054 > )
10:36:52,360 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.RecordList_5] - RecordList::insert(RecordList: -3f57c7ff:eafe:542a6b9a:ad -3f57c7ff:eafe:542a6b9a:1d1) : appending /StateManager/AbstractRecord/XAResourceRecord for -3f57c7ff:eafe:542a6b9a:1d7
10:36:52,361 (Connection consumer 120) TransactionImple.getStatus
10:36:52,361 (Connection consumer 120) TransactionImple.getStatus
10:36:52,392 [org.jboss.ejb3.mdb.inflow.MessageInflowLocalProxy] (Connection consumer 120) Invocation took 2412ms: public abstract void javax.jms.MessageListener.onMessage(javax.jms.Message)
10:36:52,392 [org.jboss.ejb3.mdb.inflow.MessageInflowLocalProxy] (Connection consumer 120) MessageEndpoint TestQueue in use by public abstract void javax.resource.spi.endpoint.MessageEndpoint.afterDelivery() throws javax.resource.ResourceException Thread[Connection consumer 120,5,JBoss Pooled Threads]
10:36:52,392 (Connection consumer 120) TransactionImple.equals
10:36:52,392 (Connection consumer 120) TransactionImple.getStatus
10:36:52,392 [org.jboss.ejb3.mdb.inflow.MessageInflowLocalProxy] (Connection consumer 120) MessageEndpoint TestQueue commit
10:36:52,392 (Connection consumer 120) BaseTransaction.commit
10:36:52,392 (Connection consumer 120) TransactionImple.commitAndDisassociate
10:36:52,392 (Connection consumer 120) SynchronizationImple.beforeCompletion

10:36:52,392 (Connection consumer 120) BasicAction::End() for action-id -3f57c7ff:eafe:542a6b9a:ab
10:36:52,392 (Connection consumer 120) BasicAction::prepare () for action-id -3f57c7ff:eafe:542a6b9a:ab
10:36:52,392 (Connection consumer 120) ShadowingStore.ShadowingStore( 14 )
10:36:52,392 (Connection consumer 120) ShadowNoFileLockStore.ShadowNoFileLockStore( 14 )
10:36:52,392 (Connection consumer 120) HashedStore.HashedStore( 14 )
10:36:52,392 (Connection consumer 120) HashedStore.HashedActionStore()
10:36:52,392 (Connection consumer 120) FileSystemStore.setupStore()
10:36:52,392 (Connection consumer 120) FileSystemStore.createHierarchy(C:\_dev\jboss-5.1.0.GA\server\default\data/tx-object-store\HashedActionStore\defaultStore\)
10:36:52,393 (Connection consumer 120) XAResourceRecord.topLevelPrepare for < 131075, 28, 26, 494545511025355995510210258101971021015853525097549857975897984551102535599551021025810197102101585352509754985797589799 >
10:36:52,430 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.RecordList_5] - RecordList::insert(RecordList: empty) : appending /StateManager/AbstractRecord/XAResourceRecord for -3f57c7ff:eafe:542a6b9a:ad
10:36:52,430 (Connection consumer 120) XAResourceRecord.topLevelPrepare for < 131075, 28, 27, 494545511025355995510210258101971021015853525097549857975897984551102535599551021025810197102101585352509754985797584910048 >
10:36:52,494 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.RecordList_5] - RecordList::insert(RecordList: -3f57c7ff:eafe:542a6b9a:ad) : appending /StateManager/AbstractRecord/XAResourceRecord for -3f57c7ff:eafe:542a6b9a:1d1
10:36:52,494 (Connection consumer 120) XAResourceRecord.topLevelPrepare for < 131075, 28, 27, 494545511025355995510210258101971021015853525097549857975897984551102535599551021025810197102101585352509754985797584910054 >
10:36:52,556 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.RecordList_5] - RecordList::insert(RecordList: -3f57c7ff:eafe:542a6b9a:ad -3f57c7ff:eafe:542a6b9a:1d1) : appending /StateManager/AbstractRecord/XAResourceRecord for -3f57c7ff:eafe:542a6b9a:1d7
10:36:52,557 (Connection consumer 120) OutputObjectState::OutputObjectState(-3f57c7ff:eafe:542a6b9a:ab, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction)
10:36:52,557 (Connection consumer 120) BasicAction::save_state ()
10:36:52,557 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.BasicAction_14] - BasicAction::save_state - next record to pack is a 171 record (/StateManager/AbstractRecord/XAResourceRecord) should save it? = true
10:36:52,557 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.BasicAction_15] - Packing a 171 record
10:36:52,557 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.BasicAction_14] - BasicAction::save_state - next record to pack is a 171 record (/StateManager/AbstractRecord/XAResourceRecord) should save it? = true
10:36:52,557 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.BasicAction_15] - Packing a 171 record
10:36:52,557 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.BasicAction_14] - BasicAction::save_state - next record to pack is a 171 record (/StateManager/AbstractRecord/XAResourceRecord) should save it? = true
10:36:52,557 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.BasicAction_15] - Packing a 171 record
10:36:52,557 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.BasicAction_16] - Packing a NONE_RECORD
10:36:52,557 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.BasicAction_19] - Packing action status of ActionStatus.COMMITTING
10:36:52,557 (Connection consumer 120) FileSystemStore.write_committed(-3f57c7ff:eafe:542a6b9a:ab, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction)
10:36:52,557 (Connection consumer 120) ShadowingStore.write_state(-3f57c7ff:eafe:542a6b9a:ab, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction, ObjectStore.OS_ORIGINAL)
10:36:52,557 (Connection consumer 120) HashedStore.genPathName(-3f57c7ff:eafe:542a6b9a:ab, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction, ObjectStore.OS_ORIGINAL)
10:36:52,557 (Connection consumer 120) FileSystemStore.openAndLock(C:\_dev\jboss-5.1.0.GA\server\default\data/tx-object-store\HashedActionStore\defaultStore\StateManager\BasicAction\TwoPhaseCoordinator\AtomicAction\#97#\-3f57c7ff_eafe_542a6b9a_ab, FileLock.F_WRLCK, true)
10:36:52,557 (Connection consumer 120) FileSystemStore.createHierarchy(C:\_dev\jboss-5.1.0.GA\server\default\data/tx-object-store\HashedActionStore\defaultStore\StateManager\BasicAction\TwoPhaseCoordinator\AtomicAction\#97#\-3f57c7ff_eafe_542a6b9a_ab)
10:36:52,647 (Connection consumer 120) FileSystemStore.closeAndUnlock(C:\_dev\jboss-5.1.0.GA\server\default\data\tx-object-store\HashedActionStore\defaultStore\StateManager\BasicAction\TwoPhaseCoordinator\AtomicAction\#97#\-3f57c7ff_eafe_542a6b9a_ab, null, java.io.FileOutputStream@7e1b811b)
10:36:52,651 (Connection consumer 120) BasicAction::phase2Commit() for action-id -3f57c7ff:eafe:542a6b9a:ab
10:36:52,651 (Connection consumer 120) BasicAction::doCommit (com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord@4407393d)
10:36:52,651 (Connection consumer 120) XAResourceRecord.topLevelCommit for < 131075, 28, 26, 494545511025355995510210258101971021015853525097549857975897984551102535599551021025810197102101585352509754985797589799 >
10:36:52,734 (Connection consumer 120) BasicAction::doCommit (com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord@32955ada)
10:36:52,734 (Connection consumer 120) XAResourceRecord.topLevelCommit for < 131075, 28, 27, 494545511025355995510210258101971021015853525097549857975897984551102535599551021025810197102101585352509754985797584910048 >
10:36:52,911 (Connection consumer 120) BasicAction::doCommit (com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord@1cc05dc2)
10:36:52,911 (Connection consumer 120) XAResourceRecord.topLevelCommit for < 131075, 28, 27, 494545511025355995510210258101971021015853525097549857975897984551102535599551021025810197102101585352509754985797584910054 >
10:36:52,947 (Connection consumer 120) BasicAction::updateState() for action-id -3f57c7ff:eafe:542a6b9a:ab
10:36:52,947 (Connection consumer 120) FileSystemStore.remove_committed(-3f57c7ff:eafe:542a6b9a:ab, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction)
10:36:52,947 (Connection consumer 120) ShadowingStore.remove_state(-3f57c7ff:eafe:542a6b9a:ab, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction, ObjectStore.OS_ORIGINAL)
10:36:52,947 (Connection consumer 120) HashedStore.genPathName(-3f57c7ff:eafe:542a6b9a:ab, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction, ObjectStore.OS_SHADOW)
10:36:52,947 (Connection consumer 120) HashedStore.genPathName(-3f57c7ff:eafe:542a6b9a:ab, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction, ObjectStore.OS_ORIGINAL)
10:36:52,947 (Connection consumer 120) [com.arjuna.ats.internal.arjuna.objectstore.ShadowingStore_22] - ShadowingStore.currentState(-3f57c7ff:eafe:542a6b9a:ab, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction) - returning ObjectStore.OS_COMMITTED
10:36:52,947 (Connection consumer 120) HashedStore.genPathName(-3f57c7ff:eafe:542a6b9a:ab, /StateManager/BasicAction/TwoPhaseCoordinator/AtomicAction, ObjectStore.OS_ORIGINAL)
10:36:52,947 (Connection consumer 120) FileSystemStore.openAndLock(C:\_dev\jboss-5.1.0.GA\server\default\data/tx-object-store\HashedActionStore\defaultStore\StateManager\BasicAction\TwoPhaseCoordinator\AtomicAction\#97#\-3f57c7ff_eafe_542a6b9a_ab, FileLock.F_WRLCK, false)
10:36:52,948 (Connection consumer 120) FileSystemStore.closeAndUnlock(C:\_dev\jboss-5.1.0.GA\server\default\data\tx-object-store\HashedActionStore\defaultStore\StateManager\BasicAction\TwoPhaseCoordinator\AtomicAction\#97#\-3f57c7ff_eafe_542a6b9a_ab, null, null)
10:36:52,948 (Connection consumer 120) SynchronizationImple.afterCompletion
10:36:52,948 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.BasicAction_8] - BasicAction::removeChildThread () action -3f57c7ff:eafe:542a6b9a:ab removing TSThread:17
10:36:52,948 (Connection consumer 120) [com.arjuna.ats.arjuna.coordinator.BasicAction_9] -  BasicAction::removeChildThread () action -3f57c7ff:eafe:542a6b9a:ab removing TSThread:17 result = true

JBossTS stores

https://access.redhat.com/documentation/en-US/JBoss_Enterprise_Application_Platform/4.3/html/Transactions_Programmers_Guide/ch02s03s02.html

<!-- (default is BasicLockStore) --> <property name="com.arjuna.ats.txoj.lockstore.lockStoreType" value="BasicPersistentLockStore"/>

https://access.redhat.com/documentation/en-US/JBoss_Enterprise_Application_Platform/5/html/Transactions_Development_Guide/ch15.html

<property name="com.arjuna.ats.arjuna.coordinator.actionStore" value="HashedActionStore"/>

czwartek, września 25, 2014

Globalny wynalazek IBM-a

Elektrownia słoneczna.

piątek, września 19, 2014

EMS datastores

EMS handles datastores by using dedicated threads, having heavily utilized queues on separated stores is a wise step to dismiss 'slow clock' warnings.

JBoss 7.4 (RedHat EAP 6.3) and MDB XA

It is possible to setup XA on EAP 6.3, however there is a minor problem with recovery:

14:53:47,756 WARN  [com.arjuna.ats.jta] (Periodic Recovery) ARJUNA016037: Could not find new XAResource to use for recovering non-serializable XAResource XAResourceRecord < resource:null, txid:< formatId=131077, gtrid_length=29, bqual_length=36, tx_uid=0:ffffac1211d8:-30ef1c07:541c1f35:24dc, node_name=1, branch_uid=0:ffffac1211d8:-30ef1c07:541c1f35:24fa, subordinatenodename=null, eis_name=java:/sun/XAQueueConnectionFactory >, heuristic: TwoPhaseOutcome.FINISH_OK, product: x/X, jndiName: java:/sun/XAQueueConnectionFactory com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord@1c554391 >
14:53:47,756 WARN  [com.arjuna.ats.jta] (Periodic Recovery) ARJUNA016038: No XAResource to recover < formatId=131077, gtrid_length=29, bqual_length=36, tx_uid=0:ffffac1211d8:-30ef1c07:541c1f35:24dc, node_name=1, branch_uid=0:ffffac1211d8:-30ef1c07:541c1f35:24fa, subordinatenodename=null, eis_name=java:/sun/XAQueueConnectionFactory >
14:53:47,800 WARN  [com.arjuna.ats.jta] (Periodic Recovery) ARJUNA016037: Could not find new XAResource to use for recovering non-serializable XAResource XAResourceRecord < resource:null, txid:< formatId=131077, gtrid_length=29, bqual_length=36, tx_uid=0:ffffac1211d8:-30ef1c07:541c1f35:2e9a, node_name=1, branch_uid=0:ffffac1211d8:-30ef1c07:541c1f35:2ea6, subordinatenodename=null, eis_name=java:/XAOracleDS >, heuristic: TwoPhaseOutcome.FINISH_OK, product: Oracle/Oracle Database 11g Express Edition Release 11.2.0.2.0 - Production, jndiName: java:/XAOracleDS com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord@2ebed222 >
14:53:47,800 WARN  [com.arjuna.ats.jta] (Periodic Recovery) ARJUNA016038: No XAResource to recover < formatId=131077, gtrid_length=29, bqual_length=36, tx_uid=0:ffffac1211d8:-30ef1c07:541c1f35:2e9a, node_name=1, branch_uid=0:ffffac1211d8:-30ef1c07:541c1f35:2ea6, subordinatenodename=null, eis_name=java:/XAOracleDS >

Transaction log has orphaned entries, which do not exists on remote Resource Managers. We can live with it.

środa, września 10, 2014

Konfiguracja dbstore.properties dla EMS-a

hibernate.cache.use_second_level_cache=false
hibernate.current_session_context_class=thread
hiberate.cache.provider_class=org.hibernate.cache.NoCacheProvider
hibernate.connection.provider_class=org.hibernate.connection.C3P0ConnectionProvider
hibernate.c3p0.connectionCustomizerClassName=com.tibco.tibems.tibemsd.internal.db.DataStoreConnectionCustomizer

# Hibernate optimization
hibernate.bytecode.use_reflection_optimizer=true
hibernate.jdbc.batch_size=50
hibernate.id.new_generator_mappings=true
javax.persistence.validation.mode=NONE
hibernate.c3p0.forceIgnoreUnresolvedTransactions=false

# Pool config
hibernate.c3p0.acquireIncrement=1
hibernate.c3p0.checkoutTimeout=10000
hibernate.c3p0.maxPoolSize=12
hibernate.c3p0.minPoolSize=4
hibernate.c3p0.numHelperThreads=6
hibernate.c3p0.maxStatementsPerConnection=100
hibernate.c3p0.maxIdleTimeExcessConnections=900
hibernate.c3p0.maxIdleTime=0
hibernate.c3p0.acquireRetryAttempts=20
hibernate.c3p0.acquireRetryDelay=3000
hibernate.c3p0.idleConnectionTestPeriod=0
# use connection.isvalid from JDBC 4.0
#hibernate.c3p0.preferredTestQuery=select 1 from dual
hibernate.c3p0.testConnectionOnCheckin=false
hibernate.c3p0.testConnectionOnCheckout=true
hibernate.generate_statistics=false

hibernate.connection.oracle.jdbc.StreamBufferSize = 8192
# Column size settings for storing messages
org.hibernate.dialect.Oracle10gDialect.small_len = 1024
org.hibernate.dialect.Oracle9iDialect.small_len = 1024

wtorek, września 09, 2014

Jak działa dbstore w EMS-ie

Klasa Javy com.tibco.tibems.tibemsd.internal.db.CallableDataStore jest używana przez kod C++ (poprzez JNI) do trzymania bieżących danych sterujących oraz zapisywania i odczytywania komunikatów. Do zapisywania wszystkich obiektów służy Hibernate:

[2014-09-09 10:48:28,006] DEBUG [tibemsd] (AbstractBatcher.java:424) - insert into EMS_MESSAGES (MESSAGE_SEQNO, TYPE, PRIORITY, DELIVERYMODE, REDELIVERED, DELIVERY_COUNT, DESTINATION, DESTINATION_TYPE, EXPIRATION, TIMESTAMP, DELIVERYTIME, REPLYTO, REPLYTO_TYPE, USERTYPE, MSGID, CORRELATIONID, COMPRESSED, CLIENTFLAGS, ENCODING, ENCODED_PROPERTIES, ENCODED_SUPPRESS_CONSIDS, TXNID, ZONEID, ROUTESRC, ROUTESEQNO, ROUTECONSID, PRODUCERID, DELETED, MESSAGE_SIZE, SMALL_MESSAGE_BODY, LARGE_MESSAGE_BODY, STORE_ID) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
[2014-09-09 10:48:28,006] TRACE [tibemsd] (NullableType.java:133) - binding '337' to parameter: 1
[2014-09-09 10:48:28,006] TRACE [tibemsd] (NullableType.java:133) - binding '3' to parameter: 2
[2014-09-09 10:48:28,006] TRACE [tibemsd] (NullableType.java:133) - binding '4' to parameter: 3
[2014-09-09 10:48:28,006] TRACE [tibemsd] (NullableType.java:133) - binding '2' to parameter: 4
[2014-09-09 10:48:28,006] TRACE [tibemsd] (NullableType.java:133) - binding '0' to parameter: 5
[2014-09-09 10:48:28,007] TRACE [tibemsd] (NullableType.java:133) - binding '0' to parameter: 6
[2014-09-09 10:48:28,007] TRACE [tibemsd] (NullableType.java:133) - binding 'test.fw1' to parameter: 7
[2014-09-09 10:48:28,007] TRACE [tibemsd] (NullableType.java:133) - binding '1' to parameter: 8
[2014-09-09 10:48:28,007] TRACE [tibemsd] (NullableType.java:133) - binding '0' to parameter: 9
[2014-09-09 10:48:28,007] TRACE [tibemsd] (NullableType.java:133) - binding '1410252507549' to parameter: 10
[2014-09-09 10:48:28,007] TRACE [tibemsd] (NullableType.java:133) - binding '0' to parameter: 11
[2014-09-09 10:48:28,007] TRACE [tibemsd] (NullableType.java:126) - binding null to parameter: 12
[2014-09-09 10:48:28,007] TRACE [tibemsd] (NullableType.java:133) - binding '0' to parameter: 13
[2014-09-09 10:48:28,007] TRACE [tibemsd] (NullableType.java:126) - binding null to parameter: 14
[2014-09-09 10:48:28,007] TRACE [tibemsd] (NullableType.java:133) - binding 'ID:EMS-SERVER-DC2.23FC540EBDA24:2' to parameter: 15
[2014-09-09 10:48:28,008] TRACE [tibemsd] (NullableType.java:126) - binding null to parameter: 16
[2014-09-09 10:48:28,008] TRACE [tibemsd] (NullableType.java:133) - binding 'false' to parameter: 17
[2014-09-09 10:48:28,008] TRACE [tibemsd] (NullableType.java:133) - binding '0' to parameter: 18
[2014-09-09 10:48:28,008] TRACE [tibemsd] (NullableType.java:126) - binding null to parameter: 19
[2014-09-09 10:48:28,008] TRACE [tibemsd] (NullableType.java:126) - binding null to parameter: 20
[2014-09-09 10:48:28,008] TRACE [tibemsd] (NullableType.java:126) - binding null to parameter: 21
[2014-09-09 10:48:28,008] TRACE [tibemsd] (NullableType.java:133) - binding '0' to parameter: 22
[2014-09-09 10:48:28,008] TRACE [tibemsd] (NullableType.java:133) - binding '0' to parameter: 23
[2014-09-09 10:48:28,008] TRACE [tibemsd] (NullableType.java:126) - binding null to parameter: 24
[2014-09-09 10:48:28,008] TRACE [tibemsd] (NullableType.java:133) - binding '0' to parameter: 25
[2014-09-09 10:48:28,009] TRACE [tibemsd] (NullableType.java:133) - binding '0' to parameter: 26
[2014-09-09 10:48:28,009] TRACE [tibemsd] (NullableType.java:133) - binding '1' to parameter: 27
[2014-09-09 10:48:28,009] TRACE [tibemsd] (NullableType.java:133) - binding 'false' to parameter: 28
[2014-09-09 10:48:28,009] TRACE [tibemsd] (NullableType.java:133) - binding '123' to parameter: 29
[2014-09-09 10:48:28,009] TRACE [tibemsd] (NullableType.java:133) - binding '337' to parameter: 32
[2014-09-09 10:48:28,085] DEBUG [tibemsd] (AbstractBatcher.java:424) - select message0_.STORE_ID as STORE1_17_0_, message0_.MESSAGE_SEQNO as MESSAGE2_17_0_, message0_.TYPE as TYPE17_0_, message0_.PRIORITY as PRIORITY17_0_, message0_.DELIVERYMODE as DELIVERY5_17_0_, message0_.REDELIVERED as REDELIVE6_17_0_, message0_.DELIVERY_COUNT as DELIVERY7_17_0_, message0_.DESTINATION as DESTINAT8_17_0_, message0_.DESTINATION_TYPE as DESTINAT9_17_0_, message0_.EXPIRATION as EXPIRATION17_0_, message0_.TIMESTAMP as TIMESTAMP17_0_, message0_.DELIVERYTIME as DELIVER12_17_0_, message0_.REPLYTO as REPLYTO17_0_, message0_.REPLYTO_TYPE as REPLYTO14_17_0_, message0_.USERTYPE as USERTYPE17_0_, message0_.MSGID as MSGID17_0_, message0_.CORRELATIONID as CORRELA17_17_0_, message0_.COMPRESSED as COMPRESSED17_0_, message0_.CLIENTFLAGS as CLIENTF19_17_0_, message0_.ENCODING as ENCODING17_0_, message0_.ENCODED_PROPERTIES as ENCODED21_17_0_, message0_.ENCODED_SUPPRESS_CONSIDS as ENCODED22_17_0_, message0_.TXNID as TXNID17_0_, message0_.ZONEID as ZONEID17_0_, message0_.ROUTESRC as ROUTESRC17_0_, message0_.ROUTESEQNO as ROUTESEQNO17_0_, message0_.ROUTECONSID as ROUTECO27_17_0_, message0_.PRODUCERID as PRODUCERID17_0_, message0_.DELETED as DELETED17_0_, message0_.MESSAGE_SIZE as MESSAGE30_17_0_, message0_.SMALL_MESSAGE_BODY as SMALL31_17_0_, message0_.LARGE_MESSAGE_BODY as LARGE32_17_0_ from EMS_MESSAGES message0_ where message0_.STORE_ID=?
[2014-09-09 10:48:28,085] TRACE [tibemsd] (NullableType.java:133) - binding '247' to parameter: 1
[2014-09-09 10:48:28,087] TRACE [tibemsd] (NullableType.java:172) - returning '247' as column: MESSAGE2_17_0_
[2014-09-09 10:48:28,087] TRACE [tibemsd] (NullableType.java:172) - returning '3' as column: TYPE17_0_
[2014-09-09 10:48:28,087] TRACE [tibemsd] (NullableType.java:172) - returning '4' as column: PRIORITY17_0_
[2014-09-09 10:48:28,087] TRACE [tibemsd] (NullableType.java:172) - returning '2' as column: DELIVERY5_17_0_
[2014-09-09 10:48:28,087] TRACE [tibemsd] (NullableType.java:172) - returning '0' as column: REDELIVE6_17_0_
[2014-09-09 10:48:28,087] TRACE [tibemsd] (NullableType.java:172) - returning '0' as column: DELIVERY7_17_0_
[2014-09-09 10:48:28,088] TRACE [tibemsd] (NullableType.java:172) - returning 'test.fw1' as column: DESTINAT8_17_0_
[2014-09-09 10:48:28,088] TRACE [tibemsd] (NullableType.java:172) - returning '1' as column: DESTINAT9_17_0_
[2014-09-09 10:48:28,088] TRACE [tibemsd] (NullableType.java:172) - returning '0' as column: EXPIRATION17_0_
[2014-09-09 10:48:28,088] TRACE [tibemsd] (NullableType.java:172) - returning '1410249505675' as column: TIMESTAMP17_0_
[2014-09-09 10:48:28,088] TRACE [tibemsd] (NullableType.java:172) - returning '0' as column: DELIVER12_17_0_
[2014-09-09 10:48:28,088] TRACE [tibemsd] (NullableType.java:166) - returning null as column: REPLYTO17_0_
[2014-09-09 10:48:28,088] TRACE [tibemsd] (NullableType.java:172) - returning '0' as column: REPLYTO14_17_0_
[2014-09-09 10:48:28,088] TRACE [tibemsd] (NullableType.java:166) - returning null as column: USERTYPE17_0_
[2014-09-09 10:48:28,089] TRACE [tibemsd] (NullableType.java:172) - returning 'ID:EMS-SERVER-DC2.32DC540EB2EE4:141' as column: MSGID17_0_
[2014-09-09 10:48:28,089] TRACE [tibemsd] (NullableType.java:166) - returning null as column: CORRELA17_17_0_
[2014-09-09 10:48:28,089] TRACE [tibemsd] (NullableType.java:172) - returning 'false' as column: COMPRESSED17_0_
[2014-09-09 10:48:28,089] TRACE [tibemsd] (NullableType.java:172) - returning '0' as column: CLIENTF19_17_0_
[2014-09-09 10:48:28,089] TRACE [tibemsd] (NullableType.java:166) - returning null as column: ENCODING17_0_
[2014-09-09 10:48:28,089] TRACE [tibemsd] (NullableType.java:166) - returning null as column: ENCODED21_17_0_
[2014-09-09 10:48:28,089] TRACE [tibemsd] (NullableType.java:166) - returning null as column: ENCODED22_17_0_
[2014-09-09 10:48:28,089] TRACE [tibemsd] (NullableType.java:172) - returning '0' as column: TXNID17_0_
[2014-09-09 10:48:28,090] TRACE [tibemsd] (NullableType.java:172) - returning '0' as column: ZONEID17_0_
[2014-09-09 10:48:28,090] TRACE [tibemsd] (NullableType.java:166) - returning null as column: ROUTESRC17_0_
[2014-09-09 10:48:28,090] TRACE [tibemsd] (NullableType.java:172) - returning '0' as column: ROUTESEQNO17_0_
[2014-09-09 10:48:28,090] TRACE [tibemsd] (NullableType.java:172) - returning '0' as column: ROUTECO27_17_0_
[2014-09-09 10:48:28,090] TRACE [tibemsd] (NullableType.java:172) - returning '1' as column: PRODUCERID17_0_
[2014-09-09 10:48:28,090] TRACE [tibemsd] (NullableType.java:172) - returning 'false' as column: DELETED17_0_
[2014-09-09 10:48:28,090] TRACE [tibemsd] (NullableType.java:172) - returning '125' as column: MESSAGE30_17_0_

Gdyby numer sekwencji komunikatu był trzymany w bazie (Hibernate IdentifierGenerator = native/sequence) a nie był typem long trzymanym w pamięci to wiele instancji EMS-a mogłoby korzystać z tego samego schematu bazodanowego.

środa, września 03, 2014

Why does EMS have poor client initialization concurrency?

"TibemsXAUniConnectionHandler/pushConnection-336" daemon prio=6 tid=0x000000000fbde000 nid=0x1e58 waiting for monitor entry [0x0000000026dce000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at com.tibco.tibjms.TibjmsxLinkTcp._createSocket(TibjmsxLinkTcp.java:797)
- waiting to lock <0x00000000fdea8b90> (a java.lang.Object)
at com.tibco.tibjms.TibjmsxLinkTcp.connect(TibjmsxLinkTcp.java:914)
at com.tibco.tibjms.TibjmsConnection._create(TibjmsConnection.java:1354)
at com.tibco.tibjms.TibjmsConnection.(TibjmsConnection.java:4320)
at com.tibco.tibjms.TibjmsXAConnection.(TibjmsXAConnection.java:30)
at com.tibco.tibjms.TibjmsXAQueueConnection.(TibjmsXAQueueConnection.java:29)
at com.tibco.tibjms.TibjmsxCFImpl._createImpl(TibjmsxCFImpl.java:164)
at com.tibco.tibjms.TibjmsxCFImpl._createConnection(TibjmsxCFImpl.java:253)
at com.tibco.tibjms.TibjmsXAQueueConnectionFactory.createXAQueueConnection(TibjmsXAQueueConnectionFactory.java:111)
at com.tibco.tibjms.XConnectionHandler$2.run(XConnectionHandler.java:201)

Process of connecting is degraded to single threaded processing due to private static Object _shutdownLock over which almost everything is synchronized inside TibjmsxLinkTcp class including socket creation and heartbeat processing. Fixing this lock (degrading from static) we step down from 8 seconds to 120ms on connection establishing while allocating 20 connections with 10 seconds heartbeat.

czwartek, sierpnia 21, 2014

What is the best JCA JMS Resource Adapter for JBoss 5.1, JBoss 7.1?

JMS JCA RA written by Frank Kieviet for SeeBeyond Technology Corporation and later developed at Sun Microsystems in JCAPS/OpenESB.

środa, sierpnia 20, 2014

GenericRa.rar with JBoss 5.1 and Oracle AQ


Generic JMS RA from Glassfish (created by Sun Microsystems, maintained by Oracle Corporation) is usable with Oracle AQ.

Adjust system properties:
oracle.jms.traceLevel=6
oracle.jms.useNativeXA=true
oracle.jms.useEmulatedXA=false
oracle.jms.j2eeCompliant=false.

<mbean code="org.jboss.jms.jndi.JMSProviderLoader"
          name="jboss.messaging:service=JMSProviderLoader,name=AQProvider">
      <attribute name="ProviderName">DefaultAQProvider</attribute>
      <attribute name="ProviderAdapterClass">org.jboss.jms.jndi.JNDIProviderAdapter</attribute>
      <attribute name="FactoryRef">XAConnectionFactory</attribute>
      <attribute name="QueueFactoryRef">XAQueueConnectionFactory</attribute>
      <attribute name="TopicFactoryRef">XATopicConnectionFactory</attribute>
 <attribute name="Properties">
java.naming.factory.initial=oracle.jms.AQjmsInitialContextFactory
java.naming.security.principal=XA
java.naming.security.credentials=toortoor
db_url=jdbc:oracle:thin:@localhost:1521:XE
datasource=java:XAOracleDS
 </attribute>
   </mbean>

wtorek, sierpnia 19, 2014

ConnectionConsumer API for AQjmsConnection

public synchronized ConnectionConsumer createConnectionConsumer(
Destination destination, String s,
ServerSessionPool serversessionpool, int i) throws JMSException {
//AQjmsError.throwEx(102);
return _createConnectionConsumer(destination, null, s, serversessionpool, i);
}

private ConnectionConsumer _createConnectionConsumer(final Destination destination,
final String sub, final String s, final ServerSessionPool spool, final int max) {

return new ConnectionConsumer() {

private boolean closed = false;
private final LinkedBlockingQueue<Long> trigger = new LinkedBlockingQueue<Long>();
private AtomicLong counter = new AtomicLong(0);
@SuppressWarnings("unused")
private Thread bootstrapThread = bootstrapConsumers();

@Override
public ServerSessionPool getServerSessionPool() throws JMSException {
return spool;
}

@Override
public void close() throws JMSException {
closed = true;
}

public Thread bootstrapConsumers() {
Thread coordinator = new Thread() {
public void run() {
setName("ConsumerCoordinator "+Thread.currentThread().getId());
long lastCnt = 0;
while (!closed) {
Long value = null;
try {
value = trigger.poll(1, TimeUnit.SECONDS);
}
catch (InterruptedException e) {}
if ((value!=null && counter.get() < max) && !closed) {
Thread th = new Thread() {
public void run() {
runConsumer();
}
};
th.setDaemon(true);
th.start();
}
long currentCnt = counter.get();
if (currentCnt!=lastCnt) {
System.out.println("Server session pool #"+spool.hashCode()+" has got active "+counter.get()+" session(s), max is "+max);
lastCnt = currentCnt;
}
}
}
};
coordinator.setDaemon(true);
coordinator.start();
trigger.add(System.currentTimeMillis());
return coordinator;
}

public void runConsumer() {
Thread.currentThread().setName("Connection consumer "+Thread.currentThread().getId());
counter.incrementAndGet();
ServerSession ss = null;
Session sess = null;
MessageListener appSrvMessageListener = null;

while (!closed) {
try {
if (ss==null) {
ss = spool.getServerSession(); /* app srv will block */
trigger.put(System.currentTimeMillis());
sess = ss.getSession();
appSrvMessageListener = sess.getMessageListener();
}
if (sess==null) {
sess = ss.getSession();
}
sess.setMessageListener(null);
MessageConsumer mc = (sub!=null) ?
sess.createDurableSubscriber((Topic) destination, sub, s, false) :
sess.createConsumer(destination, s);

while (!closed) {
LinkedList list = new LinkedList();
int limit = 1; /* XA: one transaction per message */
for (int i=0; i < limit; i++) {
Message m = null;
try {
m = (i==limit-1) ? mc.receive(1000) : mc.receiveNoWait();
if (m!=null)
list.add(m);
}
catch (Exception e) {
System.out.println("Error while trying to receive message from JMS server, destination="+destination+", subscription="+sub+", selector="+s+": "+ e);
try {
sess.close();
}
catch (Exception ee) {}
sess = null;
break;
}
}
if (sess==null)
break;

if (!list.isEmpty()) {
synchronized (ss) {
for (Message m : list)
appSrvMessageListener.onMessage(m);
/*ss.start();*/
}
}
}
}
catch (Exception exception) {
System.out.println("Cannot consume JMS connection (but will retry), destination="+destination+", subscription="+sub+", selector="+s+": "+ exception);
try {
if (sess!=null)
sess.close();
}
catch (Exception e) {}
}
}
counter.decrementAndGet();
Logger.debug("EMS Connection Consumer closed (permanently), destination="+destination+", subscription="+sub+", selector="+s);
Logger.debug("Connection #"+hashCode()+" consumer counter is "+counter.get());
}
};
}

Oracle AQ XA Connection

There is a bug in aqapi.jar AQjmsGeneralDBConnection which prevents usage of XA outside Oracle WebLogic.

Here is a bugfix:

private String getProviderKey() throws JMSException {
try {
OracleConnection oracleconnection = (OracleConnection) m_dbConn;
+ if (oracleconnection==null)
+ oracleconnection = (OracleConnection) m_xaConn.getConnection();
String s = oracleconnection.getURL();
int i = s.indexOf('@');
String s1;
if (i < 0) {
if (!"jdbc:oracle:kprb:".equals(((Object) (s)))
&& !"jdbc:default:connection:".equals(((Object) (s))))
AQjmsError.throwEx(112);
s1 = "jdbc:oracle:kprb:";
} else {
s1 = s.substring(i);
}
return s1.toUpperCase();
} catch (SQLException sqlexception) {
throw new AQjmsException(sqlexception);
}
}

oracle.jms.traceLevel=6
oracle.jms.useNativeXA=true
oracle.jms.useEmulatedXA=false
oracle.jms.j2eeCompliant=true

execute dbms_aqadm.create_queue_table(queue_table => 'XA.JMS_Q_STORE001', queue_payload_type => 'sys.aq$_jms_message', multiple_consumers => false);
execute dbms_aqadm.create_queue(queue_name => 'XA.test_fw1', queue_table => 'XA.JMS_Q_STORE001');
execute dbms_aqadm.start_queue('XA.test_fw1')
@ActivationConfigProperty(propertyName = "destination", propertyValue = "Queues/test_fw1"),