A Bloom filter is a probabilistic data structure for determining whether an element is in a set. It offers only two methods - add and mightContain - and mightContain may return false positives but never false negatives. Google's Guava library offers a nice implementation, but unfortunately this implementation (like every implementation I've found) is not concurrent. Concurrent reads are no problem, but writes are trickier - and reading while writing is also not straightforward.

The best I've been able to come up with is the following version, which combines a BloomFilter with a ConcurrentHashMap. Unfortunately, the space behavior of this data structure is not as good as that of a plain BloomFilter - the ConcurrentHashMap holds recent insertions until they are compacted, so the extra space grows with the rate of concurrent insertion. Code for this is available on GitHub.

The basic idea is the following. We store an AtomicReference to the BloomFilter, and we also store a ConcurrentHashMap:

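import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import com.google.common.hash.{BloomFilter, Funnel}

class SemiConcurrentBloomFilter[A](filterSize: Int = 1024*1024, falsePositiveProbability: Double = 1e-12, insertionsBeforeCompact: Int = 1024)(implicit funnel: Funnel[A]) {

  //Recently added values that have not yet been compacted into mainFilter
  private val newValues = new java.util.concurrent.ConcurrentHashMap[A,A]()

  private val mainFilter: AtomicReference[BloomFilter[A]] = new AtomicReference(
    BloomFilter.create[A](funnel, filterSize, falsePositiveProbability)
  )

  private val insertionsSinceCompact = new AtomicInteger(0)

  //Statistics counters used during compaction (declarations assumed; they are not shown in the original excerpt)
  private val attemptedCompactions = new AtomicInteger(0)
  private val successfulCompactions = new AtomicInteger(0)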

When we add an element to the filter, we simply insert it into newValues (unless the main filter already reports it as present). We also increment the insertionsSinceCompact value. Once the incremented count, numInserted, reaches insertionsBeforeCompact (or some multiple of it), we attempt to compact the data structure.

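  def add(a: A): Unit = {
    val bf = mainFilter.get()
    if (!bf.mightContain(a) ) { //Skip values the main filter already reports as present
      newValues.put(a,a)
      val numInserted = insertionsSinceCompact.incrementAndGet()
      if (numInserted % insertionsBeforeCompact == 0) { //Every insertionsBeforeCompact insertions, try to compact
        maybeCompact
      }
    }
  }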

I'll come back to the compaction process in a moment. The mightContain operation is then defined as first checking if a value is in the ConcurrentHashMap, and then checking if it's in the BloomFilter:

  def mightContain(a: A) = {
    val fromNew = newValues.get(a) //Check newValues first; the order matters
    if (fromNew != null) {
      true //a is still waiting in newValues
    } else { //Either a was never added, or a compaction has already moved it into the main filter
      mainFilter.get().mightContain(a)
    }
  }

The compaction process consists of moving values from the ConcurrentHashMap into the BloomFilter. However, we can't simply do this willy nilly - there is a very real possibility that multiple compactions will happen simultaneously. What we must do is copy the Bloom filter, add the elements in newValues to it, and remember which values we added:

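  def maybeCompact: Unit = {
    import scala.collection.JavaConverters._ //For .asScala on the Java Enumeration (assumed; the original import is not shown)
    attemptedCompactions.incrementAndGet() //Statistics
    val size = newValues.size()
    //We restrict the size to ensure that we don't sit in an infinite loop here; toVector takes a strict snapshot
    val toAdd: Seq[A] = newValues.keys().asScala.take(size).toVector
    val oldBf = mainFilter.get()
    val newBf = oldBf.copy() //Private copy; no other thread can see it yet
    toAdd.foreach(newBf.put _)
    val addedNew: Boolean = mainFilter.compareAndSet(oldBf, newBf) //Publish only if nobody else swapped first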

So far this is safely concurrent - we have merely read the ConcurrentHashMap, written those values into a private copy of the BloomFilter, and then attempted to publish that copy with a single compareAndSet. If the swap succeeded, the elements are contained in both the ConcurrentHashMap and the BloomFilter, so the mightContain operation will surely return the correct result. If it failed, the copy is simply discarded and the elements remain in the ConcurrentHashMap. We make sure to limit the number of elements we move to size elements because we don't want this operation to run forever if a separate thread keeps calling add.
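As a small illustration of the bounded read described above, here is a sketch that takes at most a fixed number of keys from a ConcurrentHashMap. The names BoundedSnapshot and takeKeys are hypothetical and not part of the original code. It relies on the fact that ConcurrentHashMap iterators are weakly consistent: they never throw ConcurrentModificationException, but they may or may not see keys added while the loop is running, which is why the explicit limit is what guarantees termination.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper, for illustration only.
object BoundedSnapshot {
  // Copies at most `limit` keys out of `map` into a strict, immutable Vector.
  def takeKeys[A](map: ConcurrentHashMap[A, A], limit: Int): Vector[A] = {
    val out = Vector.newBuilder[A]
    val it = map.keySet().iterator() // weakly consistent; safe under concurrent writes
    var taken = 0
    while (taken < limit && it.hasNext) {
      out += it.next()
      taken += 1
    }
    out.result()
  }
}
```

Because the result is a strict Vector, the same set of keys can be used first to fill the copied filter and later to remove exactly those keys from the map.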

Then, in the event we successfully replaced the old BloomFilter with the new one, we can now safely remove those elements from the ConcurrentHashMap:

    if (addedNew) { //If this succeeds, then both mainFilter and newValues contain all elements of toAdd
      toAdd.foreach(newValues.remove _) //Now it's safe to remove from newValues
      insertionsSinceCompact.addAndGet(-1*size) //Finally reduce insertions
      successfulCompactions.incrementAndGet() //Statistics
    }
  }

During and after this process, the mightContain operation will also return the correct result. These values might be absent from newValues, but they will be present in the BloomFilter.
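To experiment with the whole scheme without pulling in Guava, here is a minimal, dependency-free sketch. It is not the code from the repository: TinyBloom is a hypothetical stand-in for Guava's BloomFilter (a few hash probes into a java.util.BitSet, specialised to String keys), and MiniSemiConcurrentFilter, pendingSize and the default sizes are likewise made up for illustration. The add, mightContain and maybeCompact methods follow the steps described above.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.util.hashing.MurmurHash3

// Hypothetical stand-in for Guava's BloomFilter. Not thread-safe on its own.
final class TinyBloom(numBits: Int, numHashes: Int, bits: java.util.BitSet) {
  private def indexes(s: String): Seq[Int] =
    (0 until numHashes).map(seed => Math.floorMod(MurmurHash3.stringHash(s, seed), numBits))
  def put(s: String): Unit = indexes(s).foreach(i => bits.set(i))
  def mightContain(s: String): Boolean = indexes(s).forall(i => bits.get(i))
  def copy(): TinyBloom =
    new TinyBloom(numBits, numHashes, bits.clone().asInstanceOf[java.util.BitSet])
}

final class MiniSemiConcurrentFilter(numBits: Int = 1 << 16,
                                     numHashes: Int = 4,
                                     insertionsBeforeCompact: Int = 8) {
  private val newValues = new ConcurrentHashMap[String, String]()
  private val mainFilter =
    new AtomicReference(new TinyBloom(numBits, numHashes, new java.util.BitSet(numBits)))
  private val insertionsSinceCompact = new AtomicInteger(0)

  def add(s: String): Unit = {
    if (!mainFilter.get().mightContain(s)) {
      newValues.put(s, s)
      if (insertionsSinceCompact.incrementAndGet() % insertionsBeforeCompact == 0) {
        maybeCompact()
      }
    }
  }

  // Check the map first, then the filter: a value removed from the map
  // is already visible in the published filter.
  def mightContain(s: String): Boolean =
    newValues.containsKey(s) || mainFilter.get().mightContain(s)

  // Returns true if this call managed to publish a new filter.
  def maybeCompact(): Boolean = {
    val limit = newValues.size()
    val toAdd = Vector.newBuilder[String]
    val it = newValues.keySet().iterator()
    var taken = 0
    while (taken < limit && it.hasNext) { toAdd += it.next(); taken += 1 }
    val moved = toAdd.result()

    val oldBf = mainFilter.get()
    val newBf = oldBf.copy()                 // private copy, invisible to other threads
    moved.foreach(s => newBf.put(s))
    val swapped = mainFilter.compareAndSet(oldBf, newBf)
    if (swapped) {                           // only now is it safe to drop the map entries
      moved.foreach(s => newValues.remove(s))
      insertionsSinceCompact.addAndGet(-moved.size)
    }
    swapped
  }

  def pendingSize: Int = newValues.size()
}
```

A quick single-threaded check would be to add a handful of keys, confirm that mightContain returns true for each of them, call maybeCompact(), and confirm that pendingSize drops to 0 while every key is still reported as present.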

This data structure also has no deadlocks. Suppose multiple threads begin a compaction simultaneously. At least one of these threads will successfully call mainFilter.compareAndSet(oldBf, newBf) - the rest will fail. This means that elements will be moved from the ConcurrentHashMap into the BloomFilter - i.e. the process never gets stuck.
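The progress argument rests on how compareAndSet behaves under contention. The following sketch (CasContention and race are hypothetical names, and a String stands in for the filter) forces several threads to read the same reference before any of them writes, which is the worst case described above. Exactly one compareAndSet wins and the others fail without blocking.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

// Hypothetical demonstration, for illustration only.
object CasContention {
  // Returns how many of `threads` racing compareAndSet calls succeeded.
  def race(threads: Int): Int = {
    val ref = new AtomicReference[String]("old")
    val allHaveRead = new CountDownLatch(threads)
    val successes = new AtomicInteger(0)
    val pool = Executors.newFixedThreadPool(threads)
    (1 to threads).foreach { i =>
      pool.execute(new Runnable {
        def run(): Unit = {
          val seen = ref.get()   // every thread snapshots the same "old" value
          allHaveRead.countDown()
          allHaveRead.await()    // wait until all snapshots have been taken
          if (ref.compareAndSet(seen, s"new-from-$i")) successes.incrementAndGet()
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    successes.get()
  }
}
```

In the filter, a thread that loses this race has done wasted work (its copy is thrown away), but nothing is lost: its elements are still in the ConcurrentHashMap and will be picked up by a later compaction.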

All told, this means we have a valid concurrent data structure.

In principle, I believe that the space complexity of this is an amortized constant provided the rate of insertion is not too large. Suppose it takes a time c*size for a compaction of size elements to occur, and suppose new elements are added at a rate a per unit time. When time == insertionsBeforeCompact / a, a compaction will be triggered. This will take time c*insertionsBeforeCompact. So as of time insertionsBeforeCompact/a + c*insertionsBeforeCompact = (1/a+c)*insertionsBeforeCompact, a compaction will be complete. This means that the largest the ConcurrentHashMap will ever get is a*(1/a+c)*insertionsBeforeCompact = (1+ac)*insertionsBeforeCompact.
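Written out step by step, with N standing for the number of insertions that triggers a compaction (insertionsBeforeCompact in the code), a for the insertion rate and c for the per-element compaction cost, the estimate looks like this. It is the same back-of-the-envelope argument as above and ignores failed compareAndSet attempts and overlapping compactions.

```latex
\begin{align*}
t_{\text{trigger}} &= \frac{N}{a} \\
t_{\text{done}}    &= \frac{N}{a} + cN = \left(\frac{1}{a} + c\right) N \\
\text{max map size} &\le a \cdot t_{\text{done}} = (1 + ac)\, N
\end{align*}
```

As a purely illustrative example with made-up numbers: for N = 1024, a = 100,000 insertions per second and c = 1 microsecond per element, ac = 0.1, so the map would peak at roughly 1.1 * 1024, or about 1126 entries.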

So although the constant factor is larger, this data structure has similar complexity to a BloomFilter.

So now, dear reader, I ask you - is this correct? Does anyone know of a better version of this? What I've come up with here seems right to me, but I'd love to know what the internet thinks of this. And since I know a lot of smart folks read my blog, I'm reaching out to you.

