Message Guarantees in Kafka, Storm, and Spark Streaming

[Hama] Lee Seung-hyun (wowlsh93@gmail.com) 2016. 4. 20. 14:41

Apache Storm version: 1.0.0 (released 12 Apr 2016)

Apache Kafka version: 0.9.0.1 (as of this writing, 20 Apr 2016)

Spark Streaming version: 1.6.1 (released 9 Mar 2016)


Apache Storm 

Storm can recover from message-delivery failures (it is fault tolerant) and offers several levels of message-processing guarantees.

  1. at-most-once: In this mode, messages may be dropped when a failure or timeout occurs. No special handling is required, and messages are processed in the order the spout emitted them.
  2. at-least-once: In this mode, Storm tracks whether each spout tuple has been "fully" processed within a configured timeout. Any input tuple that is not fully processed within the timeout is re-emitted, which means the same tuple may be processed more than once and ordering may change (see the sketch after this list).
  3. exactly-once: When used with Trident, Storm provides an "exactly-once" guarantee.
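
The at-least-once mode described above relies on anchoring and acking in user code. Below is a minimal sketch, written for this post rather than taken from the Storm docs, of a Storm 1.0 bolt in Scala that anchors every emitted tuple to its input and acks or fails it explicitly; the UppercaseBolt class and the "word" field are illustrative assumptions.

    import java.util.{Map => JMap}
    import org.apache.storm.task.{OutputCollector, TopologyContext}
    import org.apache.storm.topology.OutputFieldsDeclarer
    import org.apache.storm.topology.base.BaseRichBolt
    import org.apache.storm.tuple.{Fields, Tuple, Values}

    // At-least-once processing: anchor each output tuple to its input tuple so
    // Storm tracks the tuple tree, then ack on success or fail to trigger replay.
    class UppercaseBolt extends BaseRichBolt {
      private var collector: OutputCollector = _

      override def prepare(conf: JMap[_, _], context: TopologyContext,
                           collector: OutputCollector): Unit = {
        this.collector = collector
      }

      override def execute(input: Tuple): Unit = {
        try {
          val word = input.getStringByField("word")
          collector.emit(input, new Values(word.toUpperCase))  // anchored emit
          collector.ack(input)                                 // fully processed
        } catch {
          case _: Exception =>
            collector.fail(input)   // the spout will re-emit this tuple
        }
      }

      override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
        declarer.declare(new Fields("word"))
    }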


Further reading: Exactly-Once Processing with Trident - The Fake Truth


Apache Kafka 

How do I get exactly-once messaging from Kafka?

Exactly once semantics has two parts: avoiding duplication during data production and avoiding duplicates during data consumption.

There are two approaches to getting exactly once semantics during data production:

  1. Use a single-writer per partition and every time you get a network error check the last message in that partition to see if your last write succeeded
  2. Include a primary key (UUID or something) in the message and deduplicate on the consumer.
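
As a rough illustration of approach 2, the following Scala sketch (written for this post, not taken from the Kafka FAQ) uses the 0.9 producer to embed a UUID in every message so that consumers can deduplicate replays after a retry; the broker address, topic name "events", and payload are assumptions.

    import java.util.{Properties, UUID}
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object DedupKeyProducer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")  // assumed broker address
        props.put("key.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        // The UUID acts as a primary key: a consumer (or a unique constraint in
        // the target store) drops any message whose id has already been seen.
        val record = new ProducerRecord[String, String](
          "events", UUID.randomUUID().toString + "|" + "some-event")
        producer.send(record)
        producer.close()
      }
    }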


If you do one of these things, the log that Kafka hosts will be duplicate-free. However, reading without duplicates depends on some co-operation from the consumer too. If the consumer is periodically checkpointing its position then if it fails and restarts it will restart from the checkpointed position. Thus if the data output and the checkpoint are not written atomically it will be possible to get duplicates here as well. This problem is particular to your storage system. For example, if you are using a database you could commit these together in a transaction. The HDFS loader Camus that LinkedIn wrote does something like this for Hadoop loads. The other alternative that doesn't require a transaction is to store the offset with the data loaded and deduplicate using the topic/partition/offset combination.
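
As a sketch of the non-transactional alternative above, assuming the Kafka 0.9 consumer: store the topic/partition/offset combination next to each loaded record and use it as a unique key, so that reprocessing after a crash overwrites rather than duplicates. The upsertByKey call is hypothetical and stands in for whatever idempotent write the target store supports.

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import scala.collection.JavaConverters._

    object OffsetKeyedLoader {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")  // assumed broker address
        props.put("group.id", "loader")
        props.put("key.deserializer",
          "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer",
          "org.apache.kafka.common.serialization.StringDeserializer")

        val consumer = new KafkaConsumer[String, String](props)
        consumer.subscribe(Collections.singletonList("events"))

        while (true) {
          for (record <- consumer.poll(1000).asScala) {
            // The offset is unique per topic/partition, so writing under this key
            // twice leaves exactly one copy in the target store.
            val dedupKey = s"${record.topic()}/${record.partition()}/${record.offset()}"
            // upsertByKey(dedupKey, record.value())   // hypothetical idempotent write
          }
        }
      }
    }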

I think there are two improvements that would make this a lot easier:

  1. Producer idempotence could be done automatically and much more cheaply by optionally integrating support for this on the server.
  2. The existing high-level consumer doesn't expose a lot of the more fine-grained control of offsets (e.g. to reset your position). We will be working on that soon.


Further reading: Kafka Clients (At-most-once, At-least-once, Exactly-once, and Avro Client)


Spark Streaming

Fault-tolerance Semantics

In this section, we will discuss the behavior of Spark Streaming applications in the event of failures.

Background

To understand the semantics provided by Spark Streaming, let us remember the basic fault-tolerance semantics of Spark’s RDDs.

  1. An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD remembers the lineage of deterministic operations that were used on a fault-tolerant input dataset to create it.
  2. If any partition of an RDD is lost due to a worker node failure, then that partition can be re-computed from the original fault-tolerant dataset using the lineage of operations.
  3. Assuming that all of the RDD transformations are deterministic, the data in the final transformed RDD will always be the same irrespective of failures in the Spark cluster.

Spark operates on data in fault-tolerant file systems like HDFS or S3. Hence, all of the RDDs generated from the fault-tolerant data are also fault-tolerant. However, this is not the case for Spark Streaming as the data in most cases is received over the network (except when fileStream is used). To achieve the same fault-tolerance properties for all of the generated RDDs, the received data is replicated among multiple Spark executors in worker nodes in the cluster (default replication factor is 2). This leads to two kinds of data in the system that need to be recovered in the event of failures:

  1. Data received and replicated - This data survives failure of a single worker node as a copy of it exists on one of the other nodes.
  2. Data received but buffered for replication - Since this is not replicated, the only way to recover this data is to get it again from the source.

Furthermore, there are two kinds of failures that we should be concerned about:

  1. Failure of a Worker Node - Any of the worker nodes running executors can fail, and all in-memory data on those nodes will be lost. If any receivers were running on failed nodes, then their buffered data will be lost.
  2. Failure of the Driver Node - If the driver node running the Spark Streaming application fails, then obviously the SparkContext is lost, and all executors with their in-memory data are lost.

With this basic knowledge, let us understand the fault-tolerance semantics of Spark Streaming.

Definitions

The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.)

  1. At most once: Each record will be either processed once or not processed at all.
  2. At least once: Each record will be processed one or more times. This is stronger than at-most once as it ensures that no data will be lost. But there may be duplicates.
  3. Exactly once: Each record will be processed exactly once - no data will be lost and no data will be processed multiple times. This is obviously the strongest guarantee of the three.

Basic Semantics

In any stream processing system, broadly speaking, there are three steps in processing the data.

  1. Receiving the data: The data is received from sources using Receivers or otherwise.

  2. Transforming the data: The received data is transformed using DStream and RDD transformations.

  3. Pushing out the data: The final transformed data is pushed out to external systems like file systems, databases, dashboards, etc.

If a streaming application has to achieve end-to-end exactly-once guarantees, then each step has to provide an exactly-once guarantee. That is, each record must be received exactly once, transformed exactly once, and pushed to downstream systems exactly once. Let’s understand the semantics of these steps in the context of Spark Streaming.

  1. Receiving the data: Different input sources provide different guarantees. This is discussed in detail in the next subsection.

  2. Transforming the data: All data that has been received will be processed exactly once, thanks to the guarantees that RDDs provide. Even if there are failures, as long as the received input data is accessible, the final transformed RDDs will always have the same contents.

  3. Pushing out the data: Output operations by default ensure at-least-once semantics; whether exactly-once can be achieved depends on the type of output operation (idempotent or not) and the semantics of the downstream system (whether it supports transactions). Users can also implement their own transaction mechanisms to achieve exactly-once semantics. This is discussed in more detail later in the section.

Semantics of Received Data

Different input sources provide different guarantees, ranging from at-least once to exactly once. The subsections below give more details.

With Files

If all of the input data is already present in a fault-tolerant file system like HDFS, Spark Streaming can always recover from any failure and process all of the data. This gives exactly-once semantics, meaning all of the data will be processed exactly once no matter what fails.
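
For reference, a minimal sketch of file-based input; the batch interval and HDFS directory are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object FileInputExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("file-input-example")
        val ssc  = new StreamingContext(conf, Seconds(60))

        // Files appearing in this directory are picked up as new batches; since
        // the data already sits in fault-tolerant storage, lost partitions can
        // simply be recomputed, giving exactly-once semantics on the input side.
        val lines = ssc.textFileStream("hdfs:///data/incoming")
        lines.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }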

With Receiver-based Sources

For input sources based on receivers, the fault-tolerance semantics depend on both the failure scenario and the type of receiver. As we discussed earlier, there are two types of receivers:

  1. Reliable Receiver - These receivers acknowledge reliable sources only after ensuring that the received data has been replicated. If such a receiver fails, the source will not receive acknowledgment for the buffered (unreplicated) data. Therefore, if the receiver is restarted, the source will resend the data, and no data will be lost due to the failure.
  2. Unreliable Receiver - Such receivers do not send acknowledgment and therefore can lose data when they fail due to worker or driver failures.

Depending on what type of receivers are used we achieve the following semantics. If a worker node fails, then there is no data loss with reliable receivers. With unreliable receivers, data received but not replicated can get lost. If the driver node fails, then besides these losses, all of the past data that was received and replicated in memory will be lost. This will affect the results of the stateful transformations.

To avoid this loss of past received data, Spark 1.2 introduced write ahead logs which save the received data to fault-tolerant storage. With the write ahead logs enabled and reliable receivers, there is zero data loss. In terms of semantics, it provides an at-least once guarantee.
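
A minimal sketch of enabling the write ahead log for a receiver-based source; the socket host/port and checkpoint directory are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object WalExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("wal-example")
          .set("spark.streaming.receiver.writeAheadLog.enable", "true")

        val ssc = new StreamingContext(conf, Seconds(10))
        // The WAL is written under the checkpoint directory, which must be on
        // fault-tolerant storage such as HDFS.
        ssc.checkpoint("hdfs:///checkpoints/wal-example")

        // With the WAL enabled, received blocks are persisted to the log before
        // being acknowledged, so in-memory replication can be dialed down.
        val lines = ssc.socketTextStream("host", 9999,
          StorageLevel.MEMORY_AND_DISK_SER)
        lines.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }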

The following table summarizes the semantics under failures:

Deployment Scenario: Spark 1.1 or earlier, OR Spark 1.2 or later without write ahead logs
  • Worker Failure: Buffered data lost with unreliable receivers. Zero data loss with reliable receivers. At-least once semantics.
  • Driver Failure: Buffered data lost with unreliable receivers. Past data lost with all receivers. Undefined semantics.

Deployment Scenario: Spark 1.2 or later with write ahead logs
  • Worker Failure: Zero data loss with reliable receivers. At-least once semantics.
  • Driver Failure: Zero data loss with reliable receivers and files. At-least once semantics.

With Kafka Direct API

In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach (experimental as of Spark 1.6.1) is further discussed in the Kafka Integration Guide.
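
A minimal sketch of the direct stream as of Spark 1.6, assuming the spark-streaming-kafka artifact is on the classpath; the broker address and topic name are placeholders.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object KafkaDirectExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("kafka-direct-example")
        val ssc  = new StreamingContext(conf, Seconds(10))

        val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
        val topics      = Set("events")

        // No receiver is involved: offsets are tracked by Spark Streaming itself
        // (through checkpoints), so each Kafka record is received exactly once.
        val stream = KafkaUtils.createDirectStream[
          String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

        stream.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }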

Semantics of output operations

Output operations (like foreachRDD) have at-least once semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to file systems using the saveAs***Files operations (as the file will simply get overwritten with the same data), additional effort may be necessary to achieve exactly-once semantics. There are two approaches.

  • Idempotent updates: Multiple attempts always write the same data. For example, saveAs***Files always writes the same data to the generated files.

  • Transactional updates: All updates are made transactionally so that updates are made exactly once atomically. One way to do this would be the following.

    • Use the batch time (available in foreachRDD) and the partition index of the RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application.
    • Update external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else, if this was already committed, skip the update.

      import org.apache.spark.TaskContext

      dstream.foreachRDD { (rdd, time) =>
        rdd.foreachPartition { partitionIterator =>
          val partitionId = TaskContext.get.partitionId()
          // generateUniqueId is user-defined; the batch time plus the partition
          // index uniquely identify this block of data across retries
          val uniqueId = generateUniqueId(time.milliseconds, partitionId)
          // use this uniqueId to transactionally commit the data in partitionIterator
        }
      }

Further reading: Spark Streaming vs Storm Trident


Other reading:
High-throughput, low-latency, and exactly-once stream processing with Apache Flink

