HyperLogLog 原理与实现详解

HyperLogLog(HLL)是一种用于**大规模去重计数(distinct count)**的概率算法。它可以在极低内存占用下,对海量数据的基数进行高精度估算,广泛应用于数据库、日志系统与分布式计算框架。

该算法由 Philippe Flajolet 等人在 2007 年提出,是 LogLog 系列算法的改进版本。


一、问题背景:为什么需要 HLL?

在大数据场景中,我们常需要回答:

如果直接使用:

当 N 达到亿级甚至百亿级时,内存不可接受。

HLL 的优势:


二、核心思想

HyperLogLog 基于一个概率事实:

一个均匀随机二进制数,出现 k 个前导零的概率 ≈ 2⁻ᵏ

如果你观察到某个数有 10 个前导零,说明你大约看到了 2¹⁰ ≈ 1024 个不同元素。

因此:

最大前导零数量 ≈ log₂(基数)

HLL 的核心就是利用这一现象。


三、算法结构

1. 数据结构

设:

每个桶只存一个整数(记录最大前导零数)。

flowchart LR
    A[输入元素] --> B[Hash]
    B --> C[划分桶 index]
    C --> D[计算前导零 rho]
    D --> E[更新桶最大值]

结构示意:

M[0], M[1], M[2], ..., M[m-1]

四、算法流程

Step 1:哈希

对元素 x 计算 64 位哈希:

h = Hash(x)

必须保证:


Step 2:分桶

将哈希值拆分为:

示意:

flowchart TD
    A[64-bit hash]
    A --> B[前 p 位]
    A --> C[剩余位]
    B --> D[桶编号]
    C --> E[前导零计算]

Step 3:计算前导零

定义:

ρ(w)=+1

更新桶:

M[index]=max(M[index],ρ(w))

五、基数估计公式

最终估算值:

E=αmm22M[i]

其中:

αm0.72131+1.079/m

这是对 LogLog 算法的重要改进 —— 使用调和平均降低方差。


六、小规模示例(直观理解)

假设:

处理若干元素后:

M[0] 2
M[1] 1
M[2] 4
M[3] 0

计算:

2M[i]

然后代入估算公式即可得到基数近似值。

误差较大是因为 m 太小。


七、误差分析

标准误差:

1.04m

例如:

m 误差
1024 3.2%
16384 0.8%

重要特性:

误差只与桶数量有关,与数据规模无关。


八、三段式修正机制

为了提高精度,HLL 使用分段修正:

flowchart TD
    A[原始估计 E]
    A -->|E 小| B[Linear Counting]
    A -->|中间区间| C[标准公式]
    A -->|E 很大| D[大数修正]
  1. 小范围 → 使用 Linear Counting
  2. 中范围 → 使用标准估计
  3. 超大范围 → 使用对数修正避免溢出

九、可合并性(分布式关键特性)

HLL 支持天然合并:

M[i]=max(M1[i],M2[i])

示意:

flowchart LR
    A[HLL 1] --> C[逐桶取 max]
    B[HLL 2] --> C
    C --> D[合并结果]

这使得 HLL 非常适合:


十、工程实现

1. Redis 实现

Redis 提供:

内部采用:


2. HyperLogLog++

Google 提出了改进版本 HyperLogLog++:

应用于:


十一、时间与空间复杂度

操作 复杂度
add O(1)
count O(m)
merge O(m)

空间:

m×6 bits

典型配置:


十二、直观总结

HyperLogLog 的本质:

  1. 利用随机哈希
  2. 统计最大前导零
  3. 多桶分区降低方差
  4. 调和平均修正估计

一句话概括:

用“最长前导零”推断数据规模,再用统计方法降低误差。


十三、适用场景

适合:

不适合:


结论

HyperLogLog 在精度、内存、可扩展性之间取得了优秀平衡,是现代大数据系统中的基础概率数据结构之一。

如需进一步深入,可探讨:

python示例

import hashlib
import math


class HyperLogLog:
    def __init__(self, p=14):
        self.p = p
        self.m = 1 << p
        self.registers = [0] * self.m

        if self.m == 16:
            self.alpha = 0.673
        elif self.m == 32:
            self.alpha = 0.697
        elif self.m == 64:
            self.alpha = 0.709
        else:
            self.alpha = 0.7213 / (1 + 1.079 / self.m)

    def _hash(self, value):
        h = hashlib.sha1(str(value).encode()).digest()
        return int.from_bytes(h[:8], 'big')

    def _leading_zeros(self, x, bits):
        if x == 0:
            return bits
        return bits - x.bit_length()

    def add(self, value):
        x = self._hash(value)

        # 前 p 位做桶索引
        index = x >> (64 - self.p)

        # 剩余低位用于前导零统计(正确方式)
        w = x & ((1 << (64 - self.p)) - 1)

        zeros = self._leading_zeros(w, 64 - self.p) + 1

        self.registers[index] = max(self.registers[index], zeros)

    def count(self):
        indicator = sum(2.0 ** -r for r in self.registers)
        E = self.alpha * self.m * self.m / indicator

        # 小范围修正
        V = self.registers.count(0)
        if E <= 2.5 * self.m and V > 0:
            E = self.m * math.log(self.m / V)

        return int(E)

    def merge(self, other):
        if self.m != other.m:
            raise ValueError("precision mismatch")

        for i in range(self.m):
            self.registers[i] = max(self.registers[i], other.registers[i])
            
hll = HyperLogLog(p=16)
# 添加 100 万个元素
for i in range(1_000_000):
    hll.add(i)

print("估算值:", hll.count())
估算值: 996630