Sampling Algorithms: Reservoir Sampling

The original sampling algorithm in Hunter, a full-link tracing framework, misbehaves under high concurrency. The original code is as follows:

protected boolean doSampled(final float sampleRate) {
    final SpanCollectorMetricHandler metricHandler = getMetricHandler();
 
    final long collectedCount = metricHandler.getCollectedCount();
    final long droppedCount = metricHandler.getDroppedCount();
 
    if (collectedCount == 0) {
        metricHandler.incrementCollectedSpans(1);
        return true;
    }
 
    if (droppedCount == 0) {
        metricHandler.incrementDroppedSpans(1);
        return false;
    }
 
    // Concurrency problem here: the counts read above may already be stale
    if (((float) collectedCount) / (float) (collectedCount + droppedCount) < sampleRate) {
        metricHandler.incrementCollectedSpans(1);
        return true;
    } else {
        metricHandler.incrementDroppedSpans(1);
        return false;
    }
}

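Clearly the problem lies in the if check: the counters are read once at the top of the method, so an incrementCollectedSpans call made by one thread changes the ratio that another thread is still comparing against in that same if statement. The check and the increment are not one atomic step.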

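Secondly, the sampling this algorithm produces hovers just above and below the sample-rate baseline. That is too even: there is almost no randomness in which requests get sampled.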
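To make the lack of randomness concrete, here is a minimal single-threaded model of the rule above. The class RateTrackingDemo and its two plain counters are hypothetical stand-ins for SpanCollectorMetricHandler, so this is a sketch of the decision rule only, not Hunter's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of the original rule; the two fields
// stand in for the counts kept by SpanCollectorMetricHandler.
final class RateTrackingDemo {
    private long collected = 0;
    private long dropped = 0;

    boolean doSampled(float sampleRate) {
        if (collected == 0) { collected++; return true; }
        if (dropped == 0) { dropped++; return false; }
        if (((float) collected) / (float) (collected + dropped) < sampleRate) {
            collected++;
            return true;
        }
        dropped++;
        return false;
    }

    // Returns the first `count` decisions made by a fresh sampler.
    static List<Boolean> decisions(float sampleRate, int count) {
        RateTrackingDemo sampler = new RateTrackingDemo();
        List<Boolean> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(sampler.doSampled(sampleRate));
        }
        return out;
    }
}
```

With a rate of 0.5, decisions(0.5f, 9) is true, false, false, true, false, true, false, true, false on every run: from the third call on the decisions strictly alternate, so whether a request is sampled depends only on its position in the stream.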

Reservoir Sampling

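Reservoir sampling is a commonly used sampling algorithm. The rough procedure is:

  1. Assume the data sequence has n elements and k samples are needed.

  2. First build an array that can hold k elements, and put the first k elements of the sequence into it.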

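  3. Then, starting from element k+1, decide for each element whether it replaces one in the array, in such a way that every element ends up in the sample with overall probability k/n (every element already in the array is equally likely to be the one replaced). Once all elements have been traversed, the elements left in the array are the sample. Concretely: if the current element is the i-th (i > k), it is placed into the array with probability k/i, so each element already in the array is replaced with probability 1/i. Only then does the probability that one of the first k elements survives to the end come out as (k/(k+1)) * ((k+1)/(k+2)) * ((k+2)/(k+3)) * ... * ((n-1)/n) = k/n.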
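The three steps can be sketched generically as follows. This is a minimal sketch under stated assumptions: the class name ReservoirSketch and the method sample are illustrative, and the input is taken to be an in-memory List, although the same loop works on a stream whose length is unknown in advance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class ReservoirSketch {
    // Picks k items uniformly at random from `items` in a single pass.
    static <T> List<T> sample(List<T> items, int k, Random rnd) {
        if (k < 0 || k > items.size()) throw new IllegalArgumentException();
        // Step 2: the reservoir starts out as the first k items.
        List<T> reservoir = new ArrayList<>(items.subList(0, k));
        // Step 3: items.get(i) is element number i + 1 (1-based).
        for (int i = k; i < items.size(); i++) {
            int j = rnd.nextInt(i + 1);          // uniform in [0, i]
            if (j < k) {                         // happens with probability k / (i + 1)
                reservoir.set(j, items.get(i));  // each slot is hit with probability 1 / (i + 1)
            }
        }
        return reservoir;
    }
}
```

Drawing one number j from [0, i] settles both questions at once: j < k decides whether the new element enters, and j itself names the slot it replaces.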

See Wikipedia for the details.

In Java it can be implemented in O(N) time as follows (adapted from Stack Overflow):

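import java.util.BitSet;
import java.util.Random;

// Returns a BitSet of length `size` with exactly `cardinality` bits set,
// chosen uniformly at random.
public static BitSet randomBitSet(int size, int cardinality, Random rnd) {
    if (0 > cardinality || cardinality > size) throw new IllegalArgumentException();
    BitSet result = new BitSet(size);
    int[] chosen = new int[cardinality];   // the reservoir: indices currently selected
    int i;
    // Fill the reservoir with the first `cardinality` indices.
    for (i = 0; i < cardinality; ++ i) {
        chosen[i] = i;
        result.set(i);
    }
    // Index i replaces a random reservoir slot with probability cardinality / (i + 1).
    for (; i < size; ++ i) {
        int j = rnd.nextInt(i+1);
        if (j < cardinality) {
            result.clear(chosen[j]);
            result.set(i);
            chosen[j] = i;
        }
    }
    return result;
}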

Reworking the Sampler

Both the ordinary sampler and the special sampler that balances sampling across URLs are reworked here to use reservoir sampling.

Ordinary sampler

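import java.util.BitSet;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

public class AdvancedBasedSampler extends Sampler{
 
    private final AtomicInteger counter = new AtomicInteger(0);
    // Instance field: a static field assigned in the constructor would be
    // overwritten every time another sampler is created.
    private final BitSet            sampleDecisions;
 
    public AdvancedBasedSampler (float sampleRate){
        // Round instead of truncating, so a product that lands just below an
        // integer because of float error is not rounded down.
        int outOf100 = Math.round(sampleRate * 100.0f);
        sampleDecisions = RandomBitSet.genBitSet(100, outOf100, new Random());
    }
 
    @Override
    protected boolean doSampled(float sampleRate) {
        // Atomically take the current index and advance the counter, wrapping at 100.
        int i, j;
        do {
            i = this.counter.get();
            j = (i + 1) % 100;
        } while (!this.counter.compareAndSet(i, j));
        return sampleDecisions.get(i);
    }
}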
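The compare-and-set loop in doSampled hands each caller a distinct index and wraps around at 100. On Java 8 and later the same step can be written with AtomicInteger.getAndUpdate. The sketch below isolates just that step; the class name RoundRobinIndex is hypothetical and is not part of Hunter.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinIndex {
    private final AtomicInteger counter = new AtomicInteger(0);
    private final int size;

    RoundRobinIndex(int size) {
        if (size <= 0) throw new IllegalArgumentException("size must be positive");
        this.size = size;
    }

    // Returns the current index and atomically advances it, wrapping at `size`.
    // getAndUpdate retries internally with compare-and-set, like the manual loop.
    int next() {
        return counter.getAndUpdate(n -> (n + 1) % size);
    }
}
```

Because the counter never leaves the range [0, size), it cannot overflow, and the returned value can be used directly as the BitSet index.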

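URI-balanced sampler

Here a SamplerData is created for each uriPattern. It stores a BitSet and an AtomicInteger counter, and each pattern is then sampled independently. This avoids the awkward case where a rarely called URI is never sampled at all.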

public class AdvancedUriPatternBasedSampler extends Sampler {
 
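    // Per-instance map: a static field reassigned in the constructor would be
    // reset every time another sampler is created.
    private final ConcurrentHashMap<String, SamplerData> sampleDecisions = new ConcurrentHashMap<>();
 
    public AdvancedUriPatternBasedSampler(float sampleRate){
        // sampleRate is not stored here; it is passed to doSampled on every call.
    }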
 
    @Override
    protected boolean doSampled(float sampleRate) {
        String uriPattern = SpanContextHolder.getServletUriPattern();
        uriPattern = uriPattern == null ? Constants.NULL : uriPattern;
 
        SamplerData samplerData = sampleDecisions.get(uriPattern);
        if (samplerData == null){
            int outOf100 = (int) (sampleRate * 100.0f);
            BitSet bitSet = RandomBitSet.genBitSet(100, outOf100, new Random());
            AtomicInteger atomicInteger = new AtomicInteger(0);
            samplerData = new SamplerData(bitSet, atomicInteger);
 
            SamplerData oldData = sampleDecisions.putIfAbsent(uriPattern, samplerData);
            if (oldData != null){
                samplerData = oldData;
            }
        }
 
        BitSet bitSet = samplerData.getBitSet();
        AtomicInteger counter = samplerData.getAtomicInteger();
 
        int i, j;
        do {
            i = counter.get();
            j = (i + 1) % 100;
        } while (!counter.compareAndSet(i, j));
        return bitSet.get(i);
    }
 
 
    class SamplerData{
        BitSet bitSet;
        AtomicInteger atomicInteger;
 
        public SamplerData(BitSet bitSet, AtomicInteger atomicInteger) {
            this.bitSet = bitSet;
            this.atomicInteger = atomicInteger;
        }
 
        public BitSet getBitSet() {
            return bitSet;
        }
 
        public void setBitSet(BitSet bitSet) {
            this.bitSet = bitSet;
        }
 
        public AtomicInteger getAtomicInteger() {
            return atomicInteger;
        }
 
        public void setAtomicInteger(AtomicInteger atomicInteger) {
            this.atomicInteger = atomicInteger;
        }
    }
}
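The get / putIfAbsent sequence above may build a BitSet that is then thrown away when two threads race on the same pattern. A possible alternative is ConcurrentHashMap.computeIfAbsent, which runs the factory at most once per key. The sketch below is self-contained and makes several assumptions: PerKeySamplerSketch, Slot, and randomBits are hypothetical names, the key is passed in as a plain String instead of coming from SpanContextHolder, and the rate is fixed in the constructor.

```java
import java.util.BitSet;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class PerKeySamplerSketch {
    private static final int WINDOW = 100;

    // Hypothetical holder, playing the role of SamplerData.
    private static final class Slot {
        final BitSet decisions;
        final AtomicInteger counter = new AtomicInteger(0);
        Slot(BitSet decisions) { this.decisions = decisions; }
    }

    private final ConcurrentHashMap<String, Slot> slots = new ConcurrentHashMap<>();
    private final int outOf100;

    PerKeySamplerSketch(float sampleRate) {
        if (sampleRate < 0f || sampleRate > 1f) throw new IllegalArgumentException();
        this.outOf100 = Math.round(sampleRate * WINDOW);
    }

    boolean sample(String key) {
        // The factory runs at most once per key, even when threads race.
        Slot slot = slots.computeIfAbsent(
                key, k -> new Slot(randomBits(WINDOW, outOf100, new Random())));
        int i = slot.counter.getAndUpdate(n -> (n + 1) % WINDOW);
        return slot.decisions.get(i);
    }

    // Reservoir-style choice of `cardinality` set bits out of `size`.
    private static BitSet randomBits(int size, int cardinality, Random rnd) {
        BitSet bits = new BitSet(size);
        int[] chosen = new int[cardinality];
        int i;
        for (i = 0; i < cardinality; i++) { chosen[i] = i; bits.set(i); }
        for (; i < size; i++) {
            int j = rnd.nextInt(i + 1);
            if (j < cardinality) { bits.clear(chosen[j]); bits.set(i); chosen[j] = i; }
        }
        return bits;
    }
}
```

Each key gets its own random pattern and its own counter, so over any full window of 100 calls for one key exactly outOf100 of them are sampled, no matter how busy the other keys are.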