Requirements
Filter, in a very quick and efficient way, a stream of location data coming from an external source at a rate of 1M events per second. The assumption is that 80% of the events should be filtered out and only 20% passed on to the analytics system for further processing. The filtering process should match each incoming event against a predefined, semi-static data structure (predefined location areas) containing 60,000M (60 billion) entries.
In this article, I assume that the reader is familiar with the Storm framework and with the definition of a Bloom filter.
Solution
Since the requirement involves a stream of data, the Storm framework was chosen; to provide efficient filtering, the Guava Bloom filter was chosen. Using a Bloom filter calculator, I found that with a 0.02 (2%) false positive probability the Bloom filter bit array takes about 60GB of memory, which cannot be loaded into the heap memory of a Java process.
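As a sanity check, the standard Bloom filter sizing formula gives approximately the same figure:

m = -n * ln(p) / (ln 2)^2 = -(6 * 10^10) * ln(0.02) / (ln 2)^2 ≈ 4.9 * 10^11 bits ≈ 61GB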
I created a simple Storm topology consisting of a Kafka spout and Filter bolts.
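A minimal sketch of such a topology is shown below, assuming Storm 2.x style APIs and the storm-kafka-client spout. The component names, parallelism hints and Kafka settings are illustrative assumptions, not the original code, and the spout is assumed to be configured (e.g. with a custom RecordTranslator) so that it emits tuples with a "location" field:

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    public class LocationFilterTopology {
        public static void main(String[] args) throws Exception {
            // Spout reading raw location events from Kafka; assumed to emit
            // tuples that contain a "location" field
            KafkaSpoutConfig<String, String> spoutConfig =
                    KafkaSpoutConfig.builder("kafka:9092", "location-events").build();

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 4);

            // Fields grouping on "location": events with the same location
            // always arrive at the same FilterBolt instance (FilterBolt is
            // sketched later in this article); 20 instances, per the sizing
            // calculation further below
            builder.setBolt("filter-bolt", new FilterBolt(), 20)
                   .fieldsGrouping("kafka-spout", new Fields("location"));

            StormSubmitter.submitTopology("location-filter", new Config(),
                    builder.createTopology());
        }
    }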
There are several possible solutions:
1. Use external storage to share the Bloom filter bit array between the Storm bolts responsible for filtering
Disadvantages
- Long Bloom filter response times and low system throughput
- Requires installation and maintenance of additional resources
2. Cache the Bloom filter bit array on each Filter bolt instance using the memory-mapped file technique (out of heap memory)
Disadvantages
- Unmanaged use of server memory may lead to server collapse or to starvation of other applications running on the same server
- A potential security threat
3. The chosen solution! Cache the Bloom filter bit array on each Filter bolt instance using a more sophisticated method
As I wrote before, the Storm topology consists of a Kafka spout and Filter bolts and uses fields grouping, which means that events with the same location always arrive at the same instance of the Filter bolt.
So a specific instance of the Filter bolt needs to cache only the part of the Bloom filter bit array that covers its relevant location areas.
As part of its fields grouping policy, the Storm framework uses a simple formula to distribute events equally between bolt instances:

hash(event location) % (#Filter bolts)
The same formula is used in the Filter bolts for partial Bloom filter caching:

hash(event location) % (#Filter bolts) == boltId
In other words, this is Bloom filter partitioning based on the Storm fields grouping policy.
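A minimal sketch of such a Filter bolt, assuming Guava's BloomFilter and a hypothetical loadPredefinedAreas() helper that streams the predefined location areas. One caveat: for the partitioning to line up in a real deployment, the hash used here must match the hash Storm itself applies for fields grouping; the plain hashCode() below only illustrates the idea:

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;

    public class FilterBolt extends BaseRichBolt {
        private OutputCollector collector;
        private BloomFilter<CharSequence> partialFilter;

        @Override
        public void prepare(Map<String, Object> conf, TopologyContext context,
                            OutputCollector collector) {
            this.collector = collector;
            int boltId = context.getThisTaskIndex();
            int numBolts = context.getComponentTasks(context.getThisComponentId()).size();

            // Each instance sizes its filter for its share of the 60,000M areas
            partialFilter = BloomFilter.create(
                    Funnels.stringFunnel(StandardCharsets.UTF_8),
                    60_000_000_000L / numBolts, 0.02);

            // Cache only the areas this instance is responsible for, using the
            // same formula as the fields grouping policy
            for (String area : loadPredefinedAreas()) {
                if (Math.abs(area.hashCode()) % numBolts == boltId) {
                    partialFilter.put(area);
                }
            }
        }

        @Override
        public void execute(Tuple tuple) {
            String location = tuple.getStringByField("location");
            // Roughly 80% of events are expected to be dropped here
            if (partialFilter.mightContain(location)) {
                collector.emit(tuple, new Values(location));
            }
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("location"));
        }

        // Hypothetical helper: reads the predefined location areas from
        // whatever store holds them
        private Iterable<String> loadPredefinedAreas() {
            throw new UnsupportedOperationException("illustrative only");
        }
    }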
Under the assumption that each Filter bolt process may cache up to 3GB of the Bloom filter bit array in heap memory, 60GB / 3GB = 20 bolts are required to load all the predefined location areas.
The main disadvantage of this solution is the long warm-up time - the time to calculate and load the relevant part of the Bloom filter bit array into each Filter bolt instance.
To decrease the warm-up time in case of bolt restarts, the Bloom filter bit array is persisted to local disk.
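Guava's BloomFilter supports binary serialization via writeTo and readFrom, so the persistence step could look roughly like the following pair of helper methods added to the FilterBolt sketched above (the file path is an illustrative assumption; java.io is needed in addition to the earlier imports):

    import java.io.*;

    private void persistFilter(int boltId) throws IOException {
        try (OutputStream out = new BufferedOutputStream(
                new FileOutputStream("/var/data/bloom-part-" + boltId + ".bin"))) {
            partialFilter.writeTo(out);
        }
    }

    private BloomFilter<CharSequence> loadPersistedFilter(int boltId) throws IOException {
        try (InputStream in = new BufferedInputStream(
                new FileInputStream("/var/data/bloom-part-" + boltId + ".bin"))) {
            return BloomFilter.readFrom(
                    in, Funnels.stringFunnel(StandardCharsets.UTF_8));
        }
    }

On restart, prepare() would first try loadPersistedFilter() and fall back to the full rebuild only if no file is present.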
Another disadvantage is that coordination is required between the number of predefined location areas and the number of Filter bolt instances.
Using this approach, I created a scalable, distributed and durable solution that provides fast filtering of the data stream.