...
A local dataset is used to store the intermediate result, and the outcome of the condition decides the next phase of execution.
Example:

                                                           |----- Table Sink
Stream batch source -> Python Transform -> Condition node -|
                                                           |----- Database Sink
In this pipeline, the condition node decides, based on the number of records read from the stream, whether to store the result in the table sink or the database sink: if the number of records processed at the Python transform is less than 1M, we use the table sink; otherwise we use the database sink. A transform stage after the Python transform will be responsible for writing to the local dataset and adding num.records to the workflow token, which the condition node will use to make the decision.

Conditions can also be nested, with each branch labeled by the record-count range that selects it:

                 <500k records
             |------------------------------------ sink1
             |
condition1 --|                  500k - 1m records
             |                |-------------------- transform2 --- sink2
             |--- condition2 -|
                              |-------------------- sink3
                                  >1m records
The condition nodes can then use the record count from the workflow token to evaluate their condition and either continue to the next condition node (which can itself use the workflow token for further evaluation) or add the program that executes the next phase. Here the next phase will start from sink1, or from one of transform2 or sink3 depending on condition2.
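As an illustrative, standalone sketch (not part of the proposed API), the decision logic encoded by condition1 and condition2 could look like the following; in the actual pipeline the record count would come from the num.records entry in the workflow token:

```java
// Standalone sketch of the branching logic the two condition nodes above
// encode; the thresholds follow the diagram.
public class BranchingLogicSketch {

  // condition1: true selects the if branch (sink1).
  static boolean condition1(long numRecords) {
    return numRecords < 500_000L;
  }

  // condition2: true selects the if branch (transform2 -> sink2),
  // false selects the else branch (sink3).
  static boolean condition2(long numRecords) {
    return numRecords <= 1_000_000L;
  }

  public static void main(String[] args) {
    // In the pipeline, this value comes from the num.records token entry.
    long numRecords = 750_000L;
    if (condition1(numRecords)) {
      System.out.println("next phase starts at: sink1");
    } else if (condition2(numRecords)) {
      System.out.println("next phase starts at: transform2 -> sink2");
    } else {
      System.out.println("next phase starts at: sink3");
    }
  }
}
```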
API
The Connection class should represent whether a connection whose source is a condition node belongs to the if-branch or the else-branch.
...
```java
public class ConditionConnection extends Connection {
  // true or false, to indicate whether this connection is the if branch or the else branch
  private final boolean isIfBranch;
}
```
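For illustration, assuming constructors along the lines of ConditionConnection(from, to, isIfBranch) (no constructor is defined above, so this signature is an assumption), the nested-condition example could be wired up as:

```java
// Hypothetical wiring for the nested-condition example; the constructor
// signature is assumed, not part of the proposal above.
Connection toSink1      = new ConditionConnection("condition1", "sink1", true);       // if branch
Connection toCondition2 = new ConditionConnection("condition1", "condition2", false); // else branch
Connection toTransform2 = new ConditionConnection("condition2", "transform2", true);  // if branch
Connection toSink3      = new ConditionConnection("condition2", "sink3", false);      // else branch
```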
Option 3:
Should this information be stored as part of ETLStage rather than Connection?
```java
public abstract class Condition implements PipelineConfigurable {
  public static final String PLUGIN_TYPE = "condition";

  /**
   * Implement this method to be executed for the condition logic. If this returns true,
   * the if branch will be executed; if it returns false, the else branch is executed.
   *
   * @param context the condition context, containing information about the pipeline run
   * @throws Exception when there is a failure in method execution
   */
  public abstract boolean apply(ConditionContext context) throws Exception;
}
```
...
```java
public interface ConditionContext extends StageContext, Transactional, SecureStore, SecureStoreManager {

  /**
   * Returns the logical start time of the batch job which triggers this instance of a condition.
   * Logical start time is the time when the triggering batch job is supposed to start if it is
   * started by the scheduler. Otherwise it would be the current time when the condition runs.
   *
   * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC).
   */
  long getLogicalStartTime();

  /**
   * Returns the runtime arguments.
   */
  Map<String, String> getArguments();

  /**
   * @return The application namespace
   */
  String getNamespace();
}
```
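Since ConditionContext extends Transactional, a condition body could also read the record count directly from the local dataset rather than the workflow token. A minimal sketch of such an apply body, assuming CDAP's TxRunnable, DatasetContext, KeyValueTable, and Bytes APIs, and a hypothetical dataset named "stats" holding the count under the key "num.records" (both names are assumptions):

```java
@Override
public boolean apply(ConditionContext context) throws Exception {
  final long[] numRecords = new long[1];
  // Read the count inside a transaction; "stats" and "num.records" are assumed names.
  context.execute(new TxRunnable() {
    @Override
    public void run(DatasetContext datasetContext) throws Exception {
      KeyValueTable stats = datasetContext.getDataset("stats");
      numRecords[0] = Bytes.toLong(stats.read("num.records"));
    }
  });
  return numRecords[0] < 500_000L;
}
```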
Example Condition Plugin:
```java
public class ThresholdCondition extends Condition {

  public static class ThresholdConditionConfig extends PluginConfig {
    @Description("Name of the property that will be looked up in the workflow token")
    private String propertyName;

    @Description("Threshold condition will evaluate to true if the value of the property is above this threshold value")
    private Integer thresholdValue;
  }

  @Override
  public boolean apply(ConditionContext context) throws Exception {
    ...
  }
}
```
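The body of apply is elided above. One possible implementation, assuming the workflow token value is surfaced to the plugin through the runtime arguments and that the plugin keeps its config in a field named config (both assumptions; the design above does not pin down the lookup mechanism), might look like:

```java
// One possible body for apply; assumes the plugin holds its config in a field
// (the usual plugin pattern) and that the token value reaches the plugin
// through the runtime arguments -- that lookup path is an assumption.
@Override
public boolean apply(ConditionContext context) throws Exception {
  String value = context.getArguments().get(config.propertyName);
  if (value == null) {
    throw new IllegalArgumentException(
        "Property '" + config.propertyName + "' not found in runtime arguments");
  }
  return Long.parseLong(value) > config.thresholdValue;
}
```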
Limitations:
1) Even if the records processed by the previous stage hit the threshold while we are still writing to the local dataset, we cannot start the next stage until all records from the previous stage have been completely processed and written to the local dataset, since the next stage's input is sourced from the local dataset.
...