Please read HTTP Batch Source first to grasp the core principles of pagination, format parsing, etc.
The plugin will work very similarly to HTTP Batch Source, but it will make periodic calls to fetch new pages once all initial pages are loaded. The configurations will remain the same.
Configurations
In addition to the configurations present in the batch source, the streaming plugin adds the following:
Section | Name | Description | Default | Widget | Validations |
---|---|---|---|---|---|
General | Track last page changes | If true, until a new page appears the plugin tracks the last available page and re-emits the records from that page whenever its contents change. | false | Text Area | For pagination type "None", this is expected to be true. |
General | Maximum pages per fetch | Maximum number of pages put into an RDD before it is submitted to Spark Streaming. An empty value means no limit. | empty | | |
Also, the error handling configurations will have to change to remove the "Send to error" option, since sending records to error is not currently possible in streaming plugins.
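As a sketch, the two extra options could be added to the streaming plugin's config class roughly as follows, assuming it extends the batch source's config class. All class and property names here (HttpStreamingSourceConfig, HttpBatchSourceConfig, trackLastPageChanges, maxPagesPerFetch) are illustrative assumptions, not final API:

```java
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;

import javax.annotation.Nullable;

// Hypothetical streaming config, reusing everything from the batch source.
public class HttpStreamingSourceConfig extends HttpBatchSourceConfig {

  @Name("trackLastPageChanges")
  @Description("If true, until a new page appears the plugin tracks the last "
      + "available page and re-emits its records whenever its contents change.")
  @Nullable
  private Boolean trackLastPageChanges;

  @Name("maxPagesPerFetch")
  @Description("Maximum number of pages put into an RDD before it is "
      + "submitted to Spark Streaming. Empty means no limit.")
  @Macro
  @Nullable
  private Long maxPagesPerFetch;

  // Defaults to false when the field is left empty.
  public boolean shouldTrackLastPageChanges() {
    return trackLastPageChanges != null && trackLastPageChanges;
  }

  @Nullable
  public Long getMaxPagesPerFetch() {
    return maxPagesPerFetch;
  }
}
```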
Implementation
A custom InputDStream will be used instead of a Receiver, since a Receiver does not get checkpointed.
public class HttpInputDStream extends InputDStream<StructuredRecord>
HttpInputDStream#compute
The compute method is called by Spark every batchInterval seconds and returns an RDD of new records.
Return the RDD in parts?
We don't want to load a million pages at once here, since the Spark stream would then sit empty for a very long time while waiting for the RDD to be returned.
For such cases I propose loading pages for at most batchInterval seconds before returning them as an RDD, and continuing from that position on the next HttpInputDStream#compute call.
The deadline should stop page loading only once all the records of the current page have been processed, so that we don't put the same records into multiple RDDs.
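A minimal sketch of this pacing loop, assuming (as in the batch source) a pagination iterator that yields BasePage objects whose next() returns parsed records. The fields and the getUrl() accessor are illustrative assumptions, and the method is meant to live inside HttpInputDStream:

```java
// Assumed fields on HttpInputDStream (names are illustrative):
//   private PaginationIterator paginationIterator; // mirrors the batch source
//   private String lastProcessedPageUrl;           // checkpointed, see "State Saving"
//   private Long maxPagesPerFetch;                 // null means "no limit"
private List<StructuredRecord> fetchRecords(long batchIntervalMillis) {
  List<StructuredRecord> records = new ArrayList<>();
  long deadline = System.currentTimeMillis() + batchIntervalMillis;
  long pagesFetched = 0;

  while (paginationIterator.hasNext()) {
    BasePage page = paginationIterator.next();
    // Drain the whole page before checking the deadline, so that a page is
    // never split across two RDDs and no record is emitted twice.
    while (page.hasNext()) {
      records.add(page.next());
    }
    lastProcessedPageUrl = page.getUrl(); // hypothetical accessor
    pagesFetched++;

    if (System.currentTimeMillis() >= deadline
        || (maxPagesPerFetch != null && pagesFetched >= maxPagesPerFetch)) {
      break;
    }
  }
  return records;
}
```

Note that the same page-boundary check also naturally enforces "Maximum pages per fetch".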
InputDStream return value: pages or structured records? (InputDStream<StructuredRecord> vs InputDStream<BasePage>)
- APPROACH 1. Return structured records from InputDStream.
BasePage child classes contain many fields which are not serializable, such as iterators and the HTTP response. The main problem is serializing the HTTP response, which holds a large page.
Pros of the approach: no need to load the entire page contents, which might be quite large, into memory and serialize them.
- APPROACH 2. Return pages from InputDStream.
This can be implemented by storing the whole response body as a byte array in HttpResponse, so that it becomes Serializable. However, this makes the application load the whole page contents into memory at one point.
Pros of the approach: different pages can be parsed in parallel within the Spark stream.
Approach #1 will be used.
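Putting the pieces together, compute() for approach #1 could look roughly as follows. Records are fully parsed on the driver and parallelized into an RDD of StructuredRecord, so no page object ever needs to be serializable. The ClassTag plumbing is what extending the Scala InputDStream from Java typically requires, and fetchRecords is the pacing loop sketched above; the exact wiring is an assumption, not final code:

```java
import java.util.List;

import io.cdap.cdap.api.data.format.StructuredRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.InputDStream;
import scala.Option;

public class HttpInputDStream extends InputDStream<StructuredRecord> {
  private final StreamingContext streamingContext;
  private final long batchIntervalMillis;

  public HttpInputDStream(StreamingContext ssc, long batchIntervalMillis) {
    // Extending the Scala InputDStream from Java requires an explicit ClassTag.
    super(ssc, scala.reflect.ClassTag$.MODULE$.apply(StructuredRecord.class));
    this.streamingContext = ssc;
    this.batchIntervalMillis = batchIntervalMillis;
  }

  @Override
  public Option<RDD<StructuredRecord>> compute(Time validTime) {
    // Load pages for at most one batch interval (see fetchRecords above),
    // then parallelize the already-parsed records into an RDD.
    List<StructuredRecord> records = fetchRecords(batchIntervalMillis);
    JavaSparkContext jsc =
        JavaSparkContext.fromSparkContext(streamingContext.sparkContext());
    return Option.apply(jsc.parallelize(records).rdd());
  }

  @Override
  public void start() {
    // open the HTTP client / pagination iterator, restore checkpointed state
  }

  @Override
  public void stop() {
    // close the HTTP client
  }
}
```

The trade-off of approach #1 is visible here: parallelize ships already-parsed records to the executors, so nothing page-related has to be serializable, at the cost of doing all parsing on the driver.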
State Saving
HttpInputDStream will save the URL of the page which was processed last. This variable gets checkpointed, so the application will continue from the next page when restarted.
```java
class HttpInputDStream extends ... {
  private String lastProcessedPageUrl;
  // needed when config.trackLastPageChanges is true
  private String lastProcessedPageHash;
}
```
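The exact use of the hash is not spelled out above; a plausible sketch for pagination type "None" with trackLastPageChanges = true is to hash the page body on every fetch and re-emit the records only when the hash differs from the checkpointed one. sha256Hex() and getBody() are hypothetical helpers, not existing plugin API:

```java
// Re-emit the single page's records only when its contents changed.
private List<StructuredRecord> recordsIfLastPageChanged(BasePage page) {
  String hash = sha256Hex(page.getBody()); // hypothetical hashing helper
  if (hash.equals(lastProcessedPageHash)) {
    return Collections.emptyList();        // page unchanged, emit nothing
  }
  lastProcessedPageHash = hash;            // checkpointed alongside the URL
  List<StructuredRecord> records = new ArrayList<>();
  while (page.hasNext()) {
    records.add(page.next());
  }
  return records;
}
```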