Sunday, May 3, 2020

Reliable receiver vs. unreliable receiver

Receiver's API

On the technical side, receivers are built by extending the abstract class Receiver. Two methods must be implemented: onStart() and onStop(), used respectively to start and stop data consumption. The first method can contain all the initialization needed for data reading, such as opening connections, creating new threads, and so on.

But be careful: onStart() must be non-blocking. To be able to start and stop the receiver properly, data retrieval should be implemented in a new thread. Otherwise, even when the streaming context exceeds its timeout, it won't stop, because onStart() is still blocking.
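To illustrate, here's a minimal sketch in Scala of such a receiver, loosely modeled on the socket example from the Spark documentation. The host and port are placeholders, and the class name is made up for the example:

import java.net.Socket

import scala.io.Source

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver reading text lines from a TCP socket
class LineReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Must return quickly - it only starts the consuming thread
  override def onStart(): Unit = {
    new Thread("line-receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Nothing to do here - the reading loop checks isStopped() and exits on its own
  override def onStop(): Unit = {}

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val lines = Source.fromInputStream(socket.getInputStream).getLines()
      while (!isStopped() && lines.hasNext) {
        store(lines.next())
      }
      socket.close()
      // Source exhausted - ask Spark to schedule a restart
      restart("Trying to connect again")
    } catch {
      case e: Exception => restart("Error receiving data", e)
    }
  }
}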

Data is retrieved in a new thread created and started by the onStart() method, but how is it sent to Spark? There are actually two methods for moving data from the receiver to the Spark context, each used by a different type of receiver. Before talking about these methods, let's explain the receiver types:

  • reliable receiver - this receiver acknowledges the data source once the data has been received and successfully replicated in Spark storage.
  • unreliable receiver - in this case, no ack is sent to the source.

Under the hood, the difference between them comes from the store(...) method used to send data to Spark. A reliable receiver sends data through the store(...) variants taking a collection-like object as a parameter (Iterator, ByteBuffer or ArrayBuffer). These are blocking calls that don't return until Spark notifies the receiver that the data was saved successfully. Once the call returns, the receiver can acknowledge the source that the data was received.
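As a sketch of that reliable flow, the receiver below polls a source, stores a whole buffer in one blocking call, and only then acknowledges. QueueClient and its poll/ack/close methods are hypothetical stand-ins for a real acknowledging source such as a message queue consumer:

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical client API of an ack-capable source
trait QueueClient {
  def poll(): ArrayBuffer[String]
  def ack(batch: ArrayBuffer[String]): Unit
  def close(): Unit
}

class ReliableQueueReceiver(client: QueueClient)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("reliable-receiver") {
      override def run(): Unit = consume()
    }.start()
  }

  override def onStop(): Unit = client.close()

  private def consume(): Unit = {
    while (!isStopped()) {
      val batch = client.poll()
      // Blocking: returns only once Spark has stored and replicated
      // the whole buffer, and throws if the save fails
      store(batch)
      // Only now is it safe to acknowledge the source
      client.ack(batch)
    }
  }
}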

On the other side, an unreliable receiver uses the store(...) variant taking a single object as a parameter. This call is non-blocking, but it doesn't send the data to Spark immediately. Instead, it keeps the data in memory and ships it to Spark in batches, after accumulating some number of items.
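That non-blocking path is exactly the reading loop used in the LineReceiver sketch above. Shown again in isolation, the important point is that the call gives no signal the receiver could turn into an ack:

// Single-record store(...): returns immediately; Spark batches the
// records internally, so no per-record acknowledgment is possible
while (!isStopped() && lines.hasNext) {
  store(lines.next())
}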

Sometimes a receiver can fail. In this case it can be restarted through the restart(...) method. However, the restart is not immediate; it's only scheduled. Executing the restart consists of calling the onStop() and onStart() methods one after the other, within the delay configured by the spark.streaming.receiverRestartDelay property.
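To close the loop, here is a hypothetical driver program wiring the LineReceiver sketch into a streaming context and lengthening the restart delay. The 5000 ms value is arbitrary; in the Spark source the default is 2000 ms:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("custom-receiver-demo")
  .setMaster("local[2]") // at least 2 cores: 1 for the receiver, 1 for processing
  // Delay (ms) between the scheduled onStop() and onStart() calls on restart
  .set("spark.streaming.receiverRestartDelay", "5000")

val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.receiverStream(new LineReceiver("localhost", 9999))
lines.print()
ssc.start()
ssc.awaitTermination()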
