1.Kafka通讯的基本单位是Request/Response 2.基本结构: RequestOrResponse ---> MessageSize(RequestMessage | ResponseMessage) 名称类型描述ApiKeyInt16标识这次请求的API编号ApiVersionInt16标识请求的API版本,有了版本后就可以做到向后兼容CorrelationIdInt32由客户端指定的一个数字唯一标识这次请求的id,服务器端在处理请求后也会把同样的CorrelationId写到Response中,这样客户端就能把某个请求和响应对应起来了ClientIdstring客户端指定的用来描述客户端的字符串,会被用来记录日志和监控,它唯一标识一个客户端Request-Request的具体内容 3.通讯过程: 3.1客户端打开与服务端的Socket 3.2往Socket写入一个int32的数字(数字标识这次发送的Request有多少字节) 3.3服务器端先读出一个int32的整数从而获取这次Request的大小 3.4然后读取对应字节数的数据从而得到Request的具体内容 3.5服务器端处理了请求之后也用同样的发送发誓来发送响应 4.RequestMessage结构 4.1RequestMessage ---> ApiKey ApiVersion CorrelationId ClientId Request 名称类型描述MessageSizeint32标识RequestMessage或者ResponseMessage的长度RequestMessageResponseMessage--标识Request或者Response的内容 5.ResponseMessage 5.1ResponseMessage ---> CorrelationId Response 名称类型描述CorrelationIdint32对应Request的CorrelationIdResponse--对应Request的Response,不同的Request的Response的字段是不一样的 Kafka采用是经典的Reactor(同步IO)模式,也就是1个Acceptor响应客户端的连接请求,N个Processor来读取数据,这种模式可以构建出高性能的服务器 6.Message:Producer生产的消息,键-值对 6.1Message --- > Crc MagicByte Attributes Key Value 名称类型描述CRCInt32标识这条消息(不包括CRC字段本身)的校验码MagicByteInt8标识消息格式的版本,用来做向后兼容,目前值为0AttributesInt8标识这条消息的元数据,目前最低两位用来标识压缩格式Keybytes标识这条消息的Key,可以为nullValuebytes标识这条消息的Value。Kafka支持消息嵌套,也就是把一条消息作为Value放到另外一个消息里面 说明: CRC是一种消息检验方式,在Consumer拿到数据以后,CRC会获取MessageSize和MessageData的大小做比较,如果不一致则,那么这个操作的数据Consumer就不接收了,如果一直则才做处理。防止消息在传输过程中损坏,丢失的一种校验方式 7.MessageSet:用来组合多条Message,它在每条Message的基础上加上offset和MessageSize 7.1MessageSet --> [offset MessageSize Message] 名称类型描述OffsetInt64它用来作为log中的序列号,Producer在生产消息的时候还不知道具体的值是什么,可以随便填个数字进去MessageSizeInt32标识这条Message的大小Message-标识这条Message的具体内容,其格式见上一小结 8.Request/Response和Message/messageSet的关系 8.1 Request/Response是通讯层的结构,和网络的7层模型对比的话,它类似于TCP 8.2 Message/MessageSet定义的是业务层的结构,类似于网络7层模型中的HTTP层。Message/MessageSet只是Request/Response的payload中的一种数据结构 备注:Kafka的通讯协议中不包含Schema,格式也比较简单,这样设计的好处是协议自身的Overhead小,再加上把多条Message放到一期做压缩,提高压缩比率,从而在网络上传输的数据量会少一些 |