Tuesday, February 24, 2015

How to release unused XiNodes from BW process variables

BW accumulates process variables over a job's lifetime. Intermediate variables are often no longer needed in later steps, yet they still occupy memory. Releasing them frees memory for new jobs and lowers the overall RAM requirement of the whole BW instance.

package Libraries.Utils.ReleaseVariable;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.atomic.*;
import java.io.*;
import org.xml.sax.InputSource;

import com.tibco.xml.datamodel.XiNode;
import com.tibco.xml.datamodel.XiParserFactory;
import com.tibco.xml.xdata.xpath.Variable;
import com.tibco.xml.xdata.xpath.VariableList;
import com.tibco.pe.core.*;

public class ReleaseVariableJavaCode{

 // Engine holds the JobPool in a static field; locate it by type and read it via reflection.
 public static JobPool getJobPool() {
  for (Field f : Engine.class.getDeclaredFields()) {
   if (f.getType().getName().endsWith("JobPool")) {
    f.setAccessible(true);
    try {
     return (JobPool) f.get(null);
    }
    catch (Throwable te) {
     throw new RuntimeException("Cannot access JobPool: "+te.getMessage(), te);
    }    
   }   
  }
  throw new RuntimeException("Cannot access JobPool");
 }

 public static long[] getJobIds() {
  try {
   return getJobPool().getJobIds();
  }
  catch (Throwable t) {
   throw new RuntimeException("Cannot access JobPool.getJobIds(): "+t.getMessage(), t);
  }
 }
 
 // Variable list of one track of a job, read via reflection on Job.getAttributes(int).
 public static VariableList getJobVariables(long jid, int trackId) {
  try {
   Method getAttributes = Class.forName("com.tibco.pe.core.Job").getDeclaredMethod("getAttributes", new Class<?>[] { int.class });
   getAttributes.setAccessible(true);
   return (VariableList) getAttributes.invoke(getJobPool().findJob(jid), trackId);
  }
  catch (Throwable t) {
   throw new RuntimeException("Cannot access job variables: "+t.getMessage(), t);
  }
 }

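 // Current track of the job, via reflection on Job.getTrackId(). The boxed Integer result
 // is unboxed explicitly, because casting Object straight to int needs Java 7 or later.
 public int getTrackId(long jid) {
  try {
   Method getter = Class.forName("com.tibco.pe.core.Job").getDeclaredMethod("getTrackId", new Class<?>[0]);
   getter.setAccessible(true);
   return ((Integer) getter.invoke(getJobPool().findJob(jid))).intValue();
  }
  catch (Throwable t) {
   throw new RuntimeException("Cannot access job methods: "+t.getMessage(), t);
  }
 }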

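 // Detach all children of the node so the subtree becomes unreachable and can be
 // garbage collected; optionally request a full GC when callGC == 1.
 public void nullifyXiNode(XiNode node) {
  while (node != null && node.hasChildNodes())
   node.removeChild(node.getLastChild());
  if (callGC == 1)
   System.gc();
 }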

 // Walk the XML value of a variable following the element names tokens[i..]
 // and empty every element that matches the full path.
 public void prune(XiNode node, String[] tokens, int i) {
  if (node == null)
   return;                                        // variable has no XML value
  if (i == 1)
   node = node.hasChildNodes() ? node.getFirstChild() : null; // document node -> root element
  while (node != null) {
   if (node.getName() != null && tokens[i].equals(node.getName().getLocalName())) {
    if (i == tokens.length - 1) {
     nullifyXiNode(node);                         // last path step: drop this element's content
    }
    else if (node.hasChildNodes()) {
     prune(node.getFirstChild(), tokens, i + 1);  // descend one level
    }
   }
   node = node.hasNextSibling() ? node.getNextSibling() : null;
  }
 }
/****** START SET/GET METHOD, DO NOT MODIFY *****/
 protected long jobId = 0;
 protected String var = "";
 protected int callGC = 0;
 public long getjobId() {
  return jobId;
 }
 public void setjobId(long val) {
  jobId = val;
 }
 public String getvar() {
  return var;
 }
 public void setvar(String val) {
  var = val;
 }
 public int getcallGC() {
  return callGC;
 }
 public void setcallGC(int val) {
  callGC = val;
 }
/****** END SET/GET METHOD, DO NOT MODIFY *****/
 public ReleaseVariableJavaCode() {
 }
 public void invoke() throws Exception {
/* Available Variables: DO NOT MODIFY
 In  : long jobId
 In  : String var
 In  : int callGC
* Available Variables: DO NOT MODIFY *****/
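// var is a slash-separated path: the first token is the process variable name,
// the remaining tokens are element names inside that variable.
String[] tokens = var.split("/");
int currentTrackId = getTrackId(jobId);

// Scan the job's tracks, skipping the current one.
for (int iter = 0; iter <= 4; iter++) {
    VariableList varList = getJobVariables(jobId, iter);
    Variable v = (varList != null && iter != currentTrackId) ? varList.getVariable(tokens[0]) : null;
    if (v == null)
        continue;
    if (tokens.length == 1)
        nullifyXiNode(v.getValue());      // release the whole variable
    else
        prune(v.getValue(), tokens, 1);   // release only the matching sub-elements
}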


}
}
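To see what prune() does without a running BW engine, here is the same walk re-expressed on the standard org.w3c.dom API. This is only an illustration under assumptions: XiNode is swapped for org.w3c.dom.Node, element names are compared with getNodeName() (the parser is namespace-unaware), and the class name PruneDemo and the sample XML are made up.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;

// Hypothetical demo class: DOM counterpart of nullifyXiNode()/prune(), not BW code.
public class PruneDemo {

    // Remove all children of a node (counterpart of nullifyXiNode).
    static void nullify(Node node) {
        while (node != null && node.hasChildNodes()) {
            node.removeChild(node.getLastChild());
        }
    }

    // tokens[0] = variable name, tokens[1] = root element, tokens[2..] = child element names.
    // Every element matching the full path is emptied.
    static void prune(Node node, String[] tokens, int i) {
        if (node == null) return;
        if (i == 1) node = node.getFirstChild();             // document node -> root element
        for (; node != null; node = node.getNextSibling()) {
            String name = node.getNodeType() == Node.ELEMENT_NODE ? node.getNodeName() : null;
            if (tokens[i].equals(name)) {
                if (i == tokens.length - 1) nullify(node);    // last step: drop content
                else prune(node.getFirstChild(), tokens, i + 1);
            }
        }
    }

    static Document parse(String xml) throws Exception {
        return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
    }

    public static void main(String[] args) throws Exception {
        Document doc = parse("<order><header><id>1</id></header>"
                + "<items><item>a</item><item>b</item></items></order>");
        prune(doc, "myVar/order/items".split("/"), 1);
        System.out.println(doc.getElementsByTagName("item").getLength()); // prints 0
        System.out.println(doc.getElementsByTagName("id").getLength());   // prints 1
    }
}
```

The path "myVar/order/items" empties every items element under the root order element and leaves header untouched, which is the behaviour the BW activity gives for a var input of the same shape.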

Monday, February 23, 2015

Aerospike server has synchronization bugs :(

Bugs found during VAT testing (http://1307723433353.blogspot.com/2014/05/vat-validation-of-architecture-in-tests.html)
Aerospike is open source, so if you hit a problem you can analyze it yourself instead of waiting for support in a different time zone. You can recompile the fixed code and take it straight to production. That is far faster than any platinum-level support. You only need one guy who knows Linux and C.

Friday, February 20, 2015

How to get memory usage of Tibco BW Job

package Libraries.Utils.GetJobsMemoryUsage;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.atomic.*;
import java.io.*;
import org.xml.sax.InputSource;

import com.tibco.xml.datamodel.XiNode;
import com.tibco.xml.datamodel.XiParserFactory;
import com.tibco.xml.xdata.xpath.Variable;
import com.tibco.xml.xdata.xpath.VariableList;
import com.tibco.pe.core.*;

public class GetJobsMemoryUsageJavaCode{

 public static JobPool getJobPool() {
  for (Field f : Engine.class.getDeclaredFields()) {
   if (f.getType().getName().endsWith("JobPool")) {
    f.setAccessible(true);
    try {
     return (JobPool) f.get(null);
    }
    catch (Throwable te) {
     throw new RuntimeException("Cannot access JobPool: "+te.getMessage(), te);
    }    
   }   
  }
  throw new RuntimeException("Cannot access JobPool");
 }

 public static long[] getJobIds() {
  try {
   return getJobPool().getJobIds();
  }
  catch (Throwable t) {
   throw new RuntimeException("Cannot access JobPool.getJobIds(): "+t.getMessage(), t);
  }
 }

 // Process (workflow) name of a job, or null if the job is not in the pool.
 public static String getJobName(long id) {
  try {
   Object job = getJobPool().findJob(id);
   if (job!=null) {
    Method getWorkflow = Class.forName("com.tibco.pe.core.Job").getDeclaredMethod("getWorkflow", new Class<?>[0]);
    getWorkflow.setAccessible(true);
    com.tibco.pe.core.Workflow w = (com.tibco.pe.core.Workflow)getWorkflow.invoke(job);
    return w!=null ? w.getName() : "no-workflow-name";
   }
   else
    return null;
  }
  catch (Throwable t) {
   throw new RuntimeException("Cannot access Job data: "+t.getMessage(), t);
  }
 }

 public static VariableList getJobVariables(long jid, int trackId) {
  try {
   Method getAttributes = Class.forName("com.tibco.pe.core.Job").getDeclaredMethod("getAttributes", new Class<?>[] { int.class });
   getAttributes.setAccessible(true);
   return (VariableList) getAttributes.invoke(getJobPool().findJob(jid), trackId);
  }
  catch (Throwable t) {
   throw new RuntimeException("Cannot access job variables: "+t.getMessage(), t);
  }
 }

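 // Current track of the job, via reflection on Job.getTrackId(). The boxed Integer result
 // is unboxed explicitly, because casting Object straight to int needs Java 7 or later.
 public int getTrackId(long jid) {
  try {
   Method getter = Class.forName("com.tibco.pe.core.Job").getDeclaredMethod("getTrackId", new Class<?>[0]);
   getter.setAccessible(true);
   return ((Integer) getter.invoke(getJobPool().findJob(jid))).intValue();
  }
  catch (Throwable t) {
   throw new RuntimeException("Cannot access job methods: "+t.getMessage(), t);
  }
 }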
/****** START SET/GET METHOD, DO NOT MODIFY *****/
 protected long jobId = 0;
 protected String[] usage = null;
 public long getjobId() {
  return jobId;
 }
 public void setjobId(long val) {
  jobId = val;
 }
 public String[] getusage() {
  return usage;
 }
 public void setusage(String[] val) {
  usage = val;
 }
/****** END SET/GET METHOD, DO NOT MODIFY *****/
 public GetJobsMemoryUsageJavaCode() {
 }
 public void invoke() throws Exception {
/* Available Variables: DO NOT MODIFY
 In  : long jobId
 Out : String[] usage
* Available Variables: DO NOT MODIFY *****/

LinkedList<String> usageList = new LinkedList<String>();
// jobId > 0: report a single job; otherwise report every job in the pool.
long[] ids = jobId > 0 ? new long[] { jobId } : getJobIds();

if (ids != null) {
 StringBuilder sb = new StringBuilder();
 for (long id : ids) {
  String jobName = getJobName(id);
  if (jobName == null)
   continue;                      // job is not (or no longer) in the pool
  sb.setLength(0);
  long mem = 0;
  int currentTrackId = getTrackId(id);

  for (int trackId = 0; trackId < 4; trackId++) {
   VariableList varList = trackId == currentTrackId ? null : getJobVariables(id, trackId);
   if (varList != null) {
    for (Object varName : varList.getVariableNames()) {
     Variable v = varList.getVariable(varName.toString());
     long size = 0;
     if (v.getValue() == null)
      size = (v.getNumber() + "").length();   // no XML value: length of getNumber() as text
     else {
      // Serialize the XML value into a stream that only counts bytes, so nothing is buffered.
      final AtomicInteger cnt = new AtomicInteger(0);
      OutputStream cos = new OutputStream() {
       public void write(int b) { cnt.incrementAndGet(); }   // a single byte
       public void write(byte bytes[], int off, int len) throws IOException { cnt.addAndGet(len); }
      };
      com.tibco.xml.datamodel.helpers.XiSerializer.serialize(v.getValue(), cos, "utf-8", true);
      size = cnt.get();
     }
     sb.append(varName).append('[').append(trackId).append(']').append('=').append(size).append('|');
     mem += size;
    }
   }
  }
  // Result line: jobName=totalBytes|var[track]=bytes|...
  sb.insert(0, jobName + "=" + mem + "|");
  usageList.add(sb.toString());
 }
}
usage = usageList.toArray(new String[0]);
}
}
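The size measurement above rests on one trick: serialize into an OutputStream that throws the bytes away and only counts them, so even a large variable never needs a buffer of its own. A JDK-only sketch of that technique follows; the class name CountingOutputStream is our own, and an OutputStreamWriter stands in for XiSerializer, which needs the TIBCO libraries.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: an OutputStream that discards data and counts bytes.
public class CountingOutputStream extends OutputStream {
    private long count;

    @Override
    public void write(int b) {                         // a single byte
        count++;
    }

    @Override
    public void write(byte[] b, int off, int len) {    // a block: just add its length
        count += len;
    }

    public long getCount() {
        return count;
    }

    // UTF-8 encoded size of a string, computed without keeping the encoded bytes.
    public static long utf8Size(String text) throws IOException {
        CountingOutputStream cos = new CountingOutputStream();
        try (Writer w = new OutputStreamWriter(cos, StandardCharsets.UTF_8)) {
            w.write(text);
        }                                              // close() flushes the encoder into cos
        return cos.getCount();
    }

    public static void main(String[] args) throws IOException {
        // Polish letters take 2 bytes each in UTF-8.
        System.out.println(utf8Size("<a>za\u017C\u00F3\u0142\u0107</a>")); // prints 17
    }
}
```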


Thursday, February 19, 2015

Aerospike goes AIO!



The modified source code is here. I have to publish it to comply with the GNU AGPLv3 licence.

Tuesday, February 10, 2015

Horizontal gene transfer

http://io9.com/confirmation-that-photosynthesizing-sea-slugs-steal-gen-1683702602

Fix 'No space left on device' with btrfs

Add a new disk, then run:
btrfs device add /dev/sdd /
and it's fixed.

Friday, February 6, 2015

Aerospike or not Aerospike

[4] + clustering without the need for GFS2/RHEL cluster/NAS
[5] + out of box interDC replication
[3] + faster than O_SYNC file and DB
[2] + cheaper for IT operations than 'canonical solutions for JEE'
[2] + better support, because vendor 'does care'

[2] - new unknown product
[2] - needs extensive crash test/validation test
[4] - need to buy support
[2] - huge RAM usage
[4] - will the company remain on the market for a decade?
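If the bracketed numbers are read as importance weights (an assumption; the list does not say what they mean), the pros and cons can be tallied. A minimal sketch; the class name AerospikeScore and the shortened labels are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical tally of the list above; treating [n] as a weight is an assumption.
public class AerospikeScore {

    static final Map<String, Integer> PROS = new LinkedHashMap<>();
    static final Map<String, Integer> CONS = new LinkedHashMap<>();

    static {
        PROS.put("clustering without GFS2/RHEL cluster/NAS", 4);
        PROS.put("out of box interDC replication", 5);
        PROS.put("faster than O_SYNC file and DB", 3);
        PROS.put("cheaper for IT operations", 2);
        PROS.put("better support", 2);
        CONS.put("new unknown product", 2);
        CONS.put("needs extensive crash/validation test", 2);
        CONS.put("need to buy support", 4);
        CONS.put("huge RAM usage", 2);
        CONS.put("vendor on the market for a decade?", 4);
    }

    // Sum of all weights in one column.
    static int sum(Map<String, Integer> items) {
        int total = 0;
        for (int weight : items.values()) {
            total += weight;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("pros=" + sum(PROS) + " cons=" + sum(CONS)); // prints pros=16 cons=14
    }
}
```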

16:14

Aerospike - fast or safe?

Let's look at https://github.com/aerospike/aerospike-server/blob/180ed47a5fffc54b3e45faccb33c908bc189db2e/as/src/storage/drv_ssd.c and search for the open() and fsync() system calls. The open flags, such as O_SYNC, are parameterized, and there is an fsync loop with a parameterized sleep time. The relevant configuration options (https://github.com/aerospike/aerospike-server/blob/master/as/src/base/cfg.c) are enable-osync and fsync-max-sec. Now check the default values at http://www.aerospike.com/docs/reference/configuration/: synchronous writes are disabled by default, and so is fsync. This means there are no safety guarantees for a single Aerospike node (see also flush-max-ms). If asd, the OS or the storage crashes, data will likely be lost or corrupted.

The probabilistic picture for 3 nodes on different racks with battery-backed SSD arrays is quite different: the combined risk is very small (unless the whole datacenter goes down), so Aerospike is an acceptable solution under standard, sane data safety/integrity/availability criteria.
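The two knobs discussed above correspond to two general durability patterns. The Java sketch below is not Aerospike code: opening a file with StandardOpenOption.SYNC roughly mirrors enable-osync (each write is synchronous), while a background task calling FileChannel.force() every few seconds mirrors the idea behind fsync-max-sec. The class name DurabilityModes, the 16-byte records and the flush period are made up for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of synchronous writes vs. periodic fsync.
public class DurabilityModes {

    // A fixed 16-byte record.
    static ByteBuffer record(long i) {
        ByteBuffer b = ByteBuffer.allocate(16);
        b.putLong(i);
        b.putLong(i * i);
        b.flip();
        return b;
    }

    // Synchronous mode: SYNC makes every write() return only after data and metadata reach storage.
    static long writeSynchronous(Path file, int records) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC)) {
            for (int i = 0; i < records; i++) {
                ch.write(record(i));
            }
            return ch.size();
        }
    }

    // Periodic mode: writes land in the OS page cache and a background task forces them to disk
    // every flushSeconds. A crash loses whatever was written since the last force().
    static long writeWithPeriodicFsync(Path file, int records, long flushSeconds) throws IOException {
        ScheduledExecutorService flusher = Executors.newSingleThreadScheduledExecutor();
        FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
        try {
            flusher.scheduleAtFixedRate(() -> {
                try {
                    ch.force(false);
                } catch (IOException e) {
                    // ignored in this sketch (e.g. channel closed while flushing)
                }
            }, flushSeconds, flushSeconds, TimeUnit.SECONDS);
            for (int i = 0; i < records; i++) {
                ch.write(record(i));
            }
            ch.force(false);                 // final flush before returning
            return ch.size();
        } finally {
            flusher.shutdown();              // cancel further periodic flushes
            ch.close();
        }
    }

    public static void main(String[] args) throws IOException {
        Path a = Files.createTempFile("osync", ".dat");
        Path b = Files.createTempFile("fsync", ".dat");
        System.out.println(writeSynchronous(a, 100));          // prints 1600
        System.out.println(writeWithPeriodicFsync(b, 100, 1)); // prints 1600
        Files.delete(a);
        Files.delete(b);
    }
}
```

The trade-off is the one the post describes: the first mode pays a device round trip per write, the second is fast but leaves a window of up to flushSeconds of unflushed data on a single node.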

Thursday, February 5, 2015

Don't fill up at Orlen


BMW i8

An hour of track driving in this marvel costs 7 000 zł

The marvel itself costs over 500 000 zł

Tuesday, February 3, 2015

Tibco EMS on Aerospike NoSQL cluster

Is it possible to have EMS distributed, partitioned, fault tolerant and replicated without a clustered filesystem? Just hook into the system calls and override storage access. The Aerospike NoSQL solution comes with a C client, and EMS is also written in C/C++, so everything is now possible. Sky is the limit! The proof of concept doesn't implement compaction yet; it can be achieved with as_query_where + delete.

Now some benchmarks. 20 sender threads each send 50 000 1 KB messages while 20 receivers are active at the same time. The time to pipe 1 million messages (1 GB) is measured. EMS on BTRFS inside a virtual OpenSUSE 13.2 needs 3651 seconds, while EMS on Aerospike NoSQL needs 1961 seconds, so EMS on a file store takes 86% longer than on Aerospike.

Update: on a raw device opened with O_SYNC, the time is 2754 s.
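The figures above can be turned into relative slowdowns and throughput with a few lines of arithmetic. A small sketch; the class name EmsBenchmark is made up, the timings are the ones reported in this post, and the message count assumes that each of the 20 senders sends 50 000 messages.

```java
// Hypothetical helper that recomputes the benchmark ratios from the reported timings.
public class EmsBenchmark {

    static final long MESSAGES = 20L * 50_000;   // 20 senders x 50 000 messages = 1 000 000

    // How much longer run B took than run A, in percent.
    static double percentSlower(double secondsA, double secondsB) {
        return (secondsB / secondsA - 1.0) * 100.0;
    }

    // Average end-to-end throughput of a run.
    static double messagesPerSecond(double seconds) {
        return MESSAGES / seconds;
    }

    public static void main(String[] args) {
        double aerospike = 1961, btrfs = 3651, rawOsync = 2754;   // seconds, from the post
        System.out.printf("btrfs: %.0f%% slower, %.0f msg/s%n",
                percentSlower(aerospike, btrfs), messagesPerSecond(btrfs));       // 86% slower, 274 msg/s
        System.out.printf("raw O_SYNC: %.0f%% slower, %.0f msg/s%n",
                percentSlower(aerospike, rawOsync), messagesPerSecond(rawOsync)); // 40% slower, 363 msg/s
        System.out.printf("aerospike: %.0f msg/s%n", messagesPerSecond(aerospike)); // 510 msg/s
    }
}
```

So the raw device with O_SYNC closes about half of the gap, but still takes roughly 40% longer than Aerospike.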