package Libraries.Utils.ReleaseVariable;

import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.atomic.*;
import java.io.*;
import org.xml.sax.InputSource;
import com.tibco.xml.datamodel.XiNode;
import com.tibco.xml.datamodel.XiParserFactory;
import com.tibco.xml.xdata.xpath.Variable;
import com.tibco.xml.xdata.xpath.VariableList;
import com.tibco.pe.core.*;

/**
 * TIBCO BW "Java Code" activity that releases (empties) a process variable of a
 * running BW job, so the XiNode tree it holds can be garbage-collected.
 *
 * All access to the BW engine goes through reflection on undocumented
 * com.tibco.pe.core internals (Engine, JobPool, Job) — behavior depends on the
 * exact BW engine version. NOTE(review): the internal contracts (track-id range,
 * getAttributes semantics) are assumptions inferred from this code, not from
 * official documentation — confirm against the target BW release.
 */
public class ReleaseVariableJavaCode{

    /**
     * Locates the engine-wide JobPool by scanning Engine's declared fields for
     * one whose type name ends with "JobPool" (field name itself is not relied
     * on, presumably because it is obfuscated or version-dependent).
     *
     * @return the static JobPool instance read via reflection
     * @throws RuntimeException if no such field exists or it cannot be read
     */
    public static JobPool getJobPool() {
        for (Field f : Engine.class.getDeclaredFields()) {
            if (f.getType().getName().endsWith("JobPool")) {
                f.setAccessible(true);
                // Static field: passing null as the instance.
                try { return (JobPool) f.get(null); }
                catch (Throwable te) { throw new RuntimeException("Cannot access JobPool: "+te.getMessage(), te); }
            }
        }
        throw new RuntimeException("Cannot access JobPool");
    }

    /** @return ids of all jobs currently known to the JobPool */
    public static long[] getJobIds() {
        try { return getJobPool().getJobIds(); }
        catch (Throwable t) { throw new RuntimeException("Cannot access JobPool.getJobIds(): "+t.getMessage(), t); }
    }

    /**
     * Reads the variable list of one "track" of a job via the non-public
     * Job.getAttributes(int) method.
     *
     * @param jid     BW job id
     * @param trackId track index passed straight to Job.getAttributes
     * @return the VariableList for that track (may be null — callers check)
     */
    public static VariableList getJobVariables(long jid, int trackId) {
        try {
            Method getAttributes = Class.forName("com.tibco.pe.core.Job").getDeclaredMethod("getAttributes", new Class<?>[] { int.class });
            getAttributes.setAccessible(true);
            return (VariableList) getAttributes.invoke(getJobPool().findJob(jid), trackId);
        } catch (Throwable t) { throw new RuntimeException("Cannot access job variables: "+t.getMessage(), t); }
    }

    /**
     * @param jid BW job id
     * @return the job's current track id, via the non-public Job.getTrackId()
     */
    public int getTrackId(long jid) {
        try {
            Method getter = Class.forName("com.tibco.pe.core.Job").getDeclaredMethod("getTrackId", new Class<?>[0]);
            getter.setAccessible(true);
            return (int) getter.invoke(getJobPool().findJob(jid));
        } catch (Throwable t) { throw new RuntimeException("Cannot access job methods: "+t.getMessage(), t); }
    }

    /**
     * Detaches all children of the node (removing from the last child backwards),
     * leaving the node itself in place but empty. Optionally triggers a GC
     * afterwards when the activity input callGC == 1.
     */
    public void nullifyXiNode(XiNode node) {
        while (node!=null && node.hasChildNodes()) node.removeChild(node.getLastChild());
        if (callGC==1) System.gc();
    }

    /**
     * Recursively walks the XiNode tree following the path in {@code tokens}
     * (one token per level, matched against local element names) and empties
     * every node that matches the final token via {@link #nullifyXiNode}.
     *
     * @param node   current node; at i==1 this is the variable's root, and the
     *               walk starts from its first child
     * @param tokens path segments from the activity input "var" split on '/'
     * @param i      index of the token to match at this level (starts at 1;
     *               tokens[0] is the variable name, already resolved by invoke)
     */
    public void prune(XiNode node, String[] tokens, int i) {
        //System.out.println("prune at level "+i+" of "+node);
        if (i==1)
            node = node.hasChildNodes() ? node.getFirstChild() : null; // root node
        while (node!=null) {
            //System.out.println("node name is "+(node.getName()!=null ? node.getName().getLocalName() : null));
            if (node.getName() != null && tokens[i].equals(node.getName().getLocalName())) {
                if (i == tokens.length-1) {
                    //System.out.println("nullify");
                    // Last path segment matched: release this subtree.
                    nullifyXiNode( node );
                } else {
                    // Descend one level and keep matching the next token.
                    XiNode nd = node.hasChildNodes() ? node.getFirstChild() : null;
                    if (nd!=null) { prune(nd, tokens, i+1); }
                }
            }
            // Continue with siblings at this level (all matches are pruned).
            node = node.hasNextSibling() ? node.getNextSibling() : null;
        }
    }

    /****** START SET/GET METHOD, DO NOT MODIFY *****/
    protected long jobId = 0;
    protected String var = "";
    protected int callGC = 0;
    public long getjobId() { return jobId; }
    public void setjobId(long val) { jobId = val; }
    public String getvar() { return var; }
    public void setvar(String val) { var = val; }
    public int getcallGC() { return callGC; }
    public void setcallGC(int val) { callGC = val; }
    /****** END SET/GET METHOD, DO NOT MODIFY *****/

    public ReleaseVariableJavaCode() { }

    /**
     * BW activity entry point. Splits the input path "var" on '/', resolves the
     * first token as a process-variable name on every track of the job, then
     * either empties the whole variable (single-token path) or prunes the
     * matching subtrees (multi-token path).
     *
     * NOTE(review): tracks 0..4 inclusive are scanned but the job's current
     * track is skipped — presumably to avoid touching variables the engine is
     * actively using; the sibling GetJobsMemoryUsage code iterates only 0..3.
     * Confirm the valid track-id range for the target BW version.
     */
    public void invoke() throws Exception {
        /* Available Variables: DO NOT MODIFY
        In  : long jobId
        In  : String var
        In  : int callGC
        * Available Variables: DO NOT MODIFY *****/
        String[] tokens = var.split("/");
        int currentTrackId = getTrackId(jobId);
        for (int iter=0; iter <= 4; iter++) {
            VariableList varList = getJobVariables(jobId, iter);
            Variable v = varList!=null && iter != currentTrackId ? varList.getVariable(tokens[0]) : null;
            if (v!=null) {
                if (tokens.length == 1) { nullifyXiNode( v.getValue() ); }
                else if (tokens.length>1) { prune( v.getValue(), tokens, 1); }
            }
        }
    }
}
wtorek, lutego 24, 2015
How to release unused XiNodes from BW process variables
BW accumulates variables across a job's lifetime. Very often intermediate variables are no longer needed in later steps, but they still occupy memory. It is possible to clean them up and free memory for new jobs, globally reducing the RAM requirement of the whole BW instance.
poniedziałek, lutego 23, 2015
Aerospike server has got synchronization bugs :(
Bugs found during VAT testing (http://1307723433353.blogspot.com/2014/05/vat-validation-of-architecture-in-tests.html) |
piątek, lutego 20, 2015
How to get memory usage of Tibco BW Job
package Libraries.Utils.GetJobsMemoryUsage;

import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.atomic.*;
import java.io.*;
import org.xml.sax.InputSource;
import com.tibco.xml.datamodel.XiNode;
import com.tibco.xml.datamodel.XiParserFactory;
import com.tibco.xml.xdata.xpath.Variable;
import com.tibco.xml.xdata.xpath.VariableList;
import com.tibco.pe.core.*;

/**
 * TIBCO BW "Java Code" activity that estimates the memory held by the process
 * variables of one job (or of all jobs when jobId &lt;= 0) by serializing each
 * variable's XiNode tree through a counting stream.
 *
 * All engine access goes through reflection on undocumented com.tibco.pe.core
 * internals (Engine, JobPool, Job, Workflow) — behavior depends on the exact
 * BW engine version. NOTE(review): the reported sizes are serialized-XML byte
 * counts, which approximate (not equal) actual heap usage.
 */
public class GetJobsMemoryUsageJavaCode{

    /**
     * Locates the engine-wide JobPool by scanning Engine's declared fields for
     * one whose type name ends with "JobPool" (field name itself is not relied
     * on, presumably because it is obfuscated or version-dependent).
     *
     * @return the static JobPool instance read via reflection
     * @throws RuntimeException if no such field exists or it cannot be read
     */
    public static JobPool getJobPool() {
        for (Field f : Engine.class.getDeclaredFields()) {
            if (f.getType().getName().endsWith("JobPool")) {
                f.setAccessible(true);
                // Static field: passing null as the instance.
                try { return (JobPool) f.get(null); }
                catch (Throwable te) { throw new RuntimeException("Cannot access JobPool: "+te.getMessage(), te); }
            }
        }
        throw new RuntimeException("Cannot access JobPool");
    }

    /** @return ids of all jobs currently known to the JobPool */
    public static long[] getJobIds() {
        try { return getJobPool().getJobIds(); }
        catch (Throwable t) { throw new RuntimeException("Cannot access JobPool.getJobIds(): "+t.getMessage(), t); }
    }

    /**
     * Resolves a job id to its workflow name via the non-public
     * Job.getWorkflow() method.
     *
     * @param id BW job id
     * @return the workflow name, "no-workflow-name" when the job has no
     *         workflow, or null when the job no longer exists
     */
    public static String getJobName(long id) {
        try {
            Object job = getJobPool().findJob(id);
            if (job!=null) {
                Method getWorkflow = Class.forName("com.tibco.pe.core.Job").getDeclaredMethod("getWorkflow", new Class<?>[0]);
                getWorkflow.setAccessible(true);
                com.tibco.pe.core.Workflow w = (com.tibco.pe.core.Workflow)getWorkflow.invoke(job);
                return w!=null ?
                    w.getName() : "no-workflow-name";
            } else return null;
        } catch (Throwable t) { throw new RuntimeException("Cannot access Job data: "+t.getMessage(), t); }
    }

    /**
     * Reads the variable list of one "track" of a job via the non-public
     * Job.getAttributes(int) method.
     *
     * @param jid     BW job id
     * @param trackId track index passed straight to Job.getAttributes
     * @return the VariableList for that track (may be null — callers check)
     */
    public static VariableList getJobVariables(long jid, int trackId) {
        try {
            Method getAttributes = Class.forName("com.tibco.pe.core.Job").getDeclaredMethod("getAttributes", new Class<?>[] { int.class });
            getAttributes.setAccessible(true);
            return (VariableList) getAttributes.invoke(getJobPool().findJob(jid), trackId);
        } catch (Throwable t) { throw new RuntimeException("Cannot access job variables: "+t.getMessage(), t); }
    }

    /**
     * @param jid BW job id
     * @return the job's current track id, via the non-public Job.getTrackId()
     */
    public int getTrackId(long jid) {
        try {
            Method getter = Class.forName("com.tibco.pe.core.Job").getDeclaredMethod("getTrackId", new Class<?>[0]);
            getter.setAccessible(true);
            return (int) getter.invoke(getJobPool().findJob(jid));
        } catch (Throwable t) { throw new RuntimeException("Cannot access job methods: "+t.getMessage(), t); }
    }

    /****** START SET/GET METHOD, DO NOT MODIFY *****/
    protected long jobId = 0;
    protected String[] usage = null;
    public long getjobId() { return jobId; }
    public void setjobId(long val) { jobId = val; }
    public String[] getusage() { return usage; }
    public void setusage(String[] val) { usage = val; }
    /****** END SET/GET METHOD, DO NOT MODIFY *****/

    public GetJobsMemoryUsageJavaCode() { }

    /**
     * BW activity entry point. For each selected job, walks tracks 0..3
     * (skipping the job's current track — presumably the one the engine is
     * actively using; confirm for the target BW version), serializes every
     * variable's XiNode tree into a byte-counting OutputStream, and emits one
     * line per job of the form:
     *   jobName=totalBytes|var[track]=bytes|var[track]=bytes|...
     */
    public void invoke() throws Exception {
        /* Available Variables: DO NOT MODIFY
        In  : long jobId
        Out : String[] usage
        * Available Variables: DO NOT MODIFY *****/
        LinkedList<String> usageList = new LinkedList<String>();
        // jobId > 0 selects one job; otherwise measure every job in the pool.
        long ids[] = jobId > 0 ? new long[] { jobId } : getJobIds();
        if (ids!=null) {
            StringBuilder sb = new StringBuilder();
            for (long id : ids) {
                String jobName = getJobName(id);
                sb.setLength(0);
                // AtomicLong used only as a mutable long holder here, not for concurrency.
                AtomicLong mem = new AtomicLong(0);
                int currentTrackId = getTrackId(id);
                for (int trackId = 0; trackId < 4; trackId++) {
                    VariableList varList = trackId == currentTrackId ?
                        null : getJobVariables(id, trackId);
                    if (varList!=null) {
                        for (Object varName : varList.getVariableNames()) {
                            Variable v = varList.getVariable(varName.toString());
                            long size = 0;
                            if (v.getValue()==null) size = (v.getNumber()+"").length();
                            else {
                                // Count serialized bytes instead of buffering them.
                                final AtomicInteger cnt = new AtomicInteger(0);
                                OutputStream cos = new OutputStream() {
                                    // NOTE(review): single-byte writes are counted as 4 —
                                    // presumably a deliberate overestimate; confirm intent.
                                    public void write(int i) { cnt.addAndGet(4); }
                                    public void write(byte bytes[], int off, int len) throws IOException { cnt.addAndGet(len); }
                                };
                                com.tibco.xml.datamodel.helpers.XiSerializer.serialize( v.getValue(), cos, "utf-8", true );
                                size = cnt.get();
                            }
                            sb.append(varName).append('[').append(trackId).append(']').append("=").append(size).append("|");
                            mem.addAndGet(size);
                        }
                    }
                }
                // Prefix the per-variable list with the job total.
                sb.insert(0, jobName+"="+mem.longValue()+"|");
                usageList.add(sb.toString());
            }
        }
        usage = usageList.toArray(new String[0]);
    }
}
czwartek, lutego 19, 2015
wtorek, lutego 10, 2015
Fix 'No space left on device' with btrfs
Add new disk. Then: btrfs device add /dev/sdd /. And it's fixed.
piątek, lutego 06, 2015
Aerospike or not Aerospike
[4] + clustering without the need for GFS2/RHEL cluster/NAS
[5] + out of box interDC replication
[3] + faster than O_SYNC file and DB
[2] + cheaper for IT operations than 'canonical solutions for JEE'
[2] + better support, because vendor 'does care'
[2] - new unknown product
[2] - needs extensive crash test/validation test
[4] - need to buy support
[2] - huge RAM usage
[4] - will the company remain on the market for a decade?
16:14
[5] + out of box interDC replication
[3] + faster than O_SYNC file and DB
[2] + cheaper for IT operations than 'canonical solutions for JEE'
[2] + better support, because vendor 'does care'
[2] - new unknown product
[2] - needs extensive crash test/validation test
[4] - need to buy support
[2] - huge RAM usage
[4] - will the company remain on the market for a decade?
16:14
Aerospike - fast or safe?
Let's look at https://github.com/aerospike/aerospike-server/blob/180ed47a5fffc54b3e45faccb33c908bc189db2e/as/src/storage/drv_ssd.c and try to find the open() system call and the fsync() system call. We see that open flags like O_SYNC are parametrized. There is also a loop for fsync with a parametrized sleep time. The relevant configuration (https://github.com/aerospike/aerospike-server/blob/master/as/src/base/cfg.c) lives in enable-osync and fsync-max-sec. Now, check the default values at http://www.aerospike.com/docs/reference/configuration/. We see that synchronous writes are disabled by default, and fsync is also disabled. This means that there are no safety guarantees for a single Aerospike node (see also: flush-max-ms). In case of an asd/OS/storage crash, data will likely be lost or corrupted. The probabilistic situation for 3 nodes on different racks with a battery-backed SSD array is quite different: the combined risk is very small (if we do not consider a whole-datacenter crash), and therefore Aerospike is an acceptable solution under standard sane data safety/integrity/availability criteria.
czwartek, lutego 05, 2015
wtorek, lutego 03, 2015
Tibco EMS on Aerospike NoSQL cluster
Is it possible to have EMS distributed, partitioned, fault tolerant and replicated without a clustered filesystem? Just hook into system calls and override storage access. The Aerospike NoSQL solution comes with a C client. EMS is also written in C/C++. Everything is now possible. The sky is the limit! The proof of concept doesn't have compaction implemented. It can be achieved by as_query_where + delete.
Now some benchmarks. 20 sender threads send 50 000 1KB messages each, while at the same time 20 receivers are active. The time to pipe 1 mln messages (1GB) is measured. EMS on BTRFS inside a virtual OpenSUSE 13.2 needs 3651 seconds, while EMS on Aerospike NoSQL needs 1961 seconds. EMS on file is 86% slower than on Aerospike.
Update: raw device with O_SYNC time is 2754s.
Subskrybuj:
Posty (Atom)