Looking for some best practices guidance here…
I need to build some ETL graphs to extract info from a group of databases, all of which look the same. This is for a hosting center, which is growing. This month we have 5 databases. Next month we may have 6, and the next we may have 8.
I can easily have some sort of parameter file that points to all the source dbs I want to process.
What would be the “right way” to approach that with CloverETL? I can build graphs through the GUI to handle one source, but I want it to be data-driven, and able to process against a list of sources.
I’m thinking I’m going to have to drive this entirely from Java…is that right?
Hello,
you can use RunGraph component:
1.Create graph for loading data to database:
<?xml version="1.0" encoding="UTF-8"?>
<Graph id="1204557480788" name="loadData">
<Global>
<Metadata fileURL="meta/EMPLOYEE.fmt" id="Metadata0"/>
<Connection dbConfig="conn/${database}.cfg" id="Connection0" type="JDBC"/>
<Property fileURL="workspace.prm" id="GraphParameter1"/>
</Global>
<Phase number="0">
<Node fileURL="${DATAIN_DIR}/employees.list.dat" id="INPUT" type="DELIMITED_DATA_READER"/>
<Node dbConnection="Connection0" dbTable="employee_tmp" id="OUTPUT" type="DB_OUTPUT_TABLE"/>
<Edge fromNode="INPUT:0" id="INEDGE2" inPort="Port 0 (in)" metadata="Metadata0" outPort="Port 0 (out)" toNode="OUTPUT:0"/>
</Phase>
</Graph>
2.Create graph for executing above graph:
<?xml version="1.0" encoding="UTF-8"?>
<Graph id="1221203029835" name="executeGraphs">
<Global>
<Metadata id="Metadata1">
<Record fieldDelimiter="|" name="in" recordDelimiter="\n" type="delimited">
<Field name="graph" type="string"/>
<Field name="parameters" type="string"/>
</Record>
</Metadata>
<Metadata id="Metadata0">
<Record fieldDelimiter="|" name="out" recordDelimiter="\n" type="delimited">
<Field name="graph" type="string"/>
<Field name="result" type="string"/>
<Field name="description" type="string"/>
<Field name="message" type="string"/>
<Field name="duration" type="decimal"/>
</Record>
</Metadata>
<Property fileURL="workspace.prm" id="GraphParameter0"/>
</Global>
<Phase number="0">
<Node fileURL="${DATAIN_DIR}/runGraphs.txt" id="DATA_READER0" type="DATA_READER"/>
<Node fileURL="${DATAOUT_DIR}/log.txt" id="DATA_WRITER0" type="DATA_WRITER"/>
<Node id="RUN_GRAPH0" ignoreGraphFail="true" logFile="log.txt" sameInstance="false" type="RUN_GRAPH"/>
<Edge fromNode="DATA_READER0:0" id="Edge0" inPort="Port 0 (in regular mode: names of the graphs to be executed, otherwise nothing)" metadata="Metadata1" outPort="Port 0 (output)" toNode="RUN_GRAPH0:0"/>
<Edge fromNode="RUN_GRAPH0:0" id="Edge1" inPort="Port 0 (in)" metadata="Metadata0" outPort="Port 0 (regular mode: status messages, pipeline mode: an information record in case of successful termination, otherwise nothing)" toNode="DATA_WRITER0:0"/>
</Phase>
</Graph>
3.Create input file for the 2nd graph (runGraphs.txt):
graph/loadData.grf|-P:database=oracle -plugins your path to clover.gui plugin/lib/plugins
graph/loadData.grf|-P:database=postgre -your path to clover.gui plugin/lib/plugins
OK…that helps. RunGraph is a component that lets you iterate over the execution of other graphs.
There is a twist, and wondering how to handle it with CloverETL.
Each database contains a particular client’s data, so I need to generate an ID for that client. If the ID has already been generated in my target database for the source database, I want to use that.
Is there a way to get DBOutputTable to do something like “insert if it doesn’t exist, returning the ID”?
DBInputTable can only execute sql statements.
If I understand well you want clover to generate the key? Or the database?
In the first case you can slightly modify your insert graph:
<?xml version="1.0" encoding="UTF-8"?>
<Graph id="1204557480788" >
<Global>
<Metadata id="Metadata2">
<Record fieldDelimiter="," name="EMPLOYEE" recordDelimiter="\n" recordSize="-1" type="delimited">
<Field format="#" name="EMP_NO" nullable="true" shift="0" type="integer"/>
<Field name="FIRST_NAME" nullable="true" shift="0" type="string"/>
<Field name="LAST_NAME" nullable="true" shift="0" type="string"/>
<Field name="PHONE_EXT" nullable="true" shift="0" type="string"/>
<Field format="dd/MM/yyyy" name="HIRE_DATE" nullable="true" shift="0" type="date"/>
<Field name="DEPT_NO" nullable="true" shift="0" type="string"/>
<Field name="JOB_CODE" nullable="true" shift="0" type="string"/>
<Field name="JOB_GRADE" nullable="true" shift="0" type="numeric"/>
<Field name="JOB_COUNTRY" nullable="true" shift="0" type="string"/>
<Field name="SALARY" nullable="true" shift="0" type="numeric"/>
<Field name="FULL_NAME" nullable="true" shift="0" type="string"/>
<Field name="id" type="string"/>
</Record>
</Metadata>
<Metadata id="Metadata1">
<Record fieldDelimiter="|" name="ids" recordDelimiter="\n" type="delimited">
<Field name="id" type="string"/>
<Field name="database" type="string"/>
</Record>
</Metadata>
<Metadata fileURL="meta/EMPLOYEE.fmt" id="Metadata0"/>
<Connection dbConfig="conn/target.cfg" id="Connection1" type="JDBC"/>
<Connection dbConfig="conn/${database}.cfg" id="Connection0" type="JDBC"/>
<Property fileURL="workspace.prm" id="GraphParameter0"/>
<LookupTable dbConnection="Connection1" id="LookupTable0" metadata="Metadata1" name="ids" type="dbLookup">
<attr name="sqlQuery"><![CDATA[select * from ids where databaseDesc = ?]]></attr>
</LookupTable>
</Global>
<Phase number="0">
<Node dbConnection="Connection0" dbTable="employee_tmp" id="DB_OUTPUT_TABLE0" type="DB_OUTPUT_TABLE"/>
<Node fileURL="${DATAIN_DIR}/employees.list.dat" id="INPUT" type="DELIMITED_DATA_READER"/>
<Node id="REFORMAT0" transformClass="MyTransform" type="REFORMAT"/>
<Edge fromNode="INPUT:0" id="INEDGE2" inPort="Port 0 (in)" metadata="Metadata0" outPort="Port 0 (out)" toNode="REFORMAT0:0"/>
<Edge fromNode="REFORMAT0:0" id="Edge1" inPort="Port 0 (in)" metadata="Metadata2" outPort="Port 0 (out)" toNode="DB_OUTPUT_TABLE0:0"/>
</Phase>
</Graph>
with MyTransform.java:
import org.jetel.component.DataRecordTransform;
import org.jetel.data.DataRecord;
import org.jetel.data.lookup.LookupTable;
import org.jetel.exception.ComponentNotReadyException;
import org.jetel.exception.TransformException;
public class MyTransform extends DataRecordTransform {
String key;
String paramValue;
@Override
public boolean init() throws ComponentNotReadyException {
LookupTable mylookup;
try {
mylookup = graph.getLookupTable("LookupTable0");
mylookup.init();
} catch (Exception e) {
// TODO Auto-generated catch block
return false;
}
paramValue = getGraph().getGraphProperties().getProperty("source_database");
DataRecord record = mylookup.get(paramValue);
if (record != null) {
key = record.getField("id").getValue().toString();
}
return true;
}
@Override
public boolean transform(DataRecord[] arg0, DataRecord[] arg1)
throws TransformException {
defaultTransform(arg0, arg1);//does default mapping
arg1[0].getField("id").setValue(key != null ? key : createKey());
return true;
}
private String createKey(){
return "key1" + paramValue;
}
}
This allows you to check if database exists in processed databases; if yes you use existing key for inserting records, if not you can create new key in createKey() method.
That’s not exactly what I need.
Let’s say I have a file of clients. The file just contains the client name.
My reporting database has a table of CLIENTS, with columns COMPANYNAME and COMPANYID.
Every time I run my graph, I want to determine if I have any new clients in the file. For each new client, I want to insert a row into CLIENTS with a new COMPANYID numeric value.
Let’s assume on my first run, I just have “ABC Company” in the file and an empty CLIENTS table. The graph would run, and not finding “ABC Company” in the client table, would insert a new row with COMPANYID of 1.
Next time my file contains 2 lines:
1. ABC Company
2. XYZ Company
So the next time the graph runs, I find “XYZ Company” is new (doesn’t exist in the client table), and it gets inserted into the client table with a COMPANYID of 2.
Makes sense?
For finding non-existing clients you can use DBJoin component (see also graphDBUnloadParametrized.grf example); on the 0th port you get existing clients (you can send them to Trash ) and on the 1st one you get non-existing - thees you can send to DBOutputTable - on the first output port you can get autogenereted columns for some databases.