Notes

DDSketch is a quantile sketch with relative-error guarantees, designed for heavy-tailed data streams.

It offers three guarantees:

  1. The estimate $\tilde{x}_q$ for any quantile $x_q$ satisfies $|\tilde{x}_q - x_q| \le \alpha x_q$, so the relative error is configurable through $\alpha$.
  2. Merging sketches is as accurate as sketching the union.
  3. Insert and merge are fast.
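
Guarantee 2 holds because a sketch is only a table of per-bucket counts, so merging adds counts index by index. The snippet below is a minimal illustration, not the reference implementation: it assumes the bucket mapping $i = \lceil \log_\gamma x \rceil$ with $\gamma = (1+\alpha)/(1-\alpha)$, no bucket limit, and a hypothetical helper named `sketch`.

```python
import math
from collections import Counter

ALPHA = 0.01
GAMMA = (1 + ALPHA) / (1 - ALPHA)


def sketch(values) -> Counter:
    """Count positive values per bucket index ceil(log_gamma(x)) (illustrative helper)."""
    return Counter(math.ceil(math.log(x) / math.log(GAMMA)) for x in values)


a = [0.2, 1.5, 1.5, 40.0]
b = [1.5, 7.0, 900.0]

merged = sketch(a) + sketch(b)   # Counter addition sums the counts per bucket index
print(merged == sketch(a + b))   # the merged table matches a sketch of the union
```

Once a bucket limit is enforced, the merged sketch may also need to collapse its lowest buckets, which this illustration leaves out.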

Setting up buckets

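For a chosen relative error $\alpha \in (0, 1)$, define $\gamma$ as:

$$\gamma = \frac{1 + \alpha}{1 - \alpha}$$

The positive real line is partitioned into buckets $B_i = (\gamma^{i-1}, \gamma^i]$ for integer $i$. Each bucket stores a count of how many observations fell into its range. Given a value $x > 0$, its bucket index is

$$i = \lceil \log_\gamma x \rceil$$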
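
The mapping from a value to its bucket can be checked numerically. This is a small sketch under the definitions above; the helper names `make_gamma` and `bucket_index` are chosen here for illustration.

```python
import math


def make_gamma(alpha: float) -> float:
    """gamma = (1 + alpha) / (1 - alpha) for a relative error alpha in (0, 1)."""
    return (1 + alpha) / (1 - alpha)


def bucket_index(x: float, gamma: float) -> int:
    """Index i of the bucket (gamma^(i-1), gamma^i] that contains x > 0."""
    return math.ceil(math.log(x) / math.log(gamma))


gamma = make_gamma(0.01)
for x in (0.5, 1.0, 3.0, 250.0):
    i = bucket_index(x, gamma)
    lower, upper = gamma ** (i - 1), gamma ** i
    print(f"x={x:<6} -> bucket {i:>4}, range ({lower:.4f}, {upper:.4f}]")
```

Values below 1 land in negative indices, and x = 1 lands in bucket 0, since the log of 1 is 0.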

To answer a query

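Approximate the $q$-quantile, where there are $n$ observations, by:

  • Computing a rank target, $r = q(n - 1)$.
  • Scan buckets by increasing index $i$, accumulating counts until the cumulative count exceeds $r$. Take that $i$, and return the bucket's representative, $\tilde{x}_q = 2\gamma^i / (\gamma + 1)$.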
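
The two steps above can be sketched as a standalone function over a mapping of bucket index to count. This is an illustration that ignores zeros and negative values; `quantile_from_buckets` is a name chosen here and the sample data is made up.

```python
import math


def quantile_from_buckets(buckets: dict[int, int], q: float, gamma: float) -> float:
    """Estimate the q-quantile from a mapping of bucket index -> count."""
    n = sum(buckets.values())
    if n == 0:
        raise ValueError("no observations")
    rank = q * (n - 1)                           # rank target r = q(n - 1)
    cumulative = 0
    for i in sorted(buckets):                    # increasing bucket index
        cumulative += buckets[i]
        if cumulative > rank:                    # first bucket whose running count exceeds r
            return 2 * gamma ** i / (gamma + 1)  # bucket representative
    raise AssertionError("unreachable: the total count always exceeds q(n - 1)")


alpha = 0.01
gamma = (1 + alpha) / (1 - alpha)
data = [1.0, 2.0, 3.0, 4.0, 100.0]               # made-up sample; true median is 3.0

buckets: dict[int, int] = {}
for x in data:
    i = math.ceil(math.log(x) / math.log(gamma))
    buckets[i] = buckets.get(i, 0) + 1

estimate = quantile_from_buckets(buckets, 0.5, gamma)
print(estimate)                                  # within 1% of 3.0
```

The representative $2\gamma^i/(\gamma+1)$ is the point whose relative distance to both bucket endpoints, $\gamma^{i-1}$ and $\gamma^i$, equals $(\gamma-1)/(\gamma+1) = \alpha$, which is where the error bound comes from.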

Limiting Memory

Naively, the number of non-empty buckets can grow linearly with $n$ in the worst case. To keep memory bounded, DDSketch caps the number of buckets at $m$.

  • When more than $m$ buckets are non-empty, it collapses the two lowest-index buckets.
  • Say the two lowest-index buckets are $B_{i_0}$ and $B_{i_1}$, with $i_0 < i_1$. Merge them by moving $B_{i_0}$'s count into $B_{i_1}$.
  • Accuracy is sacrificed for the lowest values, not the upper tail, unless the bucket limit $m$ is extremely low.
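
The collapse step can be sketched on a plain dictionary of counts. This is a minimal illustration with made-up counts; `collapse_lowest` is a hypothetical helper, and it re-sorts on every pass for clarity rather than speed.

```python
def collapse_lowest(buckets: dict[int, int], max_buckets: int) -> None:
    """Fold the lowest-index bucket into the next one until at most max_buckets remain."""
    while len(buckets) > max(max_buckets, 1):
        i0, i1 = sorted(buckets)[:2]       # two lowest indices, i0 < i1
        buckets[i1] += buckets.pop(i0)     # move B_{i0}'s count into B_{i1}


buckets = {-3: 2, 0: 5, 4: 1, 9: 7}
collapse_lowest(buckets, max_buckets=2)
print(buckets)   # {4: 8, 9: 7}
```

The total count is unchanged, and only the observations that sat in the lowest buckets lose their accuracy guarantee; the bucket at index 9 is untouched.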

Theoretical Bounds

For heavy-tailed distributions such as the lognormal, memory usage grows only logarithmically in $n$. With $\alpha = 0.01$, a sketch with a thousand buckets can maintain accurate upper-quantile estimates, within 1% relative error, for millions of samples.
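
A quick back-of-the-envelope check shows how much range a thousand buckets can cover. The calculation assumes consecutive bucket indices; the interval from one microsecond to one hour is an illustrative range chosen here, not a figure from these notes.

```python
import math

alpha = 0.01
gamma = (1 + alpha) / (1 - alpha)
m = 1000                                   # bucket limit

# m consecutive buckets cover values whose max/min ratio is up to gamma^m.
span = gamma ** m
print(f"gamma = {gamma:.6f}")
print(f"{m} buckets span a max/min ratio of about {span:.3g}")

# Buckets needed to cover an illustrative range: 1 microsecond to 1 hour, in seconds.
lo, hi = 1e-6, 3600.0
needed = math.ceil(math.log(hi / lo) / math.log(gamma))
print(f"buckets needed for [{lo}, {hi}]: {needed}")
```

Since $\ln\gamma \approx 0.02$ for $\alpha = 0.01$, a thousand buckets span a ratio of roughly $e^{20}$, more than eight orders of magnitude.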

Example in Python

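An example like this estimates the quantiles of lognormal data accurately with only a thousand buckets. One run printed the following (the data is random, so the exact values vary between runs):

Approx p50 = 0.9900000000000001
Approx p90 = 3.632761126626586
Approx p99 = 10.074696689511331

The script that produced this output:
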
import math
from collections import defaultdict
 
 
class DDSketch:
    """
    Minimal DDSketch implementation for non-negative values.
 
    - Relative error: alpha (0 < alpha < 1)
    - Bucket boundaries: (gamma^(i-1), gamma^i], i in Z
    - max_buckets: if not None, collapse lowest buckets to enforce a size limit
    """
 
    def __init__(self, alpha=0.01, max_buckets=1000):
        if not (0 < alpha < 1):
            raise ValueError("alpha must be in (0, 1)")
        self.alpha = alpha
        self.gamma = (1 + alpha) / (1 - alpha)
        self.log_gamma = math.log(self.gamma)
 
        self.max_buckets = max_buckets
 
        # index -> count
        self.buckets = defaultdict(int)
        self.zero_count = 0  # for x == 0
        self.count = 0
 
        self.min_value = None
        self.max_value = None
 
    def _bucket_index(self, x: float) -> int:
        """Map x > 0 to bucket index i = ceil(log_gamma(x))."""
        return math.ceil(math.log(x) / self.log_gamma)
 
    def add(self, x: float):
        """Insert a single observation x >= 0."""
        if x < 0:
            raise ValueError("This simple implementation supports only non-negative values")
 
        if self.count == 0:
            self.min_value = x
            self.max_value = x
        else:
            self.min_value = min(self.min_value, x)
            self.max_value = max(self.max_value, x)
 
        self.count += 1
 
        if x == 0.0:
            self.zero_count += 1
            return
 
        i = self._bucket_index(x)
        self.buckets[i] += 1
 
        if self.max_buckets is not None and len(self.buckets) > self.max_buckets:
            self._collapse_lowest_bucket()
 
    def _collapse_lowest_bucket(self):
        """
        Collapse the lowest-index bucket into the next one, as in the paper.
        This preserves relative accuracy for higher quantiles.
        """
        if not self.buckets:
            return
 
        # Smallest index with data
        i0 = min(self.buckets.keys())
 
        # Next index with data (if any)
        higher_indices = [i for i in self.buckets.keys() if i > i0]
        if not higher_indices:
            # Only one bucket; nothing meaningful to collapse into.
            return
 
        i1 = min(higher_indices)
 
        self.buckets[i1] += self.buckets[i0]
        del self.buckets[i0]
 
    def merge(self, other: "DDSketch"):
        """
        Merge another DDSketch into this one. Both must share the same alpha (and therefore gamma).
        """
        if not isinstance(other, DDSketch):
            raise TypeError("Can only merge another DDSketch")
 
        if other.alpha != self.alpha or other.gamma != self.gamma:
            raise ValueError("Cannot merge sketches with different alpha/gamma parameters")
 
        self.count += other.count
        self.zero_count += other.zero_count
 
        if self.min_value is None:
            self.min_value = other.min_value
            self.max_value = other.max_value
        elif other.min_value is not None:
            self.min_value = min(self.min_value, other.min_value)
            self.max_value = max(self.max_value, other.max_value)
 
        for i, c in other.buckets.items():
            self.buckets[i] += c
 
        # Enforce bucket limit
        if self.max_buckets is not None:
            while len(self.buckets) > self.max_buckets:
                self._collapse_lowest_bucket()
 
    def _bucket_value(self, i: int) -> float:
        """
        Representative value for bucket i.
        The paper uses 2 * gamma^i / (gamma + 1).
        """
        return 2.0 * (self.gamma ** i) / (self.gamma + 1.0)
 
    def quantile(self, q: float) -> float:
        """
        Return an approximate q-quantile for 0 <= q <= 1.
 
        For q=0 and q=1, this effectively returns something near min/max.
        """
        if not (0.0 <= q <= 1.0):
            raise ValueError("q must be in [0, 1]")
        if self.count == 0:
            raise ValueError("Cannot query an empty sketch")
 
        # Rank target in [0, n-1]
        target_rank = q * (self.count - 1)
 
        # Handle zeros first
        if target_rank < self.zero_count:
            return 0.0
 
        remaining_rank = target_rank - self.zero_count
 
        # Iterate buckets in ascending order of index
        cumulative = 0
        for i in sorted(self.buckets.keys()):
            c = self.buckets[i]
            if c == 0:
                continue
            if cumulative + c > remaining_rank:
                return self._bucket_value(i)
            cumulative += c
 
        # If we get here, numerical/rounding edge case; return max bucket value
        max_i = max(self.buckets.keys())
        return self._bucket_value(max_i)
 
 
if __name__ == "__main__":
    import random
 
    sketch = DDSketch(alpha=0.01, max_buckets=1000)
 
    # Simulate some positive, skewed data (e.g., log-normal latencies)
    for _ in range(10_000):
        x = math.exp(random.gauss(0, 1.0))  # lognormal-ish
        sketch.add(x)
 
    for q in [0.5, 0.9, 0.99]:
        print(f"Approx p{int(q*100)} =", sketch.quantile(q))