
Distributed bloom filter


Requirements

Filter, quickly and efficiently, a stream of location data arriving from an external source at a rate of 1M events per second. The assumption is that 80% of the events should be filtered out and only 20% passed on to the analytic system for further processing.
The filtering process must match each incoming event against a predefined, semi-static data structure (predefined location areas) that contains 60,000M entries.


In this article, I assume that the reader is familiar with the Storm framework and with the definition of a Bloom filter.


Solution

Since the requirement is about a stream of data, the Storm framework was chosen; to provide efficient filtering, the Guava Bloom filter was chosen.

A Bloom filter calculator shows that, with a 0.02% false positive probability, the Bloom filter bit array takes about 60G of memory, which cannot be loaded into the heap memory of a Java process.
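The calculator result can be reproduced with the standard Bloom filter sizing formulas, m = -n * ln(p) / (ln 2)^2 bits and k = (m / n) * ln 2 hash functions. The sketch below is not from the original topology; the class and method names are hypothetical, and it treats 1G as 10^9 bytes.

```java
// Sketch: standard Bloom filter sizing formulas (hypothetical helper, not the original code).
final class BloomSizing {
    /** Optimal number of bits: m = -n * ln(p) / (ln 2)^2. */
    static double optimalBits(double entries, double fpp) {
        return -entries * Math.log(fpp) / (Math.log(2) * Math.log(2));
    }

    /** Optimal number of hash functions: k = (m / n) * ln 2, at least 1. */
    static int optimalHashFunctions(double entries, double bits) {
        return Math.max(1, (int) Math.round(bits / entries * Math.log(2)));
    }

    /** Converts a bit count to gigabytes, assuming 1 GB = 10^9 bytes. */
    static double gigabytes(double bits) {
        return bits / 8 / 1e9;
    }

    public static void main(String[] args) {
        double entries = 60e9; // 60,000M predefined location areas
        // Compare p given as a fraction (0.02) with p given as a percentage (0.02% = 0.0002).
        for (double fpp : new double[] {0.02, 0.0002}) {
            double bits = optimalBits(entries, fpp);
            System.out.printf("p=%.4f -> %.1f GB, k=%d%n",
                    fpp, gigabytes(bits), optimalHashFunctions(entries, bits));
        }
    }
}
```

With these formulas, 60,000M entries need roughly 61 GB at p = 0.02 (2%) and roughly 133 GB at p = 0.0002 (0.02%). The "about 60G" figure therefore seems to correspond to a false positive probability of 0.02 expressed as a fraction rather than as a percentage.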

I created a simple Storm topology that contains a Kafka Spout and Filter bolts.


There are several possible solutions:


1. Use external storage to share the Bloom filter bit array between the Storm bolts responsible for filtering

Disadvantages


  • Long Bloom filter response time and low system throughput
  • Additional resources to install and maintain


2. Cache the Bloom filter bit array on each Filter bolt instance using the memory-mapped file technique (off-heap memory)

Disadvantages

  • Unmanaged use of server memory may lead to a server crash or to starvation of other applications running on the same server
  • A potential security threat


3. Chosen solution! Cache only the relevant part of the Bloom filter bit array on each Filter bolt instance





As I wrote before, the Storm topology consists of a Kafka Spout and Filter bolts. It uses fields grouping, which means that events with the same location always arrive at the same instance of the Filter bolt.

So a specific instance of the Filter bolt needs to cache only the Bloom filter bit array for its relevant location areas.

As part of the fields grouping policy, the Storm framework uses a simple formula to distribute events evenly between bolt instances:

hash(event location)%(#Filter bolts)

The same formula is used in the Filter bolts for partial Bloom filter caching; a bolt caches a location area only if:

hash(event location)%(#Filter bolts)==boltId

In other words, the Bloom filter is partitioned according to Storm's fields grouping policy.
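The ownership rule can be sketched in plain Java as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, and the hash used here (a hash over the list of grouping-field values, reduced with a non-negative modulo) is an assumption about how fields grouping picks a task; the exact function should be checked against the Storm version in use, because the bolt must compute exactly the same value as the grouping does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of partition ownership; names are hypothetical, not taken from the original topology.
final class LocationPartitioner {
    /**
     * Index of the Filter bolt instance that should receive a location.
     * Assumption: the grouping hashes the list of grouping-field values
     * and takes a non-negative modulo by the number of Filter bolts.
     */
    static int ownerIndex(String location, int filterBolts) {
        int hash = Arrays.hashCode(new Object[] {location});
        return Math.floorMod(hash, filterBolts); // floorMod keeps the result in [0, filterBolts)
    }

    /** Selects the location areas that one bolt instance has to cache. */
    static List<String> areasFor(int boltIndex, int filterBolts, List<String> allAreas) {
        List<String> owned = new ArrayList<>();
        for (String area : allAreas) {
            if (ownerIndex(area, filterBolts) == boltIndex) {
                owned.add(area);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> areas = Arrays.asList("area-1", "area-2", "area-3", "area-4");
        int filterBolts = 2;
        for (int bolt = 0; bolt < filterBolts; bolt++) {
            System.out.println("bolt " + bolt + " caches " + areasFor(bolt, filterBolts, areas));
        }
    }
}
```

Inside a real bolt, the index of the current instance would presumably come from the topology context (for example `TopologyContext.getThisTaskIndex()`), and only the areas returned by the selection step would be inserted into that instance's Bloom filter.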

Under the assumption that each Filter bolt process can cache up to 3G of the Bloom filter bit array in heap memory, 20 bolts are required to load all predefined location areas.
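The bolt count is a ceiling division of the total bit array size by the per-bolt budget. A minimal sketch, with a hypothetical helper name and 1G taken as 10^9 bytes:

```java
// Sketch: how many Filter bolt instances are needed for a given per-bolt memory budget.
final class BoltCapacity {
    /** Ceiling division of the total filter size by the per-bolt budget (both in bytes). */
    static int boltsNeeded(long filterBytes, long perBoltBytes) {
        return Math.toIntExact((filterBytes + perBoltBytes - 1) / perBoltBytes);
    }

    public static void main(String[] args) {
        long total = 60_000_000_000L;  // about 60G of Bloom filter bit array
        long perBolt = 3_000_000_000L; // up to 3G cached per Filter bolt process
        System.out.println(boltsNeeded(total, perBolt) + " bolts");
    }
}
```

Hash-based partitioning is only approximately even, so some instances will hold slightly more than the average share; leaving some headroom in the per-bolt budget is prudent.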

The main disadvantage of this solution is the long warm-up time: the time needed to calculate and load the relevant part of the Bloom filter bit array into each Filter bolt instance.
To decrease the warm-up time when bolts restart, the Bloom filter bit array is persisted to local disk.
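The load-or-build pattern behind this can be sketched as below. It is an illustration, not the original implementation: `FilterSnapshot` and its methods are hypothetical, and `java.util.BitSet` stands in for the real bit array. A 3G partition is larger than a single `BitSet` or byte array can hold, so a real implementation would stream the data; with Guava, `BloomFilter.writeTo` and `BloomFilter.readFrom` serve that purpose.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.BitSet;
import java.util.function.Supplier;

// Sketch: persist a bolt's partial bit array to local disk and reuse it after a restart.
final class FilterSnapshot {
    /** Writes to a temporary file first so a crash never leaves a half-written snapshot in place. */
    static void save(BitSet bits, Path file) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, bits.toByteArray());
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    /** Loads the snapshot if present; otherwise runs the slow build once and saves the result. */
    static BitSet loadOrBuild(Path file, Supplier<BitSet> slowBuild) throws IOException {
        if (Files.exists(file)) {
            return BitSet.valueOf(Files.readAllBytes(file));
        }
        BitSet built = slowBuild.get();
        save(built, file);
        return built;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempDirectory("bloom").resolve("filter-bolt-0.bits");
        BitSet bits = loadOrBuild(file, () -> {
            BitSet b = new BitSet();
            b.set(42); // placeholder for the real warm-up computation
            return b;
        });
        System.out.println("bits set: " + bits.cardinality());
    }
}
```

A snapshot is only valid for the bolt index and bolt count it was built for, so the file name (or a small header) should encode both; if the number of Filter bolt instances changes, the snapshots have to be rebuilt.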

Another disadvantage is that the number of predefined location areas and the number of Filter bolt instances must be coordinated.

This approach yields a scalable, distributed and durable solution that filters the data stream quickly.



