Process only unprocessed files

Hi,

Let's assume I have a directory with 1000 files.
CloverETL read the data from those files and wrote it to the database after processing.
I execute CloverETL again after 10 minutes, and 10 new files have been added to the directory
during this period.
I want only the 10 new files to be processed, not all 1010 files.

1. Does CloverETL support this feature? If yes, how?
2. If I assume the name of each file is a sequence number, is there an easier solution?

Regards,
Asaf

Hello,
unfortunately our incremental reading feature can’t recognize new files for the time being, but I’ve created an issue (http://bug.cloveretl.com/view.php?id=3130) for this feature.
The easiest way to achieve your goal is to move successfully processed files into another folder. You can do it directly in Clover with the SystemExecute component:

<?xml version="1.0" encoding="UTF-8"?>
<Graph id="1256635798303" name="incrementalMultifile1" >
<Global>
<Metadata id="Metadata0">
<Record fieldDelimiter="|" name="recordName1" recordDelimiter="\n" type="delimited">
<Field name="field1" type="string"/>
</Record>
</Metadata>
<Metadata id="Metadata1" >
<Record fieldDelimiter="|" name="twoFields"  recordDelimiter="\n" type="delimited">
<Field auto_filling="source_name" name="fileName" type="string"/>
<Field name="field2" type="string"/>
</Record>
</Metadata>
<Property fileURL="workspace.prm" id="GraphParameter0"/>
</Global>
<Phase number="0">
<Node fileURL="${DATAIN_DIR}/*" id="DATA_READER0" type="DATA_READER"/>
<Node id="SIMPLE_COPY0" type="SIMPLE_COPY"/>
<Node id="TRASH0" type="TRASH"/>
<Edge fromNode="DATA_READER0:0" id="Edge0" inPort="Port 0 (in)" metadata="Metadata1" outPort="Port 0 (output)" toNode="SIMPLE_COPY0:0"/>
<Edge fromNode="SIMPLE_COPY0:0" id="Edge1" inPort="Port 0 (in)" metadata="Metadata1" outPort="Port 0 (out)" toNode="TRASH0:0"/>
<Edge fromNode="SIMPLE_COPY0:1" id="Edge2" inPort="Port 0 (in)" metadata="Metadata1" outPort="Port 1 (out)" toNode="REFORMAT0:0"/>
</Phase>
<Phase number="1">
<Node id="REFORMAT0" type="REFORMAT">
<attr name="transform"><![CDATA[//#TL
string fileName = '';
// Transforms input record into output record.
function transform() {
	if ($0.fileName.eq.fileName) return SKIP;
	fileName = $0.fileName;
	$0.field1 := $0.fileName;
}
]]></attr>
</Node>
<Node id="SYS_EXECUTE0" interpreter="xargs sh ${}" type="SYS_EXECUTE">
<attr name="command"><![CDATA[mv -t processed-in $@
]]></attr>
</Node>
<Edge debugMode="true" fromNode="REFORMAT0:0" id="Edge3" inPort="Port 0 (input for command)" metadata="Metadata0" outPort="Port 0 (out)" toNode="SYS_EXECUTE0:0"/>
</Phase>
</Graph>
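
For comparison, the same "move what has been processed" idea can be sketched in plain Java outside of Clover. This is only an illustration under assumptions: the class name MoveProcessed, the method moveAll and the directory arguments are made up for this example and are not part of CloverETL or of the graph above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not a CloverETL class.
public class MoveProcessed {

    /** Moves every regular file found in inDir into processedDir and returns the new paths. */
    public static List<Path> moveAll(Path inDir, Path processedDir) throws IOException {
        Files.createDirectories(processedDir);

        // Collect the file list first, so the directory is not modified while it is being listed.
        List<Path> files;
        try (Stream<Path> listing = Files.list(inDir)) {
            files = listing.filter(Files::isRegularFile).collect(Collectors.toList());
        }

        List<Path> moved = new ArrayList<>();
        for (Path file : files) {
            Path target = processedDir.resolve(file.getFileName());
            // Overwrites a file of the same name left over from an earlier run.
            moved.add(Files.move(file, target, StandardCopyOption.REPLACE_EXISTING));
        }
        return moved;
    }
}
```

Calling moveAll after a successful run leaves the input directory empty, so the next run sees only the files that arrived in the meantime.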

Alternatively, if the file names contain an increasing number, you can add a DataGenerator component in front of your graph that prepares the names of the files to read:

<?xml version="1.0" encoding="UTF-8"?>
<Graph id="1256559327644" name="incrementalMultifile" revision="1.35">
<Global>
<Metadata id="Metadata0" >
<Record fieldDelimiter="|" name="recordName1"  recordDelimiter="\n" type="delimited">
<Field name="field1" type="string"/>
</Record>
</Metadata>
<Property fileURL="workspace.prm" id="GraphParameter0"/>
</Global>
<Phase number="0">
<Node generateURL="${TRANS_DIR}/readFiles.java" id="DATA_GENERATOR0" recordsNumber="1000" type="DATA_GENERATOR"/>
<Node fileURL="port:$0.field1:source" id="DATA_READER0" type="DATA_READER"/>
<Node debugPrint="true" id="TRASH0" type="TRASH"/>
<Edge debugMode="true" fromNode="DATA_GENERATOR0:0" id="Edge2" inPort="Port 0 (input)" metadata="Metadata0" outPort="Port 0 (out)" toNode="DATA_READER0:0"/>
<Edge fromNode="DATA_READER0:0" id="Edge0" inPort="Port 0 (in)" metadata="Metadata0" outPort="Port 0 (output)" toNode="TRASH0:0"/>
</Phase>
</Graph>

with readFiles.java:

import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.jetel.component.DataRecordGenerate;
import org.jetel.data.DataRecord;
import org.jetel.exception.ComponentNotReadyException;
import org.jetel.exception.TransformException;


public class readFiles extends DataRecordGenerate {
	
	private String NUMBER_FILE;
	private final static Pattern FILE_PATTERN = Pattern.compile("\\D*(\\d+)\\D*");
	
	private int lastNumber = -1;
	private File numberFile;

	private File[] inFile;

	private int index;
	protected int currentNumber;
	
	public boolean init() throws ComponentNotReadyException {
		NUMBER_FILE=  getGraph().getGraphProperties().getStringProperty("DATATMP_DIR") + "/inc.bin"; 
		numberFile = new File(NUMBER_FILE);
		if (numberFile.exists()) {
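			try {
				// Read the last processed number and always close the stream afterwards.
				ObjectInputStream reader = new ObjectInputStream(new FileInputStream(numberFile));
				try {
					lastNumber = reader.readInt();
				} finally {
					reader.close();
				}
			} catch (Exception e) {
				throw new ComponentNotReadyException(e);
			} 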
		}
		inFile = (new File(getGraph().getGraphProperties().getStringProperty("DATAIN_DIR")).listFiles(new FileFilter() {
			public boolean accept(File pathname) {
				if (lastNumber == -1) return true;
				Matcher fnm = FILE_PATTERN.matcher(pathname.getName());
				if (fnm.find()) {
					currentNumber = Integer.parseInt(fnm.group(1));
					return currentNumber > lastNumber;
				}	
				return false;
			}
		}));
		index = 0;
		return super.init();
	}

	public int generate(DataRecord[] arg0) throws TransformException {
		if (index >= inFile.length) return SKIP;
		// Always advance the index, so a file name without a number is not retried on every call.
		File file = inFile[index++];
		Matcher fnm = FILE_PATTERN.matcher(file.getName());
		if (fnm.find()) {
			currentNumber = Integer.parseInt(fnm.group(1));
			// Remember the highest number seen; finished() stores it for the next run.
			if (currentNumber > lastNumber) {
				lastNumber = currentNumber;
			}
			arg0[0].getField(0).setValue(file.getAbsolutePath());
			return 0;
		}
		return SKIP;
	}

	public void finished() {
		try {
			if (!numberFile.exists()) {
					numberFile.createNewFile();
			}
			ObjectOutputStream writer = new ObjectOutputStream(new FileOutputStream(numberFile));
			writer.writeInt(lastNumber);
			writer.flush();
			writer.close();
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
		super.finished();
	}
}
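
The selection rule used by readFiles.java can also be tried on its own, without a graph. The following is a minimal sketch, assuming the same file name pattern; the class NewFileSelector and its methods are hypothetical names for this example only. One difference to note: here names without any number are always left out, whereas the filter above accepts every file on the very first run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-alone version of the filter logic, not a CloverETL class.
public class NewFileSelector {

    // Same pattern as in readFiles.java: captures the first run of digits in the name.
    private static final Pattern FILE_PATTERN = Pattern.compile("\\D*(\\d+)\\D*");

    /** Returns the sequence number contained in the file name, or -1 if there is none. */
    public static int sequenceNumber(String name) {
        Matcher m = FILE_PATTERN.matcher(name);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    /** Keeps only the names whose sequence number is greater than lastNumber. */
    public static List<String> selectNew(List<String> names, int lastNumber) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (sequenceNumber(name) > lastNumber) {
                result.add(name);
            }
        }
        return result;
    }
}
```

With lastNumber taken from the previous run, selectNew returns just the files added since then, which is the list the DataGenerator sends to the DataReader.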

Hi Agata,

Thanks a lot for your comprehensive response.

When I tried the DATA_GENERATOR approach I got the following error:
(org.jetel.graph.runtime.WatchDog executePhase:449) ERROR - Phase initialization failed with reason: DATA_READER …FAILED !
Reason: The field not found for the statement: ‘port:$0.field1:source’

Can you please assist with this error?

Hello asaf,
you shouldn’t have ‘port:$0.field1:source’ in the DataGenerator. This string should be used as the fileURL of the DataReader.