Metrics Calculation

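This section gives a short overview of where multithreaded concurrency comes into play at each step, and of how the sliding-window computation works: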
2.1: Concurrency (ThreadLocal & SerializedSubject)
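When one endpoint receives many requests, all of them hit the same commandKey (metrics are aggregated per key). Each request runs in its own thread, and each thread produces a variety of events. The events produced within one thread are first packed into a single HystrixCommandCompletion; that HystrixCommandCompletion is what gets reported, and the stream computations likewise operate on one HystrixCommandCompletion at a time. Events from different threads are therefore never mixed together during computation. The subsections below explain how this is guaranteed.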
2.1.1: Reporters are isolated per thread with ThreadLocal
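Hystrix keeps a static ThreadLocal in HystrixThreadEventStream. Whenever a client request finishes, whether normally or with an exception, its status has to be reported, i.e. handleCommandEnd is executed, and the reporting path fetches the current thread's HystrixThreadEventStream from that ThreadLocal. The code is as follows: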
private void handleCommandEnd(boolean commandExecutionStarted) {
    // some code omitted
    if (executionResultAtTimeOfCancellation == null) {
        // report metrics
        metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
    } else {
        // the command was cancelled (unsubscribed): report the result captured at that moment
        metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
    }
}
void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
    // the ThreadLocal holds a HystrixThreadEventStream; because initialValue() is overridden, get() works without a prior set()
    HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
    if (executionStarted) {
        concurrentExecutionCount.decrementAndGet();
    }
}
// fetch the current thread's event stream from the ThreadLocal
public static HystrixThreadEventStream getInstance() {
    return threadLocalStreams.get();
}
// the ThreadLocal definition; initialValue() is overridden, so set() never has to be called
private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() {
    @Override
    protected HystrixThreadEventStream initialValue() {
        return new HystrixThreadEventStream(Thread.currentThread());
    }
};
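The per-thread isolation above rests on one JDK behaviour: ThreadLocal calls initialValue() lazily on the first get() in each thread. The sketch below is a minimal, self-contained illustration of that pattern; PerThreadStream and ThreadLocalDemo are hypothetical stand-ins, not Hystrix classes.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for HystrixThreadEventStream: remembers which thread created it.
final class PerThreadStream {
    final long ownerThreadId;

    PerThreadStream(Thread owner) {
        this.ownerThreadId = owner.getId();
    }
}

final class ThreadLocalDemo {
    // initialValue() runs lazily on the first get() in each thread, so set() is never needed.
    private static final ThreadLocal<PerThreadStream> STREAMS = new ThreadLocal<PerThreadStream>() {
        @Override
        protected PerThreadStream initialValue() {
            return new PerThreadStream(Thread.currentThread());
        }
    };

    static PerThreadStream getInstance() {
        return STREAMS.get();
    }

    // Runs `threads` threads; each keeps its instance only if repeated get() calls return
    // the same object and that object was created by this very thread.
    // Returns how many distinct instances were observed across all threads.
    static int distinctInstances(int threads) {
        Set<PerThreadStream> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                PerThreadStream first = getInstance();
                if (first == getInstance() && first.ownerThreadId == Thread.currentThread().getId()) {
                    seen.add(first);
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen.size();
    }
}
```

Because pooled threads are reused, a thread-pool thread keeps the same instance across all the requests it serves; only a new thread gets a new one.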
2.1.2: The rate-limiting queue (backpressure)
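Each thread has exactly one HystrixThreadEventStream, because it is obtained from the ThreadLocal, and each HystrixThreadEventStream is tied to a queue built on a Subject, so every thread effectively owns a private queue. It is described as rate limiting because it relies on backpressure, meaning data is supplied on demand, according to what the consumer can handle. Strictly speaking, onBackpressureBuffer() does not slow the producer down: it holds items in an unbounded buffer until the downstream requests them, so nothing is dropped when the consumer falls behind. The code is as follows: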
public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
    // wrap the executionResult in a HystrixCommandCompletion, the basic unit the stream computations work on
    HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
    // writeOnlyCommandCompletionSubject is the RxJava-based rate-limiting (backpressure-buffered) queue
    writeOnlyCommandCompletionSubject.onNext(event);
}
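// code omitted (this pipeline is set up in the HystrixThreadEventStream constructor)
writeOnlyCommandCompletionSubject
        .onBackpressureBuffer() // enable backpressure buffering
        .doOnNext(writeCommandCompletionsToShardedStreams) // the key part is this Action's call method
        .unsafeSubscribe(Subscribers.empty());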
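To make the backpressure behaviour concrete, here is a hedged, single-threaded model of what the no-argument onBackpressureBuffer() does: the producer is never blocked or dropped, and items wait in an unbounded buffer until the consumer signals demand. BufferingRelay is a hypothetical class written for illustration, not RxJava code, and it ignores the thread-safety that the real operator provides.

```java
import java.util.ArrayDeque;
import java.util.function.Consumer;

// Hypothetical, single-threaded model of onBackpressureBuffer() semantics (not RxJava code).
final class BufferingRelay<T> {
    private final ArrayDeque<T> buffer = new ArrayDeque<>(); // unbounded, like the no-arg onBackpressureBuffer()
    private final Consumer<T> downstream;
    private long requested; // demand signalled by the consumer and not yet satisfied

    BufferingRelay(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Producer side: never blocks and never drops; items wait in the buffer if there is no demand.
    void onNext(T item) {
        buffer.addLast(item);
        drain();
    }

    // Consumer side: ask for n more items.
    void request(long n) {
        requested += n;
        drain();
    }

    int buffered() {
        return buffer.size();
    }

    // Deliver as many buffered items as the outstanding demand allows.
    private void drain() {
        while (requested > 0 && !buffer.isEmpty()) {
            requested--;
            downstream.accept(buffer.pollFirst());
        }
    }
}
```

In the Hystrix pipeline the terminal subscriber is Subscribers.empty(), which as far as I know places no limit on demand, so in practice each event passes straight through the buffer to doOnNext on the thread that called onNext.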
2.2: Serializing the data stream
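Every HystrixCommandCompletion pushed into the queue triggers the doOnNext Action, whose call method invokes the write method of HystrixCommandCompletionStream. All completions with the same commandKey share one HystrixCommandCompletionStream instance; the per-key instances are kept in a ConcurrentHashMap. Internally, HystrixCommandCompletionStream uses a SerializedSubject to serialize HystrixCommandCompletion objects written concurrently from many threads. The code is as follows: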
// when the rate-limiting queue emits an event, this call method runs; it is registered via doOnNext
private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
    @Override
    public void call(HystrixCommandCompletion commandCompletion) {
        // one commandKey maps to one serialized stream instance: accurate statistics require collecting the metric events of every thread for that key
        HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
        commandStream.write(commandCompletion); // write into the serialized stream; this is the core step
        // completions that ran in, or were rejected by, a thread pool are also written to that pool's stream
        if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {
            HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());
            threadPoolStream.write(commandCompletion);
        }
    }
};
// the write method itself; pay attention to how writeOnlySubject is defined
public void write(HystrixCommandCompletion event) {
    writeOnlySubject.onNext(event);
}
// definition of writeOnlySubject: SerializedSubject turns concurrent writes into a serialized sequence
HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
    this.commandKey = commandKey;
    this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());
    this.readOnlyStream = writeOnlySubject.share();
}
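The two guarantees in this step, one stream instance per key and one write at a time per stream, can be sketched with the JDK alone. This is a simplified model under stated assumptions: KeyedStream is hypothetical, and it uses a plain monitor lock where SerializedSubject instead uses a queue-drain loop (a writer that finds another emission in progress queues its item and returns, and the emitting thread delivers it).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for HystrixCommandCompletionStream: exactly one instance per command key.
final class KeyedStream {
    private static final ConcurrentMap<String, KeyedStream> INSTANCES = new ConcurrentHashMap<>();

    final String key;
    private long received; // only mutated inside the synchronized write()

    private KeyedStream(String key) {
        this.key = key;
    }

    // computeIfAbsent creates at most one instance per key, even when many threads race on it.
    static KeyedStream getInstance(String key) {
        return INSTANCES.computeIfAbsent(key, KeyedStream::new);
    }

    // Simplification: a monitor lock lets only one writer in at a time,
    // standing in for SerializedSubject's one-onNext-at-a-time guarantee.
    synchronized void write(Object event) {
        received++;
    }

    synchronized long received() {
        return received;
    }

    // Starts `threads` writers that each write `perThread` events to the stream for `key`.
    static long writeConcurrently(String key, int threads, int perThread) {
        Thread[] writers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            writers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    getInstance(key).write(j);
                }
            });
            writers[i].start();
        }
        for (Thread w : writers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return getInstance(key).received();
    }
}
```

Without the serialization, the unsynchronized received++ from eight threads would lose updates; with it, every write is counted.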
2.3: Consumer subscriptions
When a HystrixCommand is created, the following streams subscribe to HystrixCommandCompletionStream:
healthCountsStream
rollingCommandEventCounterStream
cumulativeCommandEventCounterStream
rollingCommandLatencyDistributionStream
rollingCommandUserLatencyDistributionStream
rollingCommandMaxConcurrencyStream
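These consumers compute statistics and metrics over rolling windows. The rest of this section uses the most representative one, healthCountsStream, as the example: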
public static HealthCountsStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) {
    // bucket size = metricsHealthSnapshotIntervalInMilliseconds, the interval at which health metrics are computed
    final int healthCountBucketSizeInMs = properties.metricsHealthSnapshotIntervalInMilliseconds().get();
    if (healthCountBucketSizeInMs == 0) {
        throw new RuntimeException("You have set the bucket size to 0ms. Please set a positive number, so that the metric stream can be properly consumed");
    }
    // the circuit breaker's rolling window (default 10 s) keeps the statistics of the last 10 s;
    // number of buckets in the window = metricsRollingStatisticalWindowInMilliseconds / metricsHealthSnapshotIntervalInMilliseconds
    final int numHealthCountBuckets = properties.metricsRollingStatisticalWindowInMilliseconds().get() / healthCountBucketSizeInMs;
    return getInstance(commandKey, numHealthCountBuckets, healthCountBucketSizeInMs);
}
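The bucket count is plain integer division, so it is worth checking with numbers. The hypothetical helper below reproduces that arithmetic: with the defaults of a 10000 ms rolling window and a 500 ms snapshot interval the window holds 20 buckets, while a bucket size that does not divide the window evenly silently shortens the covered span. Rejecting negative sizes is an added assumption; the original only rejects 0.

```java
// Hypothetical helper reproducing the bucket arithmetic of HealthCountsStream.getInstance().
final class BucketMath {
    static int numBuckets(int rollingWindowMs, int bucketSizeMs) {
        // Assumption: reject negatives too; the original code only rejects 0.
        if (bucketSizeMs <= 0) {
            throw new IllegalArgumentException("bucket size must be positive");
        }
        return rollingWindowMs / bucketSizeMs; // integer division: any remainder is dropped
    }

    // How many milliseconds the buckets actually cover.
    static int coveredMs(int rollingWindowMs, int bucketSizeMs) {
        return numBuckets(rollingWindowMs, bucketSizeMs) * bucketSizeMs;
    }
}
```

For example, a 3000 ms bucket in a 10000 ms window yields 3 buckets covering only 9000 ms.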
// inheritance: HealthCountsStream -> BucketedRollingCounterStream -> BucketedCounterStream
// aggregating events into buckets happens in BucketedCounterStream (some code omitted)
this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
    @Override
    public Observable<Bucket> call() {
        return inputEventStream
                .observe()
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                .flatMap(reduceBucketToSummary) //for a given bucket, turn it into a long array containing counts of event types
                .startWith(emptyEventCountsToStart); //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
    }
});
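window(bucketSizeInMs, TimeUnit.MILLISECONDS) cuts the event stream into consecutive, non-overlapping time chunks. The hedged sketch below models that with explicit event timestamps instead of a timer: TimeBuckets and its parameters are hypothetical, and empty chunks are kept as zero, just as a time window still closes when nothing arrived.

```java
// Hypothetical model of window(bucketSizeInMs, MILLISECONDS) followed by a per-bucket count.
final class TimeBuckets {
    // Counts events per fixed-size, non-overlapping bucket starting at startMs.
    // Buckets with no events stay at 0.
    static long[] countPerBucket(long startMs, long bucketSizeMs, int numBuckets, long... eventTimesMs) {
        long[] counts = new long[numBuckets];
        for (long t : eventTimesMs) {
            long offset = t - startMs;
            if (offset < 0) {
                continue; // before the first bucket
            }
            long index = offset / bucketSizeMs;
            if (index < numBuckets) {
                counts[(int) index]++;
            }
        }
        return counts;
    }
}
```

With 500 ms buckets starting at 0, events at 10, 20, 499, 500 and 1700 ms land in buckets {3, 1, 0, 1}.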
// the logic that folds each event into a bucket
public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = new Func2<long[], HystrixCommandCompletion, long[]>() {
    @Override
    public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) {
        ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
        for (HystrixEventType eventType: ALL_EVENT_TYPES) {
            switch (eventType) {
                case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here
                default:
                    initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType); // add this completion's count for each event type
                    break;
            }
        }
        return initialCountArray;
    }
};
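The bucket is just a long[] indexed by the event type's ordinal. The sketch below mirrors appendEventToBucket and the reduce step behind reduceBucketToSummary using a trimmed-down, hypothetical enum (DemoEventType) and plain maps in place of ExecutionResult.EventCounts; the real HystrixEventType has many more constants.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down event types; the real HystrixEventType has many more constants.
enum DemoEventType { SUCCESS, FAILURE, TIMEOUT, EXCEPTION_THROWN }

final class BucketAppend {
    // Same shape as appendEventToBucket: add one completion's per-type counts into the bucket array.
    static long[] append(long[] bucket, Map<DemoEventType, Integer> completionCounts) {
        for (DemoEventType type : DemoEventType.values()) {
            if (type == DemoEventType.EXCEPTION_THROWN) {
                continue; // skipped, mirroring the EXCEPTION_THROWN case in the original
            }
            bucket[type.ordinal()] += completionCounts.getOrDefault(type, 0);
        }
        return bucket;
    }

    // Fold every completion of one time window into a fresh, empty bucket.
    static long[] reduce(List<Map<DemoEventType, Integer>> completions) {
        long[] bucket = new long[DemoEventType.values().length];
        for (Map<DemoEventType, Integer> c : completions) {
            append(bucket, c);
        }
        return bucket;
    }
}
```

Two successful completions and one failed one reduce to a bucket with SUCCESS = 2 and FAILURE = 1, while EXCEPTION_THROWN stays at 0.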
// producing the metrics happens in BucketedRollingCounterStream (some code omitted)
this.sourceStream = bucketedStream //stream broken up into buckets
        .window(numBuckets, 1) //emit overlapping windows of buckets
        .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
        .doOnSubscribe(new Action0() {
            @Override
            public void call() {
                isSourceCurrentlySubscribed.set(true);
            }
        })
        // ... further operators omitted
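window(numBuckets, 1) emits overlapping windows: each new bucket starts a window that ends numBuckets buckets later, so every summary covers the latest numBuckets buckets. The hedged sketch below models this with one number per bucket and a hypothetical SlidingWindow class, pre-filling the window with empty buckets in the spirit of startWith(emptyEventCountsToStart).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of window(numBuckets, 1) + a per-window fold, using one number per bucket.
final class SlidingWindow {
    static List<Long> rollingSums(int numBuckets, long... bucketTotals) {
        Deque<Long> window = new ArrayDeque<>();
        for (int i = 0; i < numBuckets; i++) {
            window.addLast(0L); // pre-filled with empty buckets so the window is always full
        }
        List<Long> out = new ArrayList<>();
        long sum = 0;
        for (long bucket : bucketTotals) {
            sum -= window.removeFirst(); // the oldest bucket slides out
            window.addLast(bucket);      // the newest bucket slides in
            sum += bucket;
            out.add(sum);                // one summary per new bucket, covering the last numBuckets buckets
        }
        return out;
    }
}
```

The running sum keeps each step O(1). As far as I can tell from the Hystrix source, reduceWindowToSummary instead re-folds every bucket of the window on each emission, which is simpler to express with Rx operators at the cost of numBuckets additions per step.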
// the accumulator that reduceWindowToSummary uses to fold each bucket into HealthCounts
private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {
    @Override
    public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) {
        return healthCounts.plus(bucketEventCounts); // the method to focus on
    }
};
public HealthCounts plus(long[] eventTypeCounts) {
    long updatedTotalCount = totalCount;
    long updatedErrorCount = errorCount;
    long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
    long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
    long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
    long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
    long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
    // the bucket already holds the summed events of all threads; adding these types gives the total
    updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    // the error total includes only FAILURE + TIMEOUT + THREAD_POOL_REJECTED + SEMAPHORE_REJECTED
    updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    return new HealthCounts(updatedTotalCount, updatedErrorCount);
}
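A worked example helps check the arithmetic of plus(). The hypothetical Health class below mirrors it with explicit per-type arguments instead of a long[] and adds an error percentage; the percentage formula (integer result, 0 when nothing was counted) is an assumption modelled on HealthCounts, not copied from it.

```java
// Hypothetical mirror of HystrixCommandMetrics.HealthCounts, reduced to the fields used by plus().
final class Health {
    final long totalCount;
    final long errorCount;

    Health(long totalCount, long errorCount) {
        this.totalCount = totalCount;
        this.errorCount = errorCount;
    }

    // Same arithmetic as HealthCounts.plus(): every counted outcome adds to the total,
    // and every outcome except SUCCESS also adds to the error count.
    Health plus(long success, long failure, long timeout, long threadPoolRejected, long semaphoreRejected) {
        long errors = failure + timeout + threadPoolRejected + semaphoreRejected;
        return new Health(totalCount + success + errors, errorCount + errors);
    }

    // Assumed formula: integer percentage, 0 when nothing has been counted.
    int errorPercentage() {
        return totalCount > 0 ? (int) ((double) errorCount / totalCount * 100) : 0;
    }
}
```

Folding two buckets, (8 successes, 1 failure, 1 timeout) and (7 successes, 2 thread-pool rejections, 1 semaphore rejection), in the same way healthCheckAccumulator folds a window gives 20 requests, 5 errors and a 25% error rate.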