这篇文章主要介绍“RocketMQ的事务消息是什么意思”,在日常操作中,相信很多人在RocketMQ的事务消息是什么意思问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketMQ的事务消息是什么意思”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
一、 背景
阿里的RocketMQ以前版本阉割的消息回查,在新版又重新加入了,解决小公司没能力做可靠消息中间件产品。同时RocketMQ也参考了Kafka实现,性能上也很不错。
二、 版本
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
三、源码解读
官方demo
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
private static final String TX_PGROUP_NAME = "myTxProducerGroup";
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${demo.rocketmq.transTopic}")
private String springTransTopic;
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
// Send transactional messages
testTransaction();
}
private void testTransaction() throws MessagingException {
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg = MessageBuilder
.withPayload("Hello RocketMQ " + i)
.setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i)
.build();
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
springTransTopic + ":" + tags[i % tags.length],
msg,
null);
System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
msg.getPayload(),
sendResult.getSendStatus());
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId);
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(transId, status);
if (status == 0) {
// Return local transaction with success(commit), in this case,
// this message will not be checked in checkLocalTransaction()
System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
return RocketMQLocalTransactionState.COMMIT;
}
if (status == 1) {
// Return local transaction with failure(rollback) , in this case,
// this message will not be checked in checkLocalTransaction()
System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
Integer status = localTrans.get(transId);
if (null != status) {
switch (status) {
case 0:
retState = RocketMQLocalTransactionState.UNKNOWN;
break;
case 1:
retState = RocketMQLocalTransactionState.COMMIT;
break;
case 2:
retState = RocketMQLocalTransactionState.ROLLBACK;
break;
}
}
System.out.printf("------ !!! checkLocalTransaction is executed once," +
" msgTransactionId=%s, TransactionState=%s status=%s %n",
transId, retState, status);
return retState;
}
}
}
事务消息调用的是RocketMQTemplate.sendMessageInTransaction()
,那么就从这里开始
//RocketMQTemplate
public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) throws MessagingException {
try {
//从本地缓存中获取生产者组名的生产者
TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
charset, destination, message);
return txProducer.sendMessageInTransaction(rocketMsg, arg);
} catch (MQClientException e) {
throw RocketMQUtil.convert(e);
}
}
进入txProducer.sendMessageInTransaction(rocketMsg, arg)
//TransactionMQProducer
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
//是否已经设置事务监听器(本地事务、回调查询)
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
进入defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg)
//DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
//设置为预消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//发送消息
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
//发送成功,当消息对象中的isWaitStoreMsgOK=true(默认true),如果 isWaitStoreMsgOK=false,当没有捕获到异常,那么将返回SEND_OK
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
//执行传入的本地分支事务
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
//执行注解或者生产者构造传入的事务监听器
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
//刷盘超时
case FLUSH_DISK_TIMEOUT:
//数据同步到Slave服务器器超时
case FLUSH_SLAVE_TIMEOUT:
//无Slave服务器器可用
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
//发送二次确认消息
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
//封装执行结果
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
//提交事务
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
//提交事务
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
//提交事务
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
//单向发送二次确认消息,不需要服务端相应,由消息回查监听补偿
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
刚开始看RocketMQ的事务消息Example时,用的监听器执行本地事务,还以为是通过向服务端发送预消息,异步监听服务端响应再处理本地事务,那客户端根本没法实时响应。
到此,关于“RocketMQ的事务消息是什么意思”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注天达云网站,小编会继续努力为大家带来更多实用的文章!