blog
blog copied to clipboard
Redis 的启动
Redis 中用一个 redisServer 对象来维护 redis 服务器的配置和状态:
// 5.0.3/src/server.c
struct redisServer server; /* Server global state */
启动过程:
// 5.0.3/src/server.c
/* Server entry point (excerpt). Lines consisting of "..." mark code that
 * is omitted from this listing. The visible steps are: set default
 * configuration, record the launch command, handle sentinel and check-tool
 * modes, apply user configuration, optionally daemonize, initialize the
 * server, load modules and data (or start sentinel), and finally run the
 * event loop. */
int main(int argc, char **argv) {
...
// Initialize server config - defaults
initServerConfig();
// Initialize the module system
moduleInitModulesSystem();
// Initialize server config - save the executable path and launch arguments
/* Store the executable path and arguments in a safe place in order
* to be able to restart the server later. */
server.executable = getAbsolutePath(argv[0]);
/* One extra slot so the copied vector can be NULL-terminated. */
server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
server.exec_argv[argc] = NULL;
for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);
// Detect sentinel mode and initialize sentinel
server.sentinel_mode = checkForSentinelMode(argc,argv);
/* We need to init sentinel right now as parsing the configuration file
* in sentinel mode will have the effect of populating the sentinel
* data structures with master nodes to monitor. */
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
// Run as redis-check-rdb / redis-check-aof when argv[0] contains that name
/* Check if we need to start in redis-check-rdb/aof mode. We just execute
* the program main. However the program is part of the Redis executable
* so that we can easily execute an RDB check on loading errors. */
if (strstr(argv[0],"redis-check-rdb") != NULL)
redis_check_rdb_main(argc,argv,NULL);
else if (strstr(argv[0],"redis-check-aof") != NULL)
redis_check_aof_main(argc,argv);
// Initialize server config - user settings (command-line arguments and config file)
if (argc >= 2) {
...
}
...
// Start supervised or as a daemon
server.supervised = redisIsSupervised(server.supervised_mode);
/* Go to the background only when daemonize is set and the server is not
* supervised. */
int background = server.daemonize && !server.supervised;
if (background) daemonize();
// Initialize the server
initServer();
/* A pid file is created when running in the background or when a pid file
* is configured. */
if (background || server.pidfile) createPidFile();
...
if (!server.sentinel_mode) {
...
// Load modules
moduleLoadFromQueue();
// Load RDB/AOF data
loadDataFromDisk();
...
} else {
sentinelIsRunning();
}
...
// Start the event loop and begin listening for events
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
/* Reached only after aeMain() returns: release the event loop. */
aeDeleteEventLoop(server.el);
return 0;
}
初始化 server 配置
默认配置
initServerConfig 函数初始化 redisServer 结构体,为 server 的一些配置项属性设置默认值以及初始化命令表:
/* Set up the default server configuration (excerpt; "..." marks omitted
 * code). The visible part creates the two command dictionaries and fills
 * them from the hard-coded command table. */
void initServerConfig(void) {
...
/* Command table -- we initialize it here as it is part of the
* initial configuration, since command names may be changed via
* redis.conf using the rename-command directive. */
server.commands = dictCreate(&commandTableDictType,NULL);
server.orig_commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
...
}
populateCommandTable 函数将 redisCommandTable 数组中的所有命令填充到 server 维护的两个命令表:commands 和 orig_commands。
// 5.0.3/src/server.c
/* Hard-coded command table. populateCommandTable() walks this array and
 * registers every entry in both server.commands and server.orig_commands.
 * "..." marks entries omitted from this excerpt. */
struct redisCommand redisCommandTable[] = {
{"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
...
};
/* Populates the Redis Command Table starting from the hard coded list
* we have at the top of server.c (redisCommandTable).
*
* Every entry is inserted into two dictionaries: server.commands and
* server.orig_commands. "..." marks code omitted from this excerpt. */
void populateCommandTable(void) {
int j;
/* Element count derived from the array's total size. */
int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
for (j = 0; j < numcommands; j++) {
struct redisCommand *c = redisCommandTable+j;
/* NOTE(review): 'f' is not used in the visible lines; presumably the
* omitted code below consumes the flag string -- confirm upstream. */
char *f = c->sflags;
int retval1, retval2;
...
/* Each dictionary gets its own freshly allocated copy of the name as key;
* both map to the same table entry. */
retval1 = dictAdd(server.commands, sdsnew(c->name), c);
/* Populate an additional dictionary that will be unaffected
* by rename-command statements in redis.conf. */
retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
/* Both insertions must succeed. */
serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
}
}
orig_commands 命令表中保留了一份不受配置文件 redis.conf 的 rename-command 配置影响的命令。
自定义配置
/* Excerpt of main(): handling of user-supplied configuration. When at
 * least one argument is given, an optional config file name is picked up
 * and the remaining arguments are turned into config-file style text,
 * then both are passed to loadServerConfig(). "..." marks omitted code. */
int main(int argc, char **argv) {
...
if (argc >= 2) {
j = 1; /* First option to parse in argv[] */
sds options = sdsempty();
char *configfile = NULL;
...
/* First argument is the config file name? */
/* Anything that does not start with "--" is taken as the config file. */
if (argv[j][0] != '-' || argv[j][1] != '-') {
configfile = argv[j];
server.configfile = getAbsolutePath(configfile);
/* Replace the config file in server.exec_argv with
* its absolute path. */
zfree(server.exec_argv[j]);
server.exec_argv[j] = zstrdup(server.configfile);
j++;
}
/* All the other options are parsed and conceptually appended to the
* configuration file. For instance --port 6380 will generate the
* string "port 6380\n" to be parsed after the actual file name
* is parsed, if any. */
while(j != argc) {
if (argv[j][0] == '-' && argv[j][1] == '-') {
/* Option name */
if (!strcmp(argv[j], "--check-rdb")) {
/* Argument has no options, need to skip for parsing. */
j++;
continue;
}
/* A new option starts a new line, except for the very first one. */
if (sdslen(options)) options = sdscat(options,"\n");
/* argv[j]+2 skips the leading "--". */
options = sdscat(options,argv[j]+2);
options = sdscat(options," ");
} else {
/* Option argument */
options = sdscatrepr(options,argv[j],strlen(argv[j]));
options = sdscat(options," ");
}
j++;
}
...
resetServerSaveParams();
/* configfile is still NULL here when no file name was given. */
loadServerConfig(configfile,options);
sdsfree(options);
}
...
}
loadServerConfig 加载并解析配置文件,options 表示命令行输入的配置参数。
// 5.0.3/src/config.c
/* Assemble the effective configuration text and hand it to
 * loadServerConfigFromString().
 *
 * The text is the content of 'filename' followed by a newline and the
 * 'options' string. Either argument may be NULL, in which case that part
 * is simply left out, so the function can load only a file or only a
 * string. A filename consisting of exactly "-" reads from standard input.
 *
 * If the file cannot be opened the error is logged and the process exits
 * with status 1. */
void loadServerConfig(char *filename, char *options) {
    char line[CONFIG_MAX_LINE+1];
    sds text = sdsempty();

    /* Part 1: the file (or stdin) content, appended chunk by chunk. */
    if (filename != NULL) {
        int use_stdin = (filename[0] == '-' && filename[1] == '\0');
        FILE *in = stdin;

        if (!use_stdin) {
            in = fopen(filename,"r");
            if (in == NULL) {
                serverLog(LL_WARNING,
                    "Fatal error, can't open config file '%s'", filename);
                exit(1);
            }
        }
        while (fgets(line,sizeof(line),in) != NULL) {
            text = sdscat(text,line);
        }
        /* Standard input is never closed here. */
        if (in != stdin) fclose(in);
    }

    /* Part 2: the extra directives, kept on their own line(s). */
    if (options != NULL) {
        text = sdscat(text,"\n");
        text = sdscat(text,options);
    }

    loadServerConfigFromString(text);
    sdsfree(text);
}
配置文件配置和命令行参数配置都合并到 config 里面,调用 loadServerConfigFromString 加载到 server 中。
// 5.0.3/src/config.c
/* Parse configuration text and apply the directives to the global server
 * state. This listing is an excerpt: it is cut off at the trailing "..."
 * line, so the remaining directives and the 'loaderr' label are not shown.
 * 'config' holds the directives, one per line. */
void loadServerConfigFromString(char *config) {
char *err = NULL;
int linenum = 0, totlines, i;
int slaveof_linenum = 0;
sds *lines;
/* Split the text on "\n"; totlines bounds the loop below. */
lines = sdssplitlen(config,strlen(config),"\n",1,&totlines);
for (i = 0; i < totlines; i++) {
sds *argv;
int argc;
/* 1-based number of the line being processed. */
linenum = i+1;
/* Strip spaces, tabs, CR and LF from both ends of the line. */
lines[i] = sdstrim(lines[i]," \t\r\n");
/* Skip comments and blank lines */
if (lines[i][0] == '#' || lines[i][0] == '\0') continue;
/* Split into arguments */
argv = sdssplitargs(lines[i],&argc);
if (argv == NULL) {
err = "Unbalanced quotes in configuration line";
goto loaderr;
}
/* Skip this line if the resulting command vector is empty. */
if (argc == 0) {
sdsfreesplitres(argv,argc);
continue;
}
/* The directive name is lowercased in place before matching. */
sdstolower(argv[0]);
/* Execute config directives */
if (!strcasecmp(argv[0],"timeout") && argc == 2) {
server.maxidletime = atoi(argv[1]);
/* Negative timeouts are rejected. */
if (server.maxidletime < 0) {
err = "Invalid timeout value"; goto loaderr;
}
...
初始化 server
initServer 函数初始化 server 的状态,比如:
信号处理相关
signal(SIGHUP, SIG_IGN); // ignore SIGHUP
signal(SIGPIPE, SIG_IGN); // ignore SIGPIPE
setupSignalHandlers(); // register signal handlers
客户端相关
/* Client bookkeeping starts out empty: no current client, newly created
 * lists and index, and the paused flag cleared. */
server.current_client = NULL;
server.clients = listCreate();
server.clients_index = raxNew();
server.clients_to_close = listCreate();
server.clients_pending_write = listCreate();
server.unblocked_clients = listCreate();
server.clients_waiting_acks = listCreate();
server.clients_paused = 0;
共享对象
createSharedObjects();
数据库相关
/* Allocate one redisDb slot per configured database (server.dbnum). */
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.dbnum; j++) {
/* Each database gets its own set of dictionaries. */
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
/* The id is the database's index in server.db. */
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
}
socket 相关
/* Open the TCP listening socket for the user commands. */
/* A port of 0 skips the TCP listener; a listen failure is fatal. */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
server.unixsocketperm, server.tcp_backlog);
/* On failure, log the message left in server.neterr and exit. */
if (server.sofd == ANET_ERR) {
serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
exit(1);
}
anetNonBlock(NULL,server.sofd);
}
事件循环相关
/* Create the event loop, sized to maxclients plus CONFIG_FDSET_INCR. */
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
/* One readable event per TCP listening descriptor. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
/* The Unix socket handler is registered only when server.sofd > 0. */
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
其他
/* Open the AOF file if needed. */
if (server.aof_state == AOF_ON) {
/* Opened write-only in append mode, created with mode 0644 if missing. */
server.aof_fd = open(server.aof_filename,
O_WRONLY|O_APPEND|O_CREAT,0644);
/* Failing to open the file is fatal. */
if (server.aof_fd == -1) {
serverLog(LL_WARNING, "Can't open the append-only file: %s",
strerror(errno));
exit(1);
}
}
/* Cluster initialization runs only when cluster mode is enabled. */
if (server.cluster_enabled) clusterInit();
replicationScriptCacheInit();
scriptingInit(1);
slowlogInit();
latencyMonitorInit();
bioInit();
/* Record the allocator's used-memory figure at this point in startup. */
server.initial_memory_usage = zmalloc_used_memory();
其他初始化包括一些子功能的初始化,例如 AOF、慢日志、延迟监控等。
启动事件循环
// 5.0.3/src/ae.c
/* Drive the event loop: clear the stop flag, then keep processing events
 * until something sets it. When a beforesleep hook is installed it is
 * invoked ahead of every round of event processing. */
void aeMain(aeEventLoop *eventLoop) {
    for (eventLoop->stop = 0; !eventLoop->stop; ) {
        if (eventLoop->beforesleep != NULL) {
            eventLoop->beforesleep(eventLoop);
        }
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}