Shellbye.github.io
Shellbye.github.io copied to clipboard
RocketMQ 集群部署 java.lang.RuntimeException: Lock failed,MQ already started 解决笔记与源码阅读
问题描述
RocketMQ
单机是比较容易配置的,基本上是拆箱即用,按照很多网络上的说法,集群部署也是非常简单,当然,说简单的应该是机器资源比较充沛的人,本周我在两条机器上交叉部署双主双从时,在启动了 broker
的的机器 B 上尝试启动机器 A 的 slave 节点时,启动报错如下
java.lang.RuntimeException: Lock failed,MQ already started
at org.apache.rocketmq.store.DefaultMessageStore.start(DefaultMessageStore.java:220)
at org.apache.rocketmq.broker.BrokerController.start(BrokerController.java:826)
at org.apache.rocketmq.broker.BrokerStartup.start(BrokerStartup.java:64)
at org.apache.rocketmq.broker.BrokerStartup.main(BrokerStartup.java:58)
解决方案
在查阅了网上一些资料之后,发现是RocketMQ
的存储路径冲突造成的,直接的解决方案就是在 broker
的配置文件中添加如下配置
storePathRootDir = /some/new/path/
并在启动时指定配置文件,
$ ./bin/mqbroker -c conf/broker.conf
这样就可以解决这个问题了。
源码分析
问题解决了,接着这个机会,顺道看看这部分的源码是啥样子的。首先是 broker
的启动类 org.apache.rocketmq.broker.BrokerStartup
,在其中的 start
方法中,调用了 org.apache.rocketmq.broker.BrokerController
的 start
方法,这里就是一系列的 start
了,如下
public void start() throws Exception {
if (this.messageStore != null) {
this.messageStore.start(); // <============
}
if (this.remotingServer != null) {
this.remotingServer.start();
}
if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
// ...其余代码
}
其中可以看到这里调用了 messageStore.start()
,这里的具体的实现(org.apache.rocketmq.store.DefaultMessageStore
)如下:
/**
* @throws Exception
*/
public void start() throws Exception {
lock = lockFile.getChannel().tryLock(0, 1, false);
if (lock == null || lock.isShared() || !lock.isValid()) {
// <============
throw new RuntimeException("Lock failed,MQ already started");
}
// ...其余代码
}
我们可以看到,这里就是最后抛出以上错误信息的地方。那么 messageStore
是从那里初始化的呢?还是要回到 BrokerController
,如下
if (result) {
try {
// <============
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
这里我们可以看到具体的实现使用的是 DefaultMessageStore
,而配置用的则是 org.apache.rocketmq.store.config.MessageStoreConfig
,到这里,我们就明白了,因为这里写明白了,存储的默认值是 user.home
下面的 store
,
public class MessageStoreConfig {
//The root directory in which the log data is kept
// <============
@ImportantField
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
//The directory in which the commitlog is kept
@ImportantField
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog";
// ... 其他代码
}
所以我们才需要修改 broker
的配置,添加storePathRootDir
,并在启动时通过-c
添加配置文件路径。
我把同一个机器上的master、slave 的storePathRootDir路径改为了不同的路径,但是也还是会出现这个错误。
我也改过地址,依然也是偶尔出这个问题。不过我是三主三从 三个机器上装
我把同一个机器上的master、slave 的storePathRootDir路径改为了不同的路径,但是也还是会出现这个错误。
我也是,路径全改为不相同了,还是报错: /opt/rocketmq/bin/mqbroker: 48: [[: not found java.lang.RuntimeException: Lock failed,MQ already started