Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
This design provides the capability to pass information from the triggering pipeline to the triggered pipeline under program-status-based scheduling.
Goals
Provide a clear API and backend design for passing the program status change event payload between pipelines.
User Stories
- Pipeline B is triggered when Pipeline A completes, and Pipeline B also needs the username and password used by Pipeline A
- Pipeline B is triggered when Pipeline A completes, and Pipeline B also needs the stream id Pipeline A reads from
- Pipeline B is triggered when Pipeline A completes, and Pipeline B also needs the schema of a sink Pipeline A writes to
Design
A new method will be added to RuntimeContext that returns a new API class, TriggeringScheduleInfo, which describes the schedule that launched the program. All runtime arguments, stage configurations, and user workflow tokens of the pipeline run that triggered the schedule will be included in the TriggeringScheduleInfo. To use these properties, users define a mapping from a property of the triggering program to a runtime argument of the triggered program. This mapping should be stored in the schedule properties under the key "triggering.properties.mapping". The app containing the triggered program should decode the value of the field "triggering.properties.mapping", read the corresponding properties from TriggeringScheduleInfo, and use their values as runtime arguments according to the mapping.
For the pipeline app, the syntax for referring to different properties in the field "triggering.properties.mapping" is defined below:
- Runtime arguments from the triggering pipeline: runtime-arg:<namespace>:<pipeline-name>#<runtime-arg-key>
- Stage configuration from the triggering pipeline: pipeline-config:<namespace>:<pipeline-name>#<pipeline-stage>:<stage-config-key>
- User tokens from the triggering pipeline: token:<namespace>:<pipeline-name>#<user-token-key>:<node-name>
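The reference syntax above could be split into its parts with a small helper. The following is only a minimal sketch: the class `PropertyReference` and its field names are hypothetical and not part of the proposed API, the stage name `sink1` is made up for the demo, and the code assumes that namespace and pipeline names contain neither ':' nor '#'. The text after '#' is kept as one opaque string, since its inner layout depends on the property kind.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper, not part of the proposed API: splits a property
 * reference such as "runtime-arg:default:A#hostname" into its parts.
 */
final class PropertyReference {
  private static final List<String> KINDS =
      Arrays.asList("runtime-arg", "pipeline-config", "token");

  final String kind;       // one of KINDS
  final String namespace;  // namespace of the triggering pipeline
  final String pipeline;   // name of the triggering pipeline
  final String property;   // text after '#'; its inner layout depends on the kind

  private PropertyReference(String kind, String namespace, String pipeline, String property) {
    this.kind = kind;
    this.namespace = namespace;
    this.pipeline = pipeline;
    this.property = property;
  }

  static PropertyReference parse(String reference) {
    int hash = reference.indexOf('#');
    if (hash < 0) {
      throw new IllegalArgumentException("Missing '#': " + reference);
    }
    // Assumption: namespace and pipeline names contain neither ':' nor '#'.
    String[] head = reference.substring(0, hash).split(":", -1);
    String property = reference.substring(hash + 1);
    if (head.length != 3 || !KINDS.contains(head[0]) || property.isEmpty()) {
      throw new IllegalArgumentException("Malformed property reference: " + reference);
    }
    return new PropertyReference(head[0], head[1], head[2], property);
  }

  public static void main(String[] args) {
    // "sink1" and "schema" are made-up stage and config names.
    PropertyReference ref = parse("pipeline-config:default:A#sink1:schema");
    System.out.println(ref.kind + " / " + ref.namespace + " / " + ref.pipeline + " / " + ref.property);
  }
}
```

Rejecting malformed references early, for example one with a stray ':' before the '#', keeps a typo in the mapping from silently resolving to nothing.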
Approach
Here's an example use case to illustrate the approach: Pipeline A in the default namespace triggers Pipeline B when Pipeline A completes. Pipeline B needs the value of the runtime argument "hostname" from Pipeline A as the value of its own runtime argument "host". For instance, if Pipeline A has the runtime argument pair "hostname" -> "wiki.cask.co", then in Pipeline B the value of "host" should be "wiki.cask.co".
When the program status schedule is set up, an entry "runtime-arg:default:A#hostname" -> "host" is stored in a map, and this map is converted to a JSON string and stored as the value of the field "triggering.properties.mapping" in the schedule properties. When the notification of Pipeline A's completion triggers the schedule to launch Pipeline B, the pipeline app of Pipeline B extracts the field "triggering.properties.mapping" from the schedule properties. From the decoded map, the pipeline app recognizes that the key "runtime-arg:default:A#hostname" refers to the runtime argument with key "hostname" in the pipeline named A in the default namespace. The pipeline app then looks up the value of this runtime argument for Pipeline A in the TriggeringScheduleInfo obtained from the RuntimeContext, and provides it as the value of the runtime argument "host" in Pipeline B.
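The lookup step of this example can be sketched in plain Java. This is an illustration under stated assumptions, not the pipeline app's actual code: the class `TriggeringArgumentResolver` and its `resolve` method are hypothetical, the mapping is assumed to be already decoded from JSON into a map, only runtime-arg references are handled, and the triggering run's namespace, pipeline name, and runtime arguments are passed in directly rather than read from TriggeringScheduleInfo.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of applying a decoded "triggering.properties.mapping"
 * map. Only runtime-arg references are handled here.
 */
final class TriggeringArgumentResolver {

  /**
   * @param mapping        decoded mapping: property reference -> runtime argument key in the triggered pipeline
   * @param namespace      namespace of the triggering pipeline
   * @param pipeline       name of the triggering pipeline
   * @param triggeringArgs runtime arguments of the triggering pipeline run
   * @return runtime arguments to add to the triggered pipeline run
   */
  static Map<String, String> resolve(Map<String, String> mapping, String namespace, String pipeline,
                                     Map<String, String> triggeringArgs) {
    String prefix = "runtime-arg:" + namespace + ":" + pipeline + "#";
    Map<String, String> resolved = new HashMap<>();
    for (Map.Entry<String, String> entry : mapping.entrySet()) {
      String reference = entry.getKey();
      if (!reference.startsWith(prefix)) {
        continue; // refers to another pipeline or to another kind of property
      }
      String sourceKey = reference.substring(prefix.length());
      String value = triggeringArgs.get(sourceKey);
      if (value != null) {
        // entry.getValue() is the runtime argument key in the triggered pipeline
        resolved.put(entry.getValue(), value);
      }
    }
    return resolved;
  }

  public static void main(String[] args) {
    Map<String, String> mapping = new HashMap<>();
    mapping.put("runtime-arg:default:A#hostname", "host");

    Map<String, String> argsOfA = new HashMap<>();
    argsOfA.put("hostname", "wiki.cask.co");

    System.out.println(resolve(mapping, "default", "A", argsOfA)); // prints {host=wiki.cask.co}
  }
}
```

References whose source value is missing are skipped in this sketch; whether the real pipeline app should instead fail the run is a design decision this page does not settle.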
API changes
New Programmatic APIs
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * This interface represents a context for a processor or elements of a processor.
 */
public interface RuntimeContext {
  ...
  /**
   * @return {@link TriggeringScheduleInfo} if the program is triggered by a schedule. Otherwise, returns {@code null}.
   */
  @Nullable
  TriggeringScheduleInfo getTriggeringScheduleInfo();
  ...
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * The information of a schedule that can be used by the program launched by the schedule.
 */
public class TriggeringScheduleInfo {
  private final String name;
  private final String description;
  private final TriggerInfo triggerInfo;
  private final Map<String, String> properties;

  public TriggeringScheduleInfo(String name, String description, TriggerInfo triggerInfo,
                                Map<String, String> properties) {
    this.name = name;
    this.description = description;
    this.properties = properties;
    this.triggerInfo = triggerInfo;
  }

  /**
   * @return Schedule's name, which is unique in an application.
   */
  public String getName() {
    return name;
  }

  /**
   * @return Description of the schedule.
   */
  public String getDescription() {
    return description;
  }

  /**
   * @return Information of the trigger contained in this schedule.
   */
  public TriggerInfo getTriggerInfo() {
    return triggerInfo;
  }

  /**
   * @return Properties of the schedule.
   */
  public Map<String, String> getProperties() {
    return properties;
  }
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Base class for the trigger information to be passed to the triggered program.
*/
public abstract class TriggerInfo {
private final Trigger.Type type;
public TriggerInfo(Trigger.Type type) {
this.type = type;
}
/**
* @return The type of the trigger.
*/
public Trigger.Type getType() {
return type;
}
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * The program status trigger information to be passed to the triggered program.
 */
public class ProgramStatusTriggerInfo extends TriggerInfo {
  private final String namespace;
  private final ApplicationSpecification applicationSpecification;
  private final ProgramType programType;
  private final String program;
  private final Set<ProgramStatus> triggerStatues;
  @Nullable
  private final String runId;
  @Nullable
  private final ProgramStatus programStatus;
  @Nullable
  private final WorkflowToken workflowToken;
  @Nullable
  private final Map<String, String> runtimeArguments;

  public ProgramStatusTriggerInfo(String namespace, ApplicationSpecification applicationSpecification,
                                  ProgramType programType, String program,
                                  Set<ProgramStatus> triggerStatues, @Nullable String runId,
                                  @Nullable ProgramStatus programStatus,
                                  @Nullable WorkflowToken workflowToken,
                                  @Nullable Map<String, String> runtimeArguments) {
    super(Trigger.Type.PROGRAM_STATUS);
    this.namespace = namespace;
    this.applicationSpecification = applicationSpecification;
    this.programType = programType;
    this.program = program;
    this.triggerStatues = triggerStatues;
    this.runId = runId;
    this.programStatus = programStatus;
    this.workflowToken = workflowToken;
    this.runtimeArguments = runtimeArguments;
  }

  /**
   * @return The namespace of the triggering program.
   */
  public String getNamespace() {
    return namespace;
  }

  /**
   * @return The application specification of the application that contains the triggering program.
   */
  public ApplicationSpecification getApplicationSpecification() {
    return applicationSpecification;
  }

  /**
   * @return The program type of the triggering program.
   */
  public ProgramType getProgramType() {
    return programType;
  }

  /**
   * @return The program name of the triggering program.
   */
  public String getProgram() {
    return program;
  }

  /**
   * @return All the program statuses that can satisfy the program status trigger.
   */
  public Set<ProgramStatus> getTriggerStatues() {
    return triggerStatues;
  }

  /**
   * @return The program run Id of the triggering program run that can satisfy the program status trigger,
   *         or {@code null} if there is no such run.
   */
  @Nullable
  public String getRunId() {
    return runId;
  }

  /**
   * @return The program status of the triggering program run that can satisfy the program status trigger,
   *         or {@code null} if there is no such run.
   */
  @Nullable
  public ProgramStatus getProgramStatus() {
    return programStatus;
  }

  /**
   * @return The workflow token if the program is a workflow with a run that can satisfy the program status trigger,
   *         or an empty workflow token if there's no such run.
   *         Return {@code null} if the program is not a workflow.
   */
  @Nullable
  public WorkflowToken getWorkflowToken() {
    return workflowToken;
  }

  /**
   * @return The runtime arguments of the triggering program run that can satisfy the program status trigger,
   *         or {@code null} if there is no such run.
   */
  @Nullable
  public Map<String, String> getRuntimeArguments() {
    return runtimeArguments;
  }
} |
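Because several getters above may return null, a triggered program needs to read the trigger information defensively. The sketch below illustrates one way to do that. The types `SketchTriggerInfo` and `SketchProgramStatusTriggerInfo` are simplified, hypothetical stand-ins for the proposed classes, reduced to the single getter the example needs, and `TriggerInfoReader` is likewise an invented helper.

```java
import java.util.Collections;
import java.util.Map;

/** Simplified stand-in for the proposed TriggerInfo base class. */
abstract class SketchTriggerInfo { }

/** Simplified stand-in for the proposed ProgramStatusTriggerInfo. */
final class SketchProgramStatusTriggerInfo extends SketchTriggerInfo {
  private final Map<String, String> runtimeArguments; // null if no run satisfied the trigger

  SketchProgramStatusTriggerInfo(Map<String, String> runtimeArguments) {
    this.runtimeArguments = runtimeArguments;
  }

  Map<String, String> getRuntimeArguments() {
    return runtimeArguments;
  }
}

/** Hypothetical helper showing null-safe access to the triggering run's arguments. */
final class TriggerInfoReader {

  /** Returns the triggering run's runtime arguments, or an empty map when unavailable. */
  static Map<String, String> runtimeArgumentsOf(SketchTriggerInfo info) {
    // instanceof is false for null, so this also covers a program that was
    // not launched by a schedule.
    if (!(info instanceof SketchProgramStatusTriggerInfo)) {
      return Collections.<String, String>emptyMap();
    }
    Map<String, String> args = ((SketchProgramStatusTriggerInfo) info).getRuntimeArguments();
    return args == null ? Collections.<String, String>emptyMap() : args;
  }
}
```

Returning an empty map rather than null lets the caller iterate over the result without a further null check.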
UI Impact or Changes
Security Impact
What is the impact on authorization, and how does the design take care of this aspect?
Impact on Infrastructure Outages
System behavior (if applicable, document the impact of downstream component failures [YARN, HBase, etc.]) and how the design takes care of this aspect
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
Releases
Release X.Y.Z
Release X.Y.Z
Related Work
- Work #1
- Work #2
- Work #3