I have a use case about using DBOutputTable component. Read one data source, after transforming, then store data to multiple tables. My question here is how to handle the transaction issue (all success or all fail) ? For example, if one of those tables fails during inserting data, the others should be aborted or rolled back…?
It can be defined, what is DBOutputTable to do when inserting fails. commit attribute defines after how many records (without error) commit is performed. If you set it to MAX_INT, DBOutputTable never performs commit. You can call it yourself with DBExecute component or it is performed automatically when closing connection (during graph freeing). Commit is called always after successfully inserting number of records defined by commit attribute, even if errorAction is set to ROLLBACK. Rollback is performed for the last batch only. When you set atomicSQL to true, all SQL queries for one record, are executed as atomic operation, but the value of the commit attribute is ignored and commit is performed after each record.
Is it possible to use manual transaction handling in the whole graph? I need all database operations in my graph to either succeed or fail. The problem is that I am inserting data to multiple databases (MS SQL and Oracle).
My idea was to disable automatic commits and use DBExecute for “BEGIN TRANSACTION” on both databases in phase 0. If the graph fails, no changes would be saved. If the graph succeeds, the last phase whould be just “COMMIT” on both databases. I am aware that it would require the whole graph to use only one instance of each database connection and I am fine with that.
However, no matter what options I use for DBOutputTable or DBExecute, Clover commits the data before all database operations are over. Is it possible (how?) to achieve the requested behavior?
It is not possible with DBOutputTable, but can be achieved with Reformat(s). Eg., when doing all inserts in one Reformat:
import java.sql.PreparedStatement;
import org.jetel.component.DataRecordTransform;
import org.jetel.connection.jdbc.CopySQLData;
import org.jetel.connection.jdbc.DBConnection;
import org.jetel.connection.jdbc.specific.JdbcSpecific.OperationType;
import org.jetel.data.DataRecord;
import org.jetel.exception.ComponentNotReadyException;
import org.jetel.exception.TransformException;
public class DBOutput extends DataRecordTransform {
PreparedStatement mssql_statement, oracle_statement;
CopySQLData[] mssql_transmap, oracle_transmap;
DBConnection mssql, oracle;
public boolean init() throws ComponentNotReadyException {
mssql = (DBConnection) graph.getConnection("Connection0");
oracle = (DBConnection) graph.getConnection("Connection1");
if (mssql == null || oracle == null) {
throw new ComponentNotReadyException("Connection not found");
}
try {
mssql_statement = mssql.getConnection(mssql.getId(), OperationType.WRITE).getSqlConnection().prepareStatement("insert into mssql_table values(?,?,?)");
oracle_statement = oracle.getConnection(oracle.getId(), OperationType.WRITE).getSqlConnection().prepareStatement("insert into oracle_table values(?,?,?)");
} catch (Exception e) {
throw new ComponentNotReadyException(e);
}
return true;
}
public int transform(DataRecord[] arg0, DataRecord[] arg1)
throws TransformException {
if (mssql_transmap == null) {
try {
mssql_transmap = CopySQLData.jetel2sqlTransMap(arg0[0], new int[]{0,1,2}, mssql.getJdbcSpecific());
oracle_transmap = CopySQLData.jetel2sqlTransMap(arg0[0], new int[]{3,4,5}, oracle.getJdbcSpecific());
} catch (Exception e) {
throw new TransformException(e.getMessage(), e);
}
}
try {
for (int i = 0; i < mssql_transmap.length; i++) {
mssql_transmap[i].jetel2sql(mssql_statement);
}
mssql_statement.execute();
for (int i = 0; i < oracle_transmap.length; i++) {
oracle_transmap[i].jetel2sql(oracle_statement);
}
oracle_statement.execute();
} catch (Exception e) {
try {
mssql.getConnection(mssql.getId()).getSqlConnection().rollback();
oracle.getConnection(oracle.getId()).getSqlConnection().rollback();
} catch (Exception e1) {
throw new TransformException("Can't perform rollback", e);
}
throw new TransformException("", e);
}
return ALL;
}
}
You can also do inserts for different databases in different components and perform rollback on both databases, when catching exception in both transformations. In such case remember to use “thread unsafe” connections (set threadSafeConnection=false - uncheck the “Thread-safe connection” check box in Advanced tab in Connection editor in both connections).