Error on subsequent graph executions in embedded mode

Hi gang,

I created an application that manages various CloverETL graphs, their executions, monitors their starting times, whether they were executed successfully etc. Graphs are executed at various times depending on Cron expressions defined by the user in the GUI.

Underneath, the application runs the CloverETL engine in embedded mode. I used to load graphs from file via


TransformationGraph graph = loadGraph(new FileInputStream(graphFile), projectRuntimeContext);

before every single execution, but after many, many executions I would invariably run into a PermGen space problem.

Following the advice I found here

viewtopic.php?f=5&t=3347

I switched to caching graphs and re-using them rather than loading them from scratch. This eliminated the PermGen problem.
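
For readers who want to picture the caching approach, here is a minimal sketch. It is not the code used in the application described here: GraphCache, its loader function, and the file name are hypothetical, and the type parameter V merely stands in for TransformationGraph. A real loader would wrap the loadGraph(...) call shown above and would have to handle its checked exceptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Generic load-once cache; V stands in for TransformationGraph (assumption).
class GraphCache<V> {
    private final ConcurrentMap<String, V> cache = new ConcurrentHashMap<>();
    private final Function<String, V> loader;

    GraphCache(Function<String, V> loader) {
        this.loader = loader;
    }

    // Loads the graph on the first request and returns the cached instance afterwards.
    V get(String graphFile) {
        return cache.computeIfAbsent(graphFile, loader);
    }

    // Drops a cached entry, e.g. after the graph file has changed on disk.
    void invalidate(String graphFile) {
        cache.remove(graphFile);
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        // Hypothetical loader: counts invocations instead of parsing a graph file.
        GraphCache<Object> cache = new GraphCache<>(path -> {
            loads.incrementAndGet();
            return new Object();
        });
        Object first = cache.get("output1.grf");
        Object second = cache.get("output1.grf");
        System.out.println("same instance: " + (first == second)); // same instance: true
        System.out.println("loads: " + loads.get());               // loads: 1
    }
}
```

The point of the sketch is only that each graph file is parsed once and the same instance is handed out for every later execution, which is also why a correct reset between runs becomes essential.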

Today, however, I discovered another problem that I cannot quite cope with. In certain situations a graph executes flawlessly on the first run, but all subsequent runs result in an error. When the graph is loaded from the file it always works, i.e. I can re-run the same graph many, many times (at least until I run into PermGen). However, any attempt to re-run the same graph (cached in memory) causes an error.

This behavior doesn’t concern all graphs; some graphs handle the run, reset, re-run cycle well.

Below is a simplified example of a graph that always crashes on the second invocation (the SQL queries are mock queries in the example; in real life they would fetch some meaningful data). It is invoked by the following code:


import static org.jetel.main.runGraph.executeGraph;
.
.
.

Future<Result> futureResult = executeGraph(graph, projectRuntimeContext);
Result result = futureResult.get();
graph.reset();

The error message is

org.jetel.exception.JetelException: The field ‘q_body’ contain unsupported null value.
at org.jetel.util.ReadableChannelIterator.next(ReadableChannelIterator.java:219)
at org.jetel.component.DBInputTable.execute(DBInputTable.java:287)
at org.jetel.graph.Node.run(Node.java:425)
at java.lang.Thread.run(Thread.java:636)

And here is the graph itself:


<?xml version="1.0" encoding="UTF-8"?>
<Graph author="xxx" created="Thu Jan 19 15:16:45 CET 2012" guiVersion="3.1.0" id="1327312754675" licenseCode="community" licenseType="Community" modified="Mon Apr 23 14:58:16 CEST 2012" modifiedBy="xxx" name="output1" revision="1.484" showComponentDetails="true">
<Global>
<Metadata id="Metadata4" previewAttachmentCharset="ISO-8859-1">
<Record fieldDelimiter="|" name="opt_matrix" previewAttachmentCharset="ISO-8859-1" recordDelimiter="\r\n" type="delimited">
<Field name="capture" type="string"/>
<Field name="msisdn" type="string"/>
<Field name="kw" type="string"/>
<Field name="sh" type="string"/>
<Field name="alias" type="string"/>
<Field name="optin" type="string"/>
<Field name="delivery_status" type="string"/>
<Field name="optin_timestamp" type="string"/>
<Field name="operator" type="integer"/>
<Field name="matrix_messageid" type="integer"/>
<Field name="templateId" type="long"/>
<Field name="request" type="string"/>
<Field name="response" type="string"/>
<Field name="mtcount" type="string"/>
</Record>
</Metadata>
<Metadata id="Metadata0">
<Record fieldDelimiter="|" name="query" recordDelimiter="\r\n" type="delimited">
<Field name="q_body" type="string"/>
</Record>
</Metadata>
<Metadata id="Metadata15">
<Record fieldDelimiter="|" name="recordName4" recordDelimiter="\n" type="delimited">
<Field name="field1" type="string"/>
</Record>
</Metadata>
<Connection database="MYSQL" dbURL="${ETL_DB_CONNECTION}" id="JDBC0" jdbcSpecific="MYSQL" name="etl" password="${ETL_DB_PASS}" type="JDBC" user="${ETL_DB_USERNAME}"/>
<Connection database="POSTGRE" dbURL="${REP_DB_CONNECTION}" id="JDBC5" jdbcSpecific="POSTGRE" name="REP DB" password="${REP_DB_PASS}" type="JDBC" user="${REP_DB_USERNAME}"/>
<Property fileURL="workspace.prm" id="GraphParameter0"/>
<Dictionary/>
</Global>
<Phase number="0">
<Node dbConnection="JDBC5" enabled="enabled" guiHeight="69" guiName="DBInputTable" guiWidth="130" guiX="56" guiY="32" id="DB_INPUT_TABLE0" sqlQuery="select '1';" type="DB_INPUT_TABLE"/>
<Edge fromNode="DB_INPUT_TABLE0:0" guiBendpoints="" guiRouter="Manhattan" id="Edge0" inPort="Port 0 (in)" metadata="Metadata15" outPort="Port 0 (out)" toNode="REFORMAT1:0"/>
</Phase>
<Phase number="2">
<Node dbConnection="JDBC5" enabled="enabled" guiHeight="69" guiName="webCC opt-ins" guiWidth="139" guiX="140" guiY="171" id="DB_INPUT_TABLE2" printStatements="true" type="DB_INPUT_TABLE" url="port:$0.q_body:discrete"/>
<Node enabled="enabled" guiHeight="69" guiName="opt query" guiWidth="128" guiX="247" guiY="32" id="REFORMAT1" type="REFORMAT">
<attr name="transform"><![CDATA[//#CTL2

// Transforms input record into output record.

function integer transform() {
	$0.q_body = "select  'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 0, 0, cast (1 as bigint), 'x', 'x', 'x'";

	return ALL;
}
// Called during component initialization.
// function boolean init() {}

// Called during each graph run before the transform is executed. May be used to allocate and initialize resources
// required by the transform. All resources allocated within this method should be released
// by the postExecute() method.
// function void preExecute() {}

// Called only if transform() throws an exception.
// function integer transformOnError(string errorMessage, string stackTrace) {}

// Called during each graph run after the entire transform was executed. Should be used to free any resources
// allocated within the preExecute() method.
// function void postExecute() {}

// Called to return a user-defined error message when an error occurs.
// function string getMessage() {}
]]></attr>
</Node>
<Node enabled="enabled" guiHeight="69" guiName="Trash" guiWidth="128" guiX="390" guiY="171" id="TRASH0" type="TRASH"/>
<Edge fromNode="DB_INPUT_TABLE2:0" guiBendpoints="" guiRouter="Manhattan" id="Edge7" inPort="Port 0 (in)" metadata="Metadata4" outPort="Port 0 (out)" toNode="TRASH0:0"/>
<Edge fromNode="REFORMAT1:0" guiBendpoints="" guiRouter="Manhattan" id="Edge2" inPort="Port 0 (in)" metadata="Metadata0" outPort="Port 0 (out)" toNode="DB_INPUT_TABLE2:0"/>
</Phase>
</Graph>

I’m using Clover engine v 3.0. Any ideas what I am doing wrong? Would greatly appreciate any help.

Best regards,
Konrad Ciborowski
Kraków, Poland

Dear Konrad Ciborowski,

I am afraid that the behavior you reported is a bug in component reset. I created https://bug.javlin.eu/browse/CL-2276

There is no workaround for this. You need to switch off graph reuse for now. If that is a performance issue for you, you can try some kind of bulk processing, so that graphs are executed less often but on bigger datasets. Graph load latency will then be less of an issue.

After two days of debugging the CloverETL engine I was able to pinpoint the problem. The ReadableChannelPortIterator class contains a record field which is initialized to


record = new DataRecord(inputPort.getMetadata());

and then populated with records one by one as they are read. When the last record is read, the record field is set to null and the reset() methods don’t re-initialize it. So during the next run the following line in the getNextData() method

inputPort.readRecord(record);

takes the execution path to the DirectEdge class and its readRecord() method. Ultimately the following line

record.deserialize(readBuffer);

throws a NullPointerException (record is null).
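
The failure mode described above can be reproduced in isolation with a small stand-in. This is only a sketch: PortIteratorSketch, resetBroken() and resetFixed() are hypothetical names, a StringBuilder plays the role of the reusable DataRecord, and none of this is the engine's actual code.

```java
// Stand-in for the iterator described above; not the engine's actual class.
class PortIteratorSketch {
    private final int totalRecords;
    private int remaining;
    private StringBuilder record = new StringBuilder(); // reusable record buffer

    PortIteratorSketch(int totalRecords) {
        this.totalRecords = totalRecords;
        this.remaining = totalRecords;
    }

    // Returns the next value, or null once the input is exhausted.
    String next() {
        if (remaining == 0) {
            record = null; // buffer is dropped after the last record
            return null;
        }
        int index = totalRecords - remaining;
        remaining--;
        record.setLength(0); // NullPointerException here if the buffer was not re-created
        record.append("row").append(index);
        return record.toString();
    }

    // Mirrors the reported behaviour: the counter is rewound, the buffer is not.
    void resetBroken() {
        remaining = totalRecords;
    }

    // Re-creates the buffer as well, so the next run starts clean.
    void resetFixed() {
        remaining = totalRecords;
        record = new StringBuilder();
    }

    // Drains the iterator and reports how many records were read.
    static int drain(PortIteratorSketch it) {
        int count = 0;
        while (it.next() != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        PortIteratorSketch it = new PortIteratorSketch(2);
        System.out.println("first run: " + drain(it));        // first run: 2
        it.resetFixed();
        System.out.println("after resetFixed: " + drain(it)); // after resetFixed: 2
        it.resetBroken();
        try {
            drain(it);
        } catch (NullPointerException e) {
            System.out.println("after resetBroken: NullPointerException");
        }
    }
}
```

The first run always succeeds because the constructor creates the buffer; only a run that follows an incomplete reset fails, which matches the "works once, fails afterwards" symptom.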

A solution would be to re-initialize all components, but it is not that simple, as various graph elements contain initialized and firstRun variables which are private (on top of that, PhaseConnectionEdge contains a wasInitialized field which is also private). After a lot of trial and error I was able to implement the following workaround. It is a very, very ugly hack (modifying values of private fields by reflection is one of the worst programming practices), but short of modifying the source of the engine I was unable to find anything else. I’m not even 100% certain that my fix doesn’t break something elsewhere.


private void resetGraphPhases(TransformationGraph graph) throws NoSuchFieldException, IllegalArgumentException, IllegalAccessException, NoSuchMethodException, InvocationTargetException, ComponentNotReadyException {

    Phase phases[] = graph.getPhases();
    for (int i = 0; i < phases.length; i++) {  
        Phase phase = phases[i];
        resetToInitialState(phase);                
        for (Node node : phase.getNodes().values()) {
            if (node.getClass().getName().equals("org.jetel.component.DBInputTable")) {
                resetToInitialState(node);
            }
        }
        for (Edge edge : phase.getEdges().values()) {
            resetToInitialState(edge);
            Field edgeBaseField = edge.getClass().getDeclaredField("edge");
            edgeBaseField.setAccessible(true);
            EdgeBase edgeBase = (EdgeBase)edgeBaseField.get(edge);
            if (edgeBase.getClass().getName().equals("org.jetel.graph.PhaseConnectionEdge")) {
                Field wasInitialized = edgeBase.getClass().getDeclaredField("wasInitialized");
                wasInitialized.setAccessible(true);
                wasInitialized.setBoolean(edgeBase, false);
            }
        }
        phase.init();
    }
}

private void resetToInitialState(GraphElement graphElement) throws SecurityException, IllegalAccessException, IllegalArgumentException {
    Class<?> clazz = graphElement.getClass();
    graphElement.free();
    while (!clazz.getName().equals("org.jetel.graph.GraphElement")) {
        clazz = clazz.getSuperclass();
    }                                                        
    Field declaredFields[] = clazz.getDeclaredFields();

    for (int i = 0; i < declaredFields.length; i++) {
        if (declaredFields[i].getName().equals("initialized")) {
            declaredFields[i].setAccessible(true);
            declaredFields[i].setBoolean(graphElement, false);                    
        }
        if (declaredFields[i].getName().equals("firstRun")) {
            declaredFields[i].setAccessible(true);
            declaredFields[i].setBoolean(graphElement, true);                    
        }
    }
}
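
The reflection technique used in the workaround can be tried out without the engine on the classpath. The sketch below is an illustration under assumptions: Element and Component are hypothetical stand-ins for GraphElement and a component class, and findField() and setBoolean() are helper names invented here. Unlike the loop above, the lookup stops with a NoSuchFieldException instead of running off the top of the class hierarchy when no class declares the field.

```java
import java.lang.reflect.Field;

class PrivateFieldReset {
    // Hypothetical stand-ins for GraphElement and a component; not engine classes.
    static class Element {
        private boolean initialized = true;
        private boolean firstRun = false;
        boolean isInitialized() { return initialized; }
        boolean isFirstRun() { return firstRun; }
    }

    static class Component extends Element { }

    // Walks up the class hierarchy until a class declares the named field.
    static Field findField(Class<?> type, String name) throws NoSuchFieldException {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException ignored) {
                // Not declared in this class; continue with the superclass.
            }
        }
        throw new NoSuchFieldException(name);
    }

    // Overwrites a private boolean field, wherever in the hierarchy it is declared.
    static void setBoolean(Object target, String name, boolean value)
            throws NoSuchFieldException, IllegalAccessException {
        Field field = findField(target.getClass(), name);
        field.setAccessible(true);
        field.setBoolean(target, value);
    }

    public static void main(String[] args) throws Exception {
        Component component = new Component();
        setBoolean(component, "initialized", false);
        setBoolean(component, "firstRun", true);
        System.out.println(component.isInitialized() + " " + component.isFirstRun()); // false true
    }
}
```

Looking fields up by name keeps the hack in one place, but it remains tied to private field names that may change between engine versions.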