全链路跟踪框架中原先的采样算法在高并发的情况下会出现问题,需要做一些改造。
原有代码如下:
protected boolean doSampled(final float sampleRate) {
final SpanCollectorMetricHandler metricHandler = getMetricHandler();
final long collectedCount = metricHandler.getCollectedCount();
final long droppedCount = metricHandler.getDroppedCount();
if (collectedCount == 0) {
metricHandler.incrementCollectedSpans(1);
return true;
}
if (droppedCount == 0) {
metricHandler.incrementDroppedSpans(1);
return false;
}
//此处会有并发问题
if (((float) collectedCount) / (float) (collectedCount + droppedCount) < sampleRate) {
metricHandler.incrementCollectedSpans(1);
return true;
} else {
metricHandler.incrementDroppedSpans(1);
return false;
}
}
很显然问题会出现在if判断语句中,下面的incrementCollectedSpans会影响到上面if判断语句。
其次由这个采样算法定义的采样是在采样率基准线上下浮动,但是这样太平均了,几乎没有随机性。
蓄水池算法
蓄水池算法是一种常用的抽象算法,大致过程如下:
-
假设数据序列的规模为 n,需要采样的数量的为 k。
-
首先构建一个可容纳 k 个元素的数组,将序列的前 k 个元素放入数组中。
-
然后从第 k+1个元素开始,以总体 k/n 的概率来决定该元素是否被替换到数组中(数组中的元素被替换的概率是相同的)。 当遍历完所有元素之后,数组中剩下的元素即为所需采取的样本。注意,这一步具体的执行方式是,从第k+1个元素开始,若当前是第i个元素,那么以k/k+i的概率放到数组中,这时数组中已经存在的元素被替换掉的概率为1/k+i,只要这样总体的概率才会变成 (k/k+1) * (k+1/k+2) * (k+2/k+3) * ... = k/n。
具体细节可参看wikipedia
而Java可在O(N)实现如下,参考stackoverflow:
public static BitSet randomBitSet(int size, int cardinality, Random rnd) {
if (0 > cardinality || cardinality > size) throw new IllegalArgumentException();
BitSet result = new BitSet(size);
int[] chosen = new int[cardinality];
int i;
for (i = 0; i < cardinality; ++ i) {
chosen[i] = i;
result.set(i);
}
for (; i < size; ++ i) {
int j = rnd.nextInt(i+1);
if (j < cardinality) {
result.clear(chosen[j]);
result.set(i);
chosen[j] = i;
}
}
return result;
}
Sampler改造
此处分别对普通sampler以及根据url均衡采样的特制sampler进行使用蓄水池算法的相应改造。
普通sampler
public class AdvancedBasedSampler extends Sampler{
private final AtomicInteger counter = new AtomicInteger(0);
private static BitSet sampleDecisions;
public AdvancedBasedSampler (float sampleRate){
int outOf100 = (int) (sampleRate * 100.0f);
sampleDecisions = RandomBitSet.genBitSet(100, outOf100, new Random());
}
@Override
protected boolean doSampled(float sampleRate) {
int i, j;
do {
i = this.counter.get();
j = (i + 1) % 100;
} while (!this.counter.compareAndSet(i, j));
return sampleDecisions.get(i);
}
}
URI均衡sampler
此处为每个urIpatter都生成一个samplerData,存储Bitset以及计数器AtomicInteger,然后单独采样,从而避免有些uri调用的少而采样不到的尴尬。
public class AdvancedUriPatternBasedSampler extends Sampler {
private static ConcurrentHashMap<String, SamplerData> sampleDecisions;
public AdvancedUriPatternBasedSampler(float sampleRate){
sampleDecisions = new ConcurrentHashMap<>();
}
@Override
protected boolean doSampled(float sampleRate) {
String uriPattern = SpanContextHolder.getServletUriPattern();
uriPattern = uriPattern == null ? Constants.NULL : uriPattern;
SamplerData samplerData = sampleDecisions.get(uriPattern);
if (samplerData == null){
int outOf100 = (int) (sampleRate * 100.0f);
BitSet bitSet = RandomBitSet.genBitSet(100, outOf100, new Random());
AtomicInteger atomicInteger = new AtomicInteger(0);
samplerData = new SamplerData(bitSet, atomicInteger);
SamplerData oldData = sampleDecisions.putIfAbsent(uriPattern, samplerData);
if (oldData != null){
samplerData = oldData;
}
}
BitSet bitSet = samplerData.getBitSet();
AtomicInteger counter = samplerData.getAtomicInteger();
int i, j;
do {
i = counter.get();
j = (i + 1) % 100;
} while (!counter.compareAndSet(i, j));
return bitSet.get(i);
}
class SamplerData{
BitSet bitSet;
AtomicInteger atomicInteger;
public SamplerData(BitSet bitSet, AtomicInteger atomicInteger) {
this.bitSet = bitSet;
this.atomicInteger = atomicInteger;
}
public BitSet getBitSet() {
return bitSet;
}
public void setBitSet(BitSet bitSet) {
this.bitSet = bitSet;
}
public AtomicInteger getAtomicInteger() {
return atomicInteger;
}
public void setAtomicInteger(AtomicInteger atomicInteger) {
this.atomicInteger = atomicInteger;
}
}
}