Hi,
I want to integrate two files, but when the graph runs, execution never enters my transform class.
My code is:
String childFileName=controller.inputFileName;
boolean isDone = false;
DBConnection dbCon;
Phase PHASE_0 = new Phase(1);
// initialization; must be present
EngineInitializer.initEngine(null, null, null);
System.out
.println(“*******************************************************************************************”);
// create connection object. Get driver and connect string from cfg file
// specified as a first argument
DataBaseConnection dbConn=new DataBaseConnection();
//System.out.println(“Test3333333333333”);
dbCon=dbConn.getConnection();
//dbCon = new DBConnection(“dbConnection”, “src/mysql.cfg”);
System.out.println(“…Db connections…”+dbCon);
try {
System.out.println(“Id of the DB Connection:::”+dbCon.getId());
dbCon.init();
} catch (Exception ex) {
ex.printStackTrace();
}
/*Connection conn = dbCon.getConnection(“Conn0”);
Statement st=null;
ResultSet rs1=null;
try {
st=conn.createStatement();
rs1=st.executeQuery(“select code from feed_institution”);
rs1.next();
System.out.println("Result Setttt "+rs1.getString(“code”));
} catch (SQLException e2) {
// TODO Auto-generated catch block
e2.printStackTrace();
} */
//this.createConnection();
// initialization; must be present
//DataRecordMetadata childMetadata=controller.MetadataInput;
// DataRecordMetadata parentMetadata=metadataInput;
DataRecordMetadata mergerd_metada = new DataRecordMetadata(
“metadaat3”, DataRecordMetadata.DELIMITED_RECORD);
mergerd_metada.addField(new DataFieldMetadata(“contortia_id1”,
DataFieldMetadata.STRING_FIELD, null));
mergerd_metada.addField(new DataFieldMetadata(“contortia_name”,
DataFieldMetadata.STRING_FIELD, null));
mergerd_metada.addField(new DataFieldMetadata(“institution_id”,
DataFieldMetadata.STRING_FIELD, null));
mergerd_metada.addField(new DataFieldMetadata(“contortia_id2”,
DataFieldMetadata.STRING_FIELD, null));
mergerd_metada.setFieldDelimiter(separator);
mergerd_metada.setRecordDelimiters(“\n”);
DataRecordMetadata parentMetadata = new DataRecordMetadata(
“metadaat1”, DataRecordMetadata.DELIMITED_RECORD);
parentMetadata.addField(new DataFieldMetadata(“contortia_id”,
DataFieldMetadata.STRING_FIELD, null));
parentMetadata.addField(new DataFieldMetadata(“contortia_name”,
DataFieldMetadata.STRING_FIELD, null));
parentMetadata.setFieldDelimiter(“,”);
parentMetadata.setRecordDelimiters(“\n”);
DataRecordMetadata childMetadata = new DataRecordMetadata(
“metadaat”, DataRecordMetadata.DELIMITED_RECORD);
childMetadata.addField(new DataFieldMetadata(“institution_id”,
DataFieldMetadata.STRING_FIELD, null));
childMetadata.addField(new DataFieldMetadata(“contortia_id”,
DataFieldMetadata.STRING_FIELD, null));
childMetadata.setFieldDelimiter(“,”);
childMetadata.setRecordDelimiters(“\n”);
Map parentmap=parentMetadata.getFieldNames();
String str=parentMetadata.getFieldDelimiters();
System.out.println(str[0]+“delimiter”);
Set entrySet1 =parentmap.entrySet();
Iterator iter1=entrySet1.iterator();
while (iter1.hasNext()) {
Map.Entry entry1 = (Map.Entry) iter1.next();
System.out.println(entry1.getKey()+“—parent—”+entry1.getValue());
}
Map childmap=childMetadata.getFieldNames();
Set entrySet =childmap.entrySet();
String common=null;
Iterator iter=entrySet.iterator();
while (iter.hasNext()) {
Map.Entry entry = (Map.Entry) iter.next();
System.out.println(entry.getKey()+“—child—”+entry.getValue());
}
Map mergerd_metadamap=mergerd_metada.getFieldNames();
Set entrySet2 =mergerd_metadamap.entrySet();
//String common=null;
Iterator iter2=entrySet2.iterator();
while (iter2.hasNext()) {
Map.Entry entry2 = (Map.Entry) iter2.next();
System.out.println(entry2.getKey()+“—merge—”+entry2.getValue());
}
TransformationGraph institutionOrphanCheckGraph = new TransformationGraph();
/* String parentQuery=“select code from feed_institution”;
String childQuery=“select pub_code from ip_institutions”;*/
// Nodes…
System.out.println(“PArent File:”+parentFileName);
System.out.println(“Child File:”+childFileName);
DelimitedDataReader parentDBReader = new DelimitedDataReader(“parentReader”,parentFileName);
parentDBReader.setSkipFirstLine(true);
Node parentDBReaderNode = parentDBReader;
// System.out.println(“Child File”+controller.inputFileName);
DelimitedDataReader childDBReader = new DelimitedDataReader(“childReader”,childFileName);
childDBReader.setSkipFirstLine(true);
Node childDBReaderNode = childDBReader;
String dedupKeys={“contortia_id”};
String sortKeys={“contortia_id”};
String joinKeys={“contortia_id”};
//System.out.println(“Join Keys::::”+joinKeys[0]);
Dedup parentDedup=new Dedup(“parentDedup”,dedupKeys,-1);
Node parentDedupNode = parentDedup;
Dedup childDedup=new Dedup(“childDedup”,dedupKeys,-1);
Node childDedupNode = childDedup;
Sort parentSort=new Sort(“parentSort”,sortKeys);
Node parentSortNode=parentSort;
Sort childSort=new Sort(“childSort”,sortKeys);
Node childSortNode=childSort;
OrphanTransform orphanDatTrans=new OrphanTransform();
orphanDatTrans.init();
DataIntersection dataIntrsection=new DataIntersection(“dataIntersection”,joinKeys,orphanDatTrans);
Node dataIntrsectionNode=dataIntrsection;
String outputFile1 = “C:/hanu/feeds/parent.csv”;
String outputFile2 = “C:/hanu/feeds/parent_child.csv”;
String outputFile3 = “C:/hanu/feeds/child.csv”;
XLSWriter xlsReport_A=new XLSWriter(“DataWriter1”,outputFile1,false);
XLSWriter xlsReport_A_B=new XLSWriter(“DataWriter2”,outputFile2,false);
XLSWriter xlsReport_B=new XLSWriter(“DataWriter3”,outputFile3,false);
//XLSWriter xlsReport1=new XLSWriter(“DataWriter2”,outputFile3,false);
//xlsReport.setName(“report1”); xlsReport1.setName(“report2”); Node
// nodeWrite2=xlsReport; Node nodeWrite3=xlsReport1;
Node xlsReport_ANode = new DelimitedDataWriter(“DataWriter1”, outputFile1,
false);
Node xlsReport_A_BNode = new DelimitedDataWriter(“DataWriter2”, outputFile2,
false);
Node xlsReport_BNode = new DelimitedDataWriter(“DataWriter3”, outputFile3,
false);
// /Edges…
Edge egde1 = new Edge(“InEdge1”, parentMetadata);
Edge egde2 = new Edge(“OutEdge2”, childMetadata);
Edge egde3 = new Edge(“OutEdge3”, parentMetadata);
Edge egde4 = new Edge(“OutEdge4”, childMetadata);
Edge egde5 = new Edge(“OutEdge5”, parentMetadata);
Edge egde6 = new Edge(“OutEdge6”, childMetadata);
Edge egde7 = new Edge(“OutEdge7”, parentMetadata);
Edge egde8 = new Edge(“OutEdge8”, mergerd_metada);
Edge egde9 = new Edge(“OutEdge9”, childMetadata);
egde1.connectReader(parentDBReaderNode, 0);
egde1.connectWriter(parentSortNode, 0);
egde2.connectReader(childDBReaderNode, 0);
egde2.connectWriter(childSortNode, 0);
egde3.connectReader(parentSortNode, 0);
egde3.connectWriter(parentDedupNode, 0);
egde4.connectReader(childSortNode, 0);
egde4.connectWriter(childDedupNode, 0);
egde5.connectReader(parentDedupNode, 0);
egde5.connectWriter(dataIntrsectionNode, 0);
egde6.connectReader(childDedupNode, 0);
egde6.connectWriter(dataIntrsectionNode, 1);
// egde4.connectWriter(dbOutPut, 0);
egde7.connectReader(dataIntrsectionNode, 0);
egde7.connectWriter(xlsReport_ANode, 0);
egde8.connectReader(dataIntrsectionNode, 1);
egde8.connectWriter(xlsReport_A_BNode, 0);
egde9.connectReader(dataIntrsectionNode, 2);
egde9.connectWriter(xlsReport_BNode, 0);
////nodes…
parentDBReaderNode.addOutputPort(0, egde1);
childDBReaderNode.addOutputPort(0, egde2);
parentSortNode.addInputPort(0, egde1);
parentSortNode.addOutputPort(0, egde3);
childSortNode.addInputPort(0, egde2);
childSortNode.addOutputPort(0, egde4);
parentDedupNode.addInputPort(0, egde3);
parentDedupNode.addOutputPort(0, egde5);
childDedupNode.addInputPort(0, egde4);
childDedupNode.addOutputPort(0, egde6);
dataIntrsectionNode.addInputPort(0, egde5);
dataIntrsectionNode.addInputPort(1, egde6);
dataIntrsectionNode.addOutputPort(0, egde7);
dataIntrsectionNode.addOutputPort(1, egde8);
dataIntrsectionNode.addOutputPort(2, egde9);
xlsReport_ANode.addInputPort(0, egde7);
xlsReport_A_BNode.addInputPort(0, egde8);
xlsReport_BNode.addInputPort(0, egde9);
// add Edges & Nodes & Phases to graph
try {
//institutionOrphanCheckGraph.addConnection(dbConnection);
//institutionOrphanCheckGraph.addConnection(dbCon);
institutionOrphanCheckGraph.addPhase(PHASE_0);
PHASE_0.addNode(parentDBReaderNode);
PHASE_0.addNode(childDBReaderNode);
PHASE_0.addNode(parentDedupNode);
PHASE_0.addNode(parentSortNode);
PHASE_0.addNode(childDedupNode);
PHASE_0.addNode(childSortNode);
PHASE_0.addNode(dataIntrsectionNode);
PHASE_0.addNode(xlsReport_ANode);
PHASE_0.addNode(xlsReport_A_BNode);
PHASE_0.addNode(xlsReport_BNode);
// PHASE_0.addNode(dbOutPutTable);
// institutionGraph.addPhase(PHASE_0);
institutionOrphanCheckGraph.addEdge(egde1);
institutionOrphanCheckGraph.addEdge(egde2);
institutionOrphanCheckGraph.addEdge(egde3);
institutionOrphanCheckGraph.addEdge(egde4);
institutionOrphanCheckGraph.addEdge(egde5);
institutionOrphanCheckGraph.addEdge(egde6);
institutionOrphanCheckGraph.addEdge(egde7);
institutionOrphanCheckGraph.addEdge(egde8);
institutionOrphanCheckGraph.addEdge(egde9);
// institutionGraph.addEdge(egde6);
} catch (GraphConfigurationException ex) {
System.out.println(“::::::::::::::::::::::::::File Not Found:::::::::::::::::::::::::::::::::::::::::::::::::::::::”);
ex.printStackTrace();
}
// prepare runtime parameters - JMX is turned off
GraphRuntimeContext runtimeContext = new GraphRuntimeContext();
runtimeContext.setUseJMX(false);
GraphExecutor executor = new GraphExecutor();
System.out.println(“Grapgh Executer…” + executor);
System.out.println(“…Graph…” + institutionOrphanCheckGraph);
try {
GraphExecutor.initGraph(institutionOrphanCheckGraph);
} catch (ComponentNotReadyException e) {
System.out.println(“::::::::::::::::::::::::::File Not Found in Compontnt not ready:::::::::::::::::::::::::::::::::::::::::::::::::::::::”);
System.out.println(“Failed graph initialization!\n”
+ e.getMessage());
return false;
}
Future result;
try {
result = executor.runGraph(institutionOrphanCheckGraph, runtimeContext);
Result rs = result.get();
executor.free();
institutionOrphanCheckGraph.free();
System.out.println(result.isDone() + “>>>>>>>>>>>>>>>>>>>>>>>”);
if (result.isDone()) {
System.out.println(“Done”);
isDone = true;
}
if (!result.get().equals(Result.FINISHED_OK)) {
System.out.println(result.get().message());
// System.out.println(“Failed graph execution!”);
// return false;
}
} catch (Exception e) {
System.out.println(“Failed graph execution!\n” + e.getMessage());
// return false;
}
return isDone;
}