小编给大家分享一下hbase callQueue的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
1. Hbase RPC Server
先简单描述一下Server端(Master/RegionServer)Rpc请求的主要过程:
Server启动的时候就会按配置起对应数量的Rpc请求处理线程handler监听CallQueues,当有任务到来的时候就会添加到CallQueue,Handler去消费处理请求
//Server端接到Client端请求后,会调用RpcScheduler去分发这个请求,默认RpcScheduler就是SimpleRpcScheduler
org.apache.hadoop.hbase.ipc.ServerRpcConnection#processRequest
this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call)) #703行
//SimpleRpcScheduler会将这个请求添加到CallQueue
org.apache.hadoop.hbase.ipc.SimpleRpcScheduler#dispatch
//每个handler会从自己的CallQueue中取任务处理,
org.apache.hadoop.hbase.ipc.RpcExecutor.Handler#run()
那么这些监听线程是何时启动的呢?
//在master或regionserver启动的时候,创建rpcServices
org.apache.hadoop.hbase.master.HMaster#createRpcServices 719
org.apache.hadoop.hbase.regionserver.HRegionServer#createRpcServices 781
org.apache.hadoop.hbase.master.MasterRpcServices#MasterRpcServices
org.apache.hadoop.hbase.regionserver.HRegionServer#createRpcServices
org.apache.hadoop.hbase.regionserver.RSRpcServices#RSRpcServices(org.apache.hadoop.hbase.regionserver.HRegionServer)
//创建处理RPC请求的Server端
org.apache.hadoop.hbase.regionserver.RSRpcServices#createRpcServer 1294L
//创建默认的RpcServer(SimpleRpcScheduler)
org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory#create(org.apache.hadoop.conf.Configuration, org.apache.hadoop.hbase.ipc.PriorityFunction, org.apache.hadoop.hbase.Abortable)
//为不同的RpcExecutor启动handlers
org.apache.hadoop.hbase.regionserver.RSRpcServices#start
org.apache.hadoop.hbase.ipc.SimpleRpcServer#start
org.apache.hadoop.hbase.ipc.SimpleRpcScheduler#start
2. RpcScheduler分类
hbase中RpScheduler主要就两种:SimpleRpcScheduler和FifoRpcScheduler
其使用类型由hbase.region.server.rpc.scheduler.factory.class决定,默认为SimpleRpcSchedulerFactory.class
我们安装phoenix后还可以使用org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory

3. SimpleRpcScheduler
SimpleRpcScheduler中有四种CallQueue,不同的条件每种CallQueue对应的rpcExecutor可能也是不一样的

其实我们比较关心的,每种callExecuotr的以下四个值:callQ数量numCallQueues,handler的数量,rpcExecutor的实现类,maxQueueLength

我们在日志中或者hbase ui中可以看到如上图关于handler及queue的描述
1. Call Queue 普通队列
handlerCount=hbase.regionserver.handler.count 默认30
numCallQueues=hbase.regionserver.handler.count*0.1
callqReadShare=hbase.ipc.server.callqueue.read.ratio 默认0
callQueueType=hbase.ipc.server.callqueue.type 默认fifo
maxQueueLength=hbase.ipc.server.max.callqueue.length 默认10*handlerCount
<span >如果 callqReadShare >0</span>
表示读写分离,假设为0.7,则表示读队列占70%,写队列占30%
假设handlerCount=100,则numCallQueues=handlerCount*0.1=10,读队列有7个,写队列有3个
此时的rpcExecutor的实现类为RWQueueRpcExecutor,handlers及callQueue的对应关系如下图
name=default.RWQ
<span >如果 callqReadShare<=0,callQueueType=fifo</span>
<span >如果 callqReadShare<=0,callQueueType=其它</span>
2. Priority Queue 优先级队列
priorityHandlerCount=hbase.regionserver.metahandler.count 默认20
metaCallqReadShare=hbase.ipc.server.metacallqueue.read.ratio 默认0.9f
maxQueueLength=hbase.ipc.server.max.callqueue.length 默认10*handlerCount
maxPriorityQueueLength=hbase.ipc.server.priority.max.callqueue.length 默认=maxQueueLength
<span >如果 metaCallqReadShare > 0</span>
different read/write handler for meta, at least 1 read handler and 1 write handler
此时的rpcExecutor的实现类为MetaRWQueueRpcExecutor
name=priority.RWQ
<span >如果 metaCallqReadShare <=0,priorityHandlerCount > 0</span>
3. Replication Queue 复本队列
replicationHandlerCount=hbase.regionserver.replication.handler.count 默认为3
maxQueueLength=hbase.ipc.server.max.callqueue.length 默认10*handlerCount
<span >replicationHandlerCount > 0 才有这个队列</span>
4. Meta Transition Queue meta迁移队列
metaTransitionHandler=hbase.master.meta.transition.handler.count 默认1
maxQueueLength=hbase.ipc.server.max.callqueue.length 默认10*handlerCount
maxPriorityQueueLength=hbase.ipc.server.priority.max.callqueue.length 默认=maxQueueLength
<span >metaTransitionHandler>0 才有这个队列</span>
下面这张图详细画出了4种rpcExecutor的选择和条件。

4. hanlder与queues的关系
Master或RegionServer启动的时候会启对应数量的handlers,这些handlers是干啥的呢,简单说就是处理各客户端对该regionserver的各种rpc请求的,一个handler就是一个线程,handler是怎么处理请求的呢?他不是直接处理请求,它是到指定队列里去取,也就是所有请求来了要先进某个队列,然后有handler去消费它。队列的存在可以认为是请求非常多,handler不能及时消费的时候的一个缓冲。
那么下面几个问题我们比较关心:
在程序中handler和queue的对应关系是什么样的呢?
有多少handler?
又有多少队列呢?
每个队列的长度又是多少呢?
**下面通过一幅图回答这几个问题:**hbase的callQueue有4种,这里以普通callQueue为例,原理是一样的
先介绍两个参数,再回答上面的4个问题:
hbase.regionserver.handler.count 默认30
在RegionServers上启动RPC Listener实例的数量,也就是处理rpc请求的线程数。Master handlers数量也是使用这个参数配置。太多的handler可能会适得其反。可以设置成为CPU数量的倍数。如果主要是只读的,handler和cpu数量相同就可以了。可以从CPU数量的两倍开始,再做调整
hbase.ipc.server.callqueue.handler.factor 默认0.1
确定callQueue数量的因素。值为0表示在所有handlers之间共享单个队列。值为1意味着每个handler都有自己的队列。
hbase.ipc.server.max.callqueue.length 默认为10倍hander大小
每个callQueue的队列长度,也就是可以容纳多少个请求
回答问题:
所以handler的数量默认是30个,我们可以通过hbase.regionserver.handler.count
去修改它,<font color=red>对应回答问题2</font>
callQueue的数量=hbase.regionserver.handler.count * hbase.ipc.server.callqueue.handler.factor
,也就是默认30个handler,factor为0.1的话,会有3个callQueue,<font color=red>对应回答问题3</font>
每个callQueue都是一个<BlockingQueue<CallRunner>
,它的capacity就是hbase.ipc.server.max.callqueue.length
,<font color=red>对应回答问题4</font>
handler和callQueues的关系:<font color=red>对应回答问题1</font>
这个在handler启动的时候就决定了,比如要起10个handlers,就遍历每个handler通过下标计算index = qindex + (i % qsize);
qindex是callQueue队列的索引,qindex的取值,跟所有队列是否根据读写分离配置有关系,没有就是0开始,参考SimpleRpcScheduler图。
for (int i = 0; i < numHandlers; i++) {
final int index = qindex + (i % qsize);
String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
+ ",port=" + port;
Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index),
activeHandlerCount);
handler.start();
handlers.add(handler);
}

5. Call queue is full问题
在hbase集群日志中经常会看到Call queue is full的问题,我们先找到报这个错误的源头:
/*
队列长度参数:hbase.ipc.server.max.callqueue.length 默认为10倍hander大小
队列满的大小 = 请求1*请求1大小 + 请求2*请求2大小 + ...+ 请求length*请求length大小
在队列中的大小:callQueueSizeInBytes.sum=队列中的所有请求大小之和
从队列中已经读取出来的大小:totalRequestSize=buf.limit() 也就是ByteBuff中的大小
是否报Call queue is full的条件就是:
1. 在队列中的大小+从队列中已经读取出来的大小> maxQueueSizeInBytes
maxQueueSizeInBytes参数:hbase.ipc.server.max.callqueue.size 默认为1024*1024*1024=1G
*/
// Enforcing the call queue size, this triggers a retry in the client
// This is a bit late to be doing this check - we have already read in the
// total request.
if ((totalRequestSize +
this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) {
final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null,
totalRequestSize, null, 0, this.callCleanup);
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + this.rpcServer.server.getServerName() +
", is hbase.ipc.server.max.callqueue.size too small?");
callTooBig.sendResponseIfReady();
return;
}
// 或
//2. 这种情况在相应的rpcScheduler dispatch的时候会判断当前队列大小是否超过maxQueueLength,如果超过了会直接返回false
if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + this.rpcServer.server.getServerName() +
", too many items queued ?");
call.sendResponseIfReady();
}
从代码中我们知道Call queue is full会有两种情况:CALL_QUEUE_TOO_BIG_EXCEPTION
尝试办法:
当报hbase.ipc.server.max.callqueue.size too small的时候,可以尝试增大hbase.ipc.server.max.callqueue.size
默认是102410241024=1073741824=1G
当报 too many items queued的时候,可以尝试增大hbase.ipc.server.max.callqueue.length
默认=`handlerCount*10
以上是“hbase callQueue的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注天达云行业资讯频道!