Hi,
i AM AGETTING ERROR …
Sorter childSort has no more capacity to sort additional records.The output will be incomplete !
My code is …
public boolean processOrphanCheck(String dedupKeys1,String sortKeys1,String joinKeys1){
//String dbFields_FeedStatus={“feed_files_id”, “set_no”, “line”, “line_num”,“error_level”,“error_type”, “occurance”, “field_name”,“updated_at”,“updated_by”};
CloverDBField cloverDBField=new CloverDBField();
String dbFields_FeedStatus=cloverDBField.getCloverDBFields(“feed_errors”);
System.out.println("dedupKeys1: "+dedupKeys1[0]);
System.out.println(“sortKeys1 :”+sortKeys1[0]);
System.out.println("joinKeys1 : "+joinKeys1[0]);
extraInfoForTranform=new Properties();
extraInfoForTranform.put(“occurance”, 1);
extraInfoForTranform.put(“feed_files_id”, controller.feed_files_id);
extraInfoForTranform.put(“set_no”, controller.setno);
System.out.println(“^^^^^^^^^^^^^^^^^^^^File Size^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^”+controller.file_size);
extraInfoForTranform.put(“updated_by”, “System”);
String childFileName=controller.inputFileName;
System.out.println(“Parent File :”+this.parentFileName);
System.out.println(“Child File :”+childFileName);
boolean isDone = false;
DBConnection dbCon;
Phase PHASE_0 = new Phase(1);
// initialization; must be present
EngineInitializer.initEngine(null, null, null);
System.out
.println(“*******************************************************************************************”);
DataBaseConnection dbConn=new DataBaseConnection();
dbCon=dbConn.getConnection();
System.out.println(“…Db connections…”+dbCon);
try {
System.out.println(“Id of the DB Connection:::”+dbCon.getId());
dbCon.init();
} catch (Exception ex) {
ex.printStackTrace();
}
DataRecordMetadata childMetadata=controller.MetadataInput;
DataRecordMetadata parentMetadata=metadataInput;
childMetadata.setRecordDelimiters(SystemDelimiters.getLineSeperator());
DataRecordMetadata metadataDBError=ManageMetaData.getMetaData(“feed_errors.fmt”);
/*DataRecordMetadataXMLReaderWriter reader=new DataRecordMetadataXMLReaderWriter();
try{
metadataDBError=reader.read(new FileInputStream(“metadata/feed_errors.fmt”));
}catch(IOException ex){
isDone=false;
System.err.println(“Error when reading metadata!!”);
throw new RuntimeException(ex);
}
if (metadataDBError==null){
throw new RuntimeException(“No INPUT metadata”);
}
*/
Map parentmap=parentMetadata.getFieldNames();
String str=parentMetadata.getFieldDelimiters();
int parentMetaDataSize=parentMetadata.getNumFields();
System.out.println(str[0]+“delimiter”);
System.out.println(“Parent metadata size”+parentMetaDataSize);
Set entrySet1 =parentmap.entrySet();
Iterator iter1=entrySet1.iterator();
while (iter1.hasNext()) {
Map.Entry entry1 = (Map.Entry) iter1.next();
System.out.println(entry1.getKey()+“—parent—”+entry1.getValue());
}
Map childmap=childMetadata.getFieldNames();
Set entrySet =childmap.entrySet();
String common=null;
Iterator iter=entrySet.iterator();
int childMetaDataSize=childMetadata.getNumFields();
System.out.println(“Child metadata size”+childMetaDataSize);
while (iter.hasNext()) {
Map.Entry entry = (Map.Entry) iter.next();
System.out.println(entry.getKey()+“—child—”+entry.getValue());
}
TransformationGraph institutionOrphanCheckGraph = new TransformationGraph();
/* String parentQuery=“select code from feed_institution”;
String childQuery=“select pub_code from ip_institutions”;*/
// Nodes…
System.out.println(“Parent File:”+parentFileName);
System.out.println(“Child File:”+childFileName);
DelimitedDataReader parentDBReader = new DelimitedDataReader(“parentReader”,parentFileName);
parentDBReader.setSkipFirstLine(true);
Node parentDBReaderNode = parentDBReader;
// System.out.println(“Child File”+controller.inputFileName);
DelimitedDataReader childDBReader = new DelimitedDataReader(“childReader”,childFileName);
childDBReader.setSkipFirstLine(true);
Node childDBReaderNode = childDBReader;
/* String dedupKeys={“contortia_id”};
String sortKeys={“contortia_id”};
String joinKeys={“contortia_id”};*/
ToUpperCase toUpperCase=new ToUpperCase();
toUpperCase.setMetaDataSize(parentMetaDataSize);
toUpperCase.init(extraInfoForTranform);
Reformat parentReformToUpper=new Reformat(“parentReformToUpper”,toUpperCase);
Node parentReformToUpperNode=parentReformToUpper;
ToUpperCase toUpperCase1=new ToUpperCase();
toUpperCase1.setMetaDataSize(childMetaDataSize);
toUpperCase1.init(extraInfoForTranform);
Reformat childReformToUpper=new Reformat(“childReformToUpper”,toUpperCase1);
Node childReformToUpperNode=childReformToUpper;
String dedupKeys=dedupKeys1;
String sortKeys=sortKeys1;
String joinKeys=joinKeys1;
System.out.println(“Join Keys::::”+joinKeys[0]);//
Dedup parentDedup=new Dedup(“parentDedup”,dedupKeys,-1);
Node parentDedupNode = parentDedup;
Dedup childDedup=new Dedup(“childDedup”,dedupKeys,-1);
Node childDedupNode = childDedup;
Sort parentSort=new Sort(“parentSort”,sortKeys);
Node parentSortNode=parentSort;
Sort childSort=new Sort(“childSort”,sortKeys);
Node childSortNode=childSort;
IntegrationTransform integrationTrans=new IntegrationTransform();
integrationTrans.init();
integrationTrans.setcolumnNamesList(controller.columnNamesList);
integrationTrans.setMetaDataSize(parentMetaDataSize);
DataIntersection dataIntrsection=new DataIntersection(“dataIntersection”,joinKeys,integrationTrans);
Node dataIntrsectionNode=dataIntrsection;
Trash trash=new Trash(“trash”);
Node xlsReport_A_BNode=trash;
Trash trash2=new Trash(“trash2”);
Node xlsReport_ANode=trash2;
DBOutputTable orphan = new DBOutputTable(“DBOutPutTable1”, “Conn0”,
“feed_errors”);
orphan.setDBFields(dbFields_FeedStatus);
orphan.setCloverFields(dbFields_FeedStatus);
Node orphanNode = orphan;
OrphanTransform orphanTrans=new OrphanTransform();
// System.out.println(“property fille value”+controller.extraInfoForTranform);
orphanTrans.init(extraInfoForTranform);
orphanTrans.setorphanFields(joinKeys);
orphanTrans.setFieldDisplayMap(controller.field_display);
Reformat orphanTransReformat = new Reformat(“transform1”, orphanTrans);
Node orphanTransReformatNode = orphanTransReformat;
DBOutputTable missingInChild = new DBOutputTable(“DBOutPutTable2”, “Conn0”,
“feed_errors”);
missingInChild.setDBFields(dbFields_FeedStatus);
missingInChild.setCloverFields(dbFields_FeedStatus);
Node missingInChildNode = missingInChild;
MissingOrphanTransform missingOrphanTrans=new MissingOrphanTransform();
missingOrphanTrans.init(extraInfoForTranform);
missingOrphanTrans.setmissingFields(joinKeys);
missingOrphanTrans.setFieldDisplayMap(controller.field_display);
Reformat missingOrphanTransReformat = new Reformat(“transform2”, missingOrphanTrans);
Node missingOrphanTransReformatNode = missingOrphanTransReformat;
// /Edges…
Edge egde0 = new Edge(“InEdge0”, parentMetadata);
Edge egde00 = new Edge(“OutEdge00”, childMetadata);
Edge egde1 = new Edge(“InEdge1”, parentMetadata);
Edge egde2 = new Edge(“OutEdge2”, childMetadata);
Edge egde3 = new Edge(“OutEdge3”, parentMetadata);
Edge egde4 = new Edge(“OutEdge4”, childMetadata);
Edge egde5 = new Edge(“OutEdge5”, parentMetadata);
Edge egde6 = new Edge(“OutEdge6”, childMetadata);
Edge egde7 = new Edge(“OutEdge7”, parentMetadata);
Edge egde8 = new Edge(“OutEdge8”, parentMetadata);
Edge egde9 = new Edge(“OutEdge9”, childMetadata);
Edge egde10 = new Edge(“OutEdge10”, metadataDBError);
//Edge egde11 = new Edge(“OutEdge11”, metadataDBError);
egde0.connectReader(parentDBReaderNode, 0);
egde0.connectWriter(parentReformToUpperNode, 0);
egde00.connectReader(childDBReaderNode, 0);
egde00.connectWriter(childReformToUpperNode, 0);
egde1.connectReader(parentReformToUpperNode, 0);
egde1.connectWriter(parentSortNode, 0);
egde2.connectReader(childReformToUpperNode, 0);
egde2.connectWriter(childSortNode, 0);
egde3.connectReader(parentSortNode, 0);
egde3.connectWriter(parentDedupNode, 0);
egde4.connectReader(childSortNode, 0);
egde4.connectWriter(childDedupNode, 0);
egde5.connectReader(parentDedupNode, 0);
egde5.connectWriter(dataIntrsectionNode, 0);
egde6.connectReader(childDedupNode, 0);
egde6.connectWriter(dataIntrsectionNode, 1);
// egde4.connectWriter(dbOutPut, 0);
egde7.connectReader(dataIntrsectionNode, 0);
egde7.connectWriter(xlsReport_ANode, 0);
egde8.connectReader(dataIntrsectionNode, 1);
egde8.connectWriter(xlsReport_A_BNode, 0);
egde9.connectReader(dataIntrsectionNode, 2);
egde9.connectWriter(orphanTransReformatNode, 0);
egde10.connectReader(orphanTransReformatNode, 0);
egde10.connectWriter(orphanNode, 0);
//egde11.connectReader(orphanTransReformatNode, 0);
//connectWriter(orphanNode, 0);
////nodes…
parentDBReaderNode.addOutputPort(0, egde0);
childDBReaderNode.addOutputPort(0, egde00);
parentReformToUpperNode.addInputPort(0, egde0);
parentReformToUpperNode.addOutputPort(0, egde1);
childReformToUpperNode.addInputPort(0, egde00);
childReformToUpperNode.addOutputPort(0, egde2);
parentSortNode.addInputPort(0, egde1);
parentSortNode.addOutputPort(0, egde3);
childSortNode.addInputPort(0, egde2);
childSortNode.addOutputPort(0, egde4);
parentDedupNode.addInputPort(0, egde3);
parentDedupNode.addOutputPort(0, egde5);
childDedupNode.addInputPort(0, egde4);
childDedupNode.addOutputPort(0, egde6);
dataIntrsectionNode.addInputPort(0, egde5);
dataIntrsectionNode.addInputPort(1, egde6);
dataIntrsectionNode.addOutputPort(0, egde7);
dataIntrsectionNode.addOutputPort(1, egde8);
dataIntrsectionNode.addOutputPort(2, egde9);
xlsReport_ANode.addInputPort(0,egde7);
xlsReport_A_BNode.addInputPort(0, egde8);
//missingOrphanTransReformatNode.addInputPort(0, egde7);
//missingOrphanTransReformatNode.addOutputPort(0, egde10);
orphanTransReformatNode.addInputPort(0, egde9);
orphanTransReformatNode.addOutputPort(0, egde10);
//missingInChildNode.addInputPort(0, egde10);
orphanNode.addInputPort(0, egde10);
//xlsReport_ANode.addInputPort(0, egde7);
//xlsReport_BNode.addInputPort(0, egde9);
// add Edges & Nodes & Phases to graph
try {
//institutionOrphanCheckGraph.addConnection(dbConnection);
institutionOrphanCheckGraph.addConnection(dbCon);
institutionOrphanCheckGraph.addPhase(PHASE_0);
PHASE_0.addNode(parentDBReaderNode);
PHASE_0.addNode(childDBReaderNode);
PHASE_0.addNode(parentReformToUpperNode);
PHASE_0.addNode(childReformToUpperNode);
PHASE_0.addNode(parentDedupNode);
PHASE_0.addNode(parentSortNode);
PHASE_0.addNode(childDedupNode);
PHASE_0.addNode(childSortNode);
PHASE_0.addNode(dataIntrsectionNode);
PHASE_0.addNode(xlsReport_A_BNode);
//PHASE_0.addNode(missingOrphanTransReformatNode);
PHASE_0.addNode(orphanTransReformatNode);
PHASE_0.addNode(xlsReport_ANode);
//PHASE_0.addNode(missingInChildNode);
PHASE_0.addNode(orphanNode);
//PHASE_0.addNode(xlsReport_BNode);
// PHASE_0.addNode(dbOutPutTable);
// institutionGraph.addPhase(PHASE_0);
institutionOrphanCheckGraph.addEdge(egde0);
institutionOrphanCheckGraph.addEdge(egde00);
institutionOrphanCheckGraph.addEdge(egde1);
institutionOrphanCheckGraph.addEdge(egde2);
institutionOrphanCheckGraph.addEdge(egde3);
institutionOrphanCheckGraph.addEdge(egde4);
institutionOrphanCheckGraph.addEdge(egde5);
institutionOrphanCheckGraph.addEdge(egde6);
institutionOrphanCheckGraph.addEdge(egde7);
institutionOrphanCheckGraph.addEdge(egde8);
institutionOrphanCheckGraph.addEdge(egde9);
institutionOrphanCheckGraph.addEdge(egde10);
//institutionOrphanCheckGraph.addEdge(egde11);
// institutionGraph.addEdge(egde6);
} catch (GraphConfigurationException ex) {
isDone=false;
System.out.println(“::::::::::::::::::::::::::File Not Found:::::::::::::::::::::::::::::::::::::::::::::::::::::::”);
ex.printStackTrace();
}
// prepare runtime parameters - JMX is turned off
GraphRuntimeContext runtimeContext = new GraphRuntimeContext();
runtimeContext.setUseJMX(false);
GraphExecutor executor = new GraphExecutor();
System.out.println(“Grapgh Executer…” + executor);
System.out.println(“…Graph…” + institutionOrphanCheckGraph);
try {
GraphExecutor.initGraph(institutionOrphanCheckGraph);
} catch (ComponentNotReadyException e) {
System.out.println(“::::::::::::::::::::::::::File Not Found in Compontnt not ready:::::::::::::::::::::::::::::::::::::::::::::::::::::::”);
System.out.println(“Failed graph initialization!\n”
+ e.getMessage());
isDone=false;
return false;
}
Future result;
try {
result = executor.runGraph(institutionOrphanCheckGraph, runtimeContext);
Result rs = result.get();
executor.free();
institutionOrphanCheckGraph.free();
System.out.println(result.isDone() + “>>>>>>>>>>>>>>>>>>>>>>>”);
if (result.isDone()) {
System.out.println(“Done”);
isDone = true;
}
if (!result.get().equals(Result.FINISHED_OK)) {
System.out.println(result.get().message());
// System.out.println(“Failed graph execution!”);
// return false;
}
} catch (Exception e) {
isDone=false;
System.out.println(“Failed graph execution!\n” + e.getMessage());
// return false;
}
return isDone;
}
}