What component would I use to scan an input dataset sorted on a key and generate a running total or an implicit grouping of records?
Here’s an example:
K: the key that can be based on one or more fields
D: some data
S: a data value indicating that the key context switches
G: a group label based on the key and on “seeing” the switch
Suppose I have the following dataset:
K1 D1
K1 D2
K1 S
K1 D3
K1 D4
K1 D5
K1 S
K2 D1
K2 S
K2 D2
K2 D3
K2 S
I want to number the records with a running counter and group them by adding the last two fields:
K1 D1 1 G1
K1 D2 2 G1
K1 S 3 G1
K1 D3 1 G2
K1 D4 2 G2
K1 D5 3 G2
K1 S 4 G2
...
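Outside any ETL tool, the rule behind these two fields is a single pass over the sorted records. The sketch below is plain Python, not CloverETL code. It assumes each record is a `(key, value)` tuple, the input is already sorted by key and the switch indicator is the literal string `"S"`. `label_records` is a hypothetical name. Each key opens a new group, and the record that follows a switch indicator opens another group within the same key.

```python
from itertools import groupby


def label_records(records, switch_value="S"):
    """Return (key, value, counter, group) tuples for sorted (key, value) records.

    Hypothetical helper: every key starts a new group, and the record that
    follows a switch indicator starts another group within the same key.
    """
    output = []
    group_count = 0
    for key, rows in groupby(records, key=lambda r: r[0]):
        count = 0
        group_count += 1          # a new key always opens a new group
        was_switch = False
        for _, value in rows:
            if was_switch:        # previous record was a switch: open the next group
                count = 1
                group_count += 1
                was_switch = False
            else:                 # the current group continues
                count += 1
                if value == switch_value:
                    was_switch = True
            output.append((key, value, count, f"G{group_count}"))
    return output


data = [("K1", "D1"), ("K1", "D2"), ("K1", "S"), ("K1", "D3"),
        ("K1", "D4"), ("K1", "D5"), ("K1", "S")]
for row in label_records(data):
    print(*row)
```

Run on the K1 records, this prints the seven labelled rows listed above. The K2 records continue with G3 and G4.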
To achieve this, use the Rollup component with the key field as its group key and the following transformation:
//#TL
string SWITCH_VALUE = "S";
int count;
int groupCount = 0;
boolean wasSwitch;

// Called for the first data record in a new group. Starts the parsing of the new group.
function initGroup(groupAccumulator) {
    count = 0;
    groupCount++;
    wasSwitch = false;
}

// Called for each data record in the group (including the first one and the last one).
// Implicitly returns false => updateTransform() is not called. Returning true causes updateTransform() to be called.
function updateGroup(groupAccumulator) {
    if (wasSwitch) { // the previous record was a switch indicator: start the next group
        count = 1;
        groupCount++;
        wasSwitch = false;
    } else { // the current group continues
        count++;
        if ($value == SWITCH_VALUE) { // switch indicator found: the next record starts a new group
            wasSwitch = true;
        }
    }
    return true; // request a call of updateTransform() for this record
}

// Called to transform data records that have been parsed so far into user-specified number of output data record(s).
// Counter (incremented by 1 starting from 0) stores the number of previous calls to this method for the current group update.
// Group accumulator can optionally be used.
// Function implicitly returns SKIP to skip sending any data records to output.
// Returning ALL causes each data record to be sent to all output port(s).
// Can also return a number of the output port to which individual data record should be sent.
function updateTransform(counter, groupAccumulator) {
    if (counter > 0) return SKIP; // updateTransform() is called repeatedly until it returns SKIP: emit one output record per input record
    $key := $key; // map the input field
    $value := $value; // map the input field
    $counter := count; // running counter within the current group
    $groupping := "G" + groupCount; // group label
    return ALL; // the implicit return value is SKIP, so explicitly send the record to all output ports
}

// Called for the last data records in all groups sequentially, but only after all incoming data records have been parsed.
// Implicitly returns true => transform() is called for the whole group.
function finishGroup(groupAccumulator) {
    return false; // all records have already been sent to the output by updateTransform()
}

// Called to transform the whole group of incoming data record(s) into user-specified number of output data record(s).
// Counter (incremented by 1 starting from 0) stores the number of previous calls to this method for the current group update.
// Group accumulator can optionally be used.
// Function implicitly returns SKIP to skip sending any data records to output.
// Returning ALL causes each data record to be sent to all output port(s).
// Can also return a number of the output port to which individual data record should be sent.
function transform(counter, groupAccumulator) {
    // never called, because finishGroup() returns false
}
The implementation above does not use the group accumulator at all. The following implementation produces the same output but keeps the counter and the group label in the group accumulator:
//#TL
string SWITCH_VALUE = "S";
int groupCount = 0;
boolean wasSwitch;

// Called for the first data record in a new group. Starts the parsing of the new group.
function initGroup(groupAccumulator) {
    groupCount++;
    wasSwitch = false;
    groupAccumulator["counter"] = 0;
    groupAccumulator["groupping"] = "G" + groupCount;
}

// Called for each data record in the group (including the first one and the last one).
// Implicitly returns false => updateTransform() is not called. Returning true causes updateTransform() to be called.
function updateGroup(groupAccumulator) {
    if (wasSwitch) { // the previous record was a switch indicator: start the next group
        groupAccumulator["counter"] = 1;
        groupCount++;
        groupAccumulator["groupping"] = "G" + groupCount;
        wasSwitch = false;
    } else { // the current group continues
        groupAccumulator["counter"] = groupAccumulator["counter"] + 1;
        if ($value == SWITCH_VALUE) { // switch indicator found: the next record starts a new group
            wasSwitch = true;
        }
    }
    return true; // request a call of updateTransform() for this record
}

// Called for the last data records in all groups sequentially, but only after all incoming data records have been parsed.
// Implicitly returns true => transform() is called for the whole group.
function finishGroup(groupAccumulator) {
    return false; // all records have already been sent to the output by updateTransform()
}

// Called to transform data records that have been parsed so far into user-specified number of output data record(s).
// Counter (incremented by 1 starting from 0) stores the number of previous calls to this method for the current group update.
// Group accumulator can optionally be used.
// Function implicitly returns SKIP to skip sending any data records to output.
// Returning ALL causes each data record to be sent to all output port(s).
// Can also return a number of the output port to which individual data record should be sent.
function updateTransform(counter, groupAccumulator) {
    if (counter > 0) return SKIP; // emit exactly one output record per input record
    $key := $key;
    $value := $value;
    $counter := groupAccumulator["counter"];
    $groupping := groupAccumulator["groupping"];
    return ALL;
}

// Called to transform the whole group of incoming data record(s) into user-specified number of output data record(s).
// Counter (incremented by 1 starting from 0) stores the number of previous calls to this method for the current group update.
// Group accumulator can optionally be used.
// Function implicitly returns SKIP to skip sending any data records to output.
// Returning ALL causes each data record to be sent to all output port(s).
// Can also return a number of the output port to which individual data record should be sent.
function transform(counter, groupAccumulator) {
    // never called, because finishGroup() returns false
}
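Both transformations rely on the order in which Rollup calls these functions, as the template comments describe. initGroup() runs on the first record of a group and updateGroup() runs on every record. When updateGroup() returns true, updateTransform() is called with counter = 0, 1, ... (the `counter` parameter implies repeated calls) until it returns SKIP. The Python sketch below imitates only that part of the protocol for sorted input. finishGroup() and transform() are left out because finishGroup() returns false here. It is not CloverETL code: `run_rollup`, `SwitchCounter` and the `SKIP`/`ALL` strings are hypothetical stand-ins, and the class mirrors the first, accumulator-free version.

```python
from itertools import groupby

SKIP, ALL = "SKIP", "ALL"   # hypothetical stand-ins for the CTL return constants


class SwitchCounter:
    """Python mirror of the first CTL transformation (no group accumulator)."""
    SWITCH_VALUE = "S"

    def __init__(self):
        self.count = 0
        self.group_count = 0
        self.was_switch = False

    def init_group(self, record):
        self.count = 0
        self.group_count += 1
        self.was_switch = False

    def update_group(self, record):
        if self.was_switch:            # previous record was a switch: next group
            self.count = 1
            self.group_count += 1
            self.was_switch = False
        else:                          # current group continues
            self.count += 1
            if record["value"] == self.SWITCH_VALUE:
                self.was_switch = True
        return True                    # ask the driver to call update_transform

    def update_transform(self, counter, record, out):
        if counter > 0:
            return SKIP                # stop after one output record per input record
        out.update(key=record["key"], value=record["value"],
                   counter=self.count, groupping=f"G{self.group_count}")
        return ALL


def run_rollup(records, transform):
    """Hypothetical driver imitating the Rollup call order for key-sorted input."""
    output = []
    for _, group in groupby(records, key=lambda r: r["key"]):
        for i, record in enumerate(group):
            if i == 0:
                transform.init_group(record)
            if transform.update_group(record):
                counter = 0
                while True:            # call update_transform until it returns SKIP
                    out = {}
                    if transform.update_transform(counter, record, out) == SKIP:
                        break
                    output.append(out)
                    counter += 1
    return output


rows = run_rollup([{"key": "K2", "value": v} for v in ["D1", "S", "D2", "D3", "S"]],
                  SwitchCounter())
for row in rows:
    print(row)
```

Because update_transform returns SKIP on its second call for each record, every input record yields exactly one output record. The accumulator-based version differs only in where the counter and label are stored.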