Bloom filters are probabilistic data structures for determining whether an element is in a set. Such a data structure offers only two methods - `add` and `mightContain`. Google's Guava library offers a nice implementation, but unfortunately this implementation (like every implementation I've found) is not concurrent. Concurrent reads are no problem, but writes are trickier - and reading while writing is also not straightforward.
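
To make that concrete, the interface we want is tiny (a sketch - `ApproximateSet` is my name for it, not Guava's):

```
// The entire API surface we need. A Bloom filter never returns a
// false negative: if mightContain(a) is false, a was never added.
trait ApproximateSet[A] {
  def add(a: A): Unit
  def mightContain(a: A): Boolean
}
```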

The best I've been able to come up with is the following version, which combines a `BloomFilter` with a `ConcurrentHashMap`. Unfortunately, the space behavior of this data structure is not as good as a plain `BloomFilter` - the extra space used grows with the rate of concurrent insertion. Code for this is available on GitHub.

The basic idea is the following. We store an `AtomicReference` to the `BloomFilter`, and we also store a `ConcurrentHashMap`:

```
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import com.google.common.hash.{BloomFilter, Funnel}

class SemiConcurrentBloomFilter[A](filterSize: Int = 1024*1024, falsePositiveProbability: Double = 1e-12, insertionsBeforeCompact: Int = 1024)(implicit funnel: Funnel[A]) {
  // Elements not yet merged into the main filter live here
  private val newValues = new java.util.concurrent.ConcurrentHashMap[A,A]()
  private val mainFilter: AtomicReference[BloomFilter[A]] = new AtomicReference(
    BloomFilter.create[A](funnel, filterSize, falsePositiveProbability)
  )
  private val insertionsSinceCompact = new AtomicInteger(0)
  // Statistics, used in maybeCompact below
  private val attemptedCompactions = new AtomicInteger(0)
  private val successfulCompactions = new AtomicInteger(0)
```
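
For example, to use this with `String`s you need an implicit `Funnel` in scope. Here's a minimal sketch using Guava's `Funnels.stringFunnel`, which returns a `Funnel[CharSequence]` - the downcast is mine, not part of the class:

```
import java.nio.charset.StandardCharsets
import com.google.common.hash.{Funnel, Funnels}

// Guava gives us a Funnel[CharSequence]; erasure makes this cast safe in practice
implicit val stringFunnel: Funnel[String] =
  Funnels.stringFunnel(StandardCharsets.UTF_8).asInstanceOf[Funnel[String]]

val filter = new SemiConcurrentBloomFilter[String]()
filter.add("hello")
filter.mightContain("hello")   // true
filter.mightContain("goodbye") // false, with high probability
```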

When we `add` an element to the filter, we simply insert it into `newValues`. We also increment the `insertionsSinceCompact` counter. Once the resulting count `numInserted` reaches `insertionsBeforeCompact` (or some multiple of it), we *attempt* to compact the data structure.

```
def add(a: A): Unit = {
  val bf = mainFilter.get()
  if (!bf.mightContain(a)) {
    newValues.put(a, a)
    val numInserted = insertionsSinceCompact.incrementAndGet()
    // Every insertionsBeforeCompact insertions, try to fold newValues into the filter
    if (numInserted % insertionsBeforeCompact == 0) {
      maybeCompact
    }
  }
}
```

I'll come back to the compaction process in a moment. The `mightContain` operation is then defined as first checking whether a value is in the `ConcurrentHashMap`, and then checking whether it's in the `BloomFilter`:

```
def mightContain(a: A): Boolean = {
  val fromNew = newValues.get(a)
  if (fromNew != null) {
    true
  } else {
    // If a has been removed from newValues, then it has already been
    // compacted into the main filter, so this check suffices
    mainFilter.get().mightContain(a)
  }
}
```

The compaction process consists of moving values from the `ConcurrentHashMap` into the `BloomFilter`. However, we can't simply do this willy-nilly - there is a very real possibility that multiple compactions will happen simultaneously. What we must do is copy the Bloom filter, add the elements of `newValues` to the copy, and remember which values we added:

```
import scala.jdk.CollectionConverters._ // for .asScala on the key enumeration

def maybeCompact: Unit = {
  attemptedCompactions.incrementAndGet()
  // Restrict ourselves to `size` elements so we can't sit in an infinite
  // loop here while other threads keep inserting
  val size = newValues.size()
  val toAdd: Seq[A] = newValues.keys().asScala.take(size).toSeq
  val oldBf = mainFilter.get()
  val newBf = oldBf.copy()
  toAdd.foreach(newBf.put _)
  // Only one of several racing compactions can win this compare-and-set
  val addedNew: Boolean = mainFilter.compareAndSet(oldBf, newBf)
```

So far this is safely concurrent - we have merely *read* from the `ConcurrentHashMap`, and then *written* those values into a private copy of the `BloomFilter`. At this point the elements are contained in both the `ConcurrentHashMap` and the new `BloomFilter`, so the `mightContain` operation will surely return the correct result. We make sure to limit the number of elements we move to `size` because we don't want this operation to run forever if a separate thread keeps calling `add`.

Then, in the event we successfully replaced the old `BloomFilter` with the new one, we can now safely remove those elements from the `ConcurrentHashMap`:

```
  if (addedNew) { // If this succeeds, then both mainFilter and newValues contain all elements of toAdd
    toAdd.foreach(newValues.remove _)    // Now it's safe to remove from newValues
    insertionsSinceCompact.addAndGet(-1 * size) // Finally reduce the insertion count
    successfulCompactions.incrementAndGet()     // Statistics
  }
}
```

During and after this process, the `mightContain` operation will also return the correct result. These values might be absent from `newValues`, but they will be present in the `BloomFilter`.

This data structure also has no deadlocks. Suppose multiple threads begin a compaction simultaneously. At least one of these threads will successfully call `mainFilter.compareAndSet(oldBf, newBf)` - the rest will fail. This means that elements always make progress from the `ConcurrentHashMap` into the `BloomFilter` - i.e. the process never gets stuck.

All told, this means we have a valid concurrent data structure.
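
As a basic sanity check (not a proof!), here's the kind of stress test I'd run - many threads inserting concurrently, then asserting there are no false negatives. This sketch assumes the `stringFunnel` from the earlier snippet is in scope:

```
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

val pool = Executors.newFixedThreadPool(8)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

val filter = new SemiConcurrentBloomFilter[String]()
val keys = (0 until 100000).map(i => s"key-$i")

// Hammer the filter from many threads at once
Await.result(Future.traverse(keys)(k => Future(filter.add(k))), Duration.Inf)

// False positives are allowed; false negatives never are
val falseNegatives = keys.count(k => !filter.mightContain(k))
assert(falseNegatives == 0, s"$falseNegatives false negatives!")
pool.shutdown()
```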

In principle, I believe the extra space used is bounded by a constant, provided the rate of insertion is not too large. Suppose a compaction of `size` elements takes time `c*size`, and suppose new elements are added at a rate of `a` per unit time. At time `insertionsBeforeCompact / a`, a compaction will be triggered, and it will take time `c*insertionsBeforeCompact` to complete. So by time `insertionsBeforeCompact/a + c*insertionsBeforeCompact = (1/a + c)*insertionsBeforeCompact`, the compaction will be finished. This means that the largest the `ConcurrentHashMap` will ever get is `a*(1/a + c)*insertionsBeforeCompact = (1 + a*c)*insertionsBeforeCompact` entries.
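
To put rough numbers on this (both `a` and `c` are invented for illustration):

```
val a = 10000.0 // insertions per second (assumed)
val c = 1e-6    // seconds of compaction work per element (assumed)
val insertionsBeforeCompact = 1024
// Worst-case ConcurrentHashMap size: (1 + a*c) * insertionsBeforeCompact
val maxMapSize = (1 + a * c) * insertionsBeforeCompact // ≈ 1034 entries
```

So with these numbers, the map never grows more than about 1% beyond `insertionsBeforeCompact`.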

So although the constant factor is larger, this data structure has space complexity similar to a `BloomFilter`'s.

So now, dear reader, I ask you - is this correct? Does anyone know of a better version of this? What I've come up with here seems right to me, but I'd love to know what the internet thinks of this. And since I know a lot of smart folks read my blog, I'm reaching out to you.