diff options
Diffstat (limited to 'cachedb/redis.c')
-rw-r--r-- | cachedb/redis.c | 280 |
1 files changed, 201 insertions, 79 deletions
diff --git a/cachedb/redis.c b/cachedb/redis.c index 68c033535a69..3dfa95859eb8 100644 --- a/cachedb/redis.c +++ b/cachedb/redis.c @@ -52,19 +52,38 @@ #include "hiredis/hiredis.h" struct redis_moddata { - redisContext** ctxs; /* thread-specific redis contexts */ - int numctxs; /* number of ctx entries */ - const char* server_host; /* server's IP address or host name */ - int server_port; /* server's TCP port */ - const char* server_path; /* server's unix path, or "", NULL if unused */ - const char* server_password; /* server's AUTH password, or "", NULL if unused */ - struct timeval command_timeout; /* timeout for commands */ - struct timeval connect_timeout; /* timeout for connect */ - int logical_db; /* the redis logical database to use */ + /* thread-specific redis contexts */ + redisContext** ctxs; + redisContext** replica_ctxs; + /* number of ctx entries */ + int numctxs; + /* server's IP address or host name */ + const char* server_host; + const char* replica_server_host; + /* server's TCP port */ + int server_port; + int replica_server_port; + /* server's unix path, or "", NULL if unused */ + const char* server_path; + const char* replica_server_path; + /* server's AUTH password, or "", NULL if unused */ + const char* server_password; + const char* replica_server_password; + /* timeout for commands */ + struct timeval command_timeout; + struct timeval replica_command_timeout; + /* timeout for connection setup */ + struct timeval connect_timeout; + struct timeval replica_connect_timeout; + /* the redis logical database to use */ + int logical_db; + int replica_logical_db; + /* if the SET with EX command is supported */ + int set_with_ex_available; }; static redisReply* redis_command(struct module_env*, struct cachedb_env*, - const char*, const uint8_t*, size_t); + const char*, const uint8_t*, size_t, int); static void moddata_clean(struct redis_moddata** moddata) { @@ -78,21 +97,30 @@ moddata_clean(struct redis_moddata** moddata) { } free((*moddata)->ctxs); } + if((*moddata)->replica_ctxs) { + int i; + for(i = 0; i < (*moddata)->numctxs; i++) { + if((*moddata)->replica_ctxs[i]) + redisFree((*moddata)->replica_ctxs[i]); + } + free((*moddata)->replica_ctxs); + } free(*moddata); *moddata = NULL; } static redisContext* -redis_connect(const struct redis_moddata* moddata) +redis_connect(const char* host, int port, const char* path, + const char* password, int logical_db, + const struct timeval connect_timeout, + const struct timeval command_timeout) { redisContext* ctx; - if(moddata->server_path && moddata->server_path[0]!=0) { - ctx = redisConnectUnixWithTimeout(moddata->server_path, - moddata->connect_timeout); + if(path && path[0]!=0) { + ctx = redisConnectUnixWithTimeout(path, connect_timeout); } else { - ctx = redisConnectWithTimeout(moddata->server_host, - moddata->server_port, moddata->connect_timeout); + ctx = redisConnectWithTimeout(host, port, connect_timeout); } if(!ctx || ctx->err) { const char *errstr = "out of memory"; @@ -101,13 +129,13 @@ redis_connect(const struct redis_moddata* moddata) log_err("failed to connect to redis server: %s", errstr); goto fail; } - if(redisSetTimeout(ctx, moddata->command_timeout) != REDIS_OK) { - log_err("failed to set redis timeout"); + if(redisSetTimeout(ctx, command_timeout) != REDIS_OK) { + log_err("failed to set redis timeout, %s", ctx->errstr); goto fail; } - if(moddata->server_password && moddata->server_password[0]!=0) { + if(password && password[0]!=0) { redisReply* rep; - rep = redisCommand(ctx, "AUTH %s", moddata->server_password); + rep = redisCommand(ctx, "AUTH %s", password); if(!rep || rep->type == REDIS_REPLY_ERROR) { log_err("failed to authenticate with password"); freeReplyObject(rep); @@ -115,18 +143,25 @@ redis_connect(const struct redis_moddata* moddata) } freeReplyObject(rep); } - if(moddata->logical_db > 0) { + if(logical_db > 0) { redisReply* rep; - rep = redisCommand(ctx, "SELECT %d", moddata->logical_db); + rep = redisCommand(ctx, "SELECT %d", logical_db); if(!rep || rep->type == REDIS_REPLY_ERROR) { log_err("failed to set logical database (%d)", - moddata->logical_db); + logical_db); freeReplyObject(rep); goto fail; } freeReplyObject(rep); } - verbose(VERB_OPS, "Connection to Redis established"); + if(verbosity >= VERB_OPS) { + char port_str[6+1]; + port_str[0] = ' '; + (void)snprintf(port_str+1, sizeof(port_str)-1, "%d", port); + verbose(VERB_OPS, "Connection to Redis established (%s%s)", + path&&path[0]!=0?path:host, + path&&path[0]!=0?"":port_str); + } return ctx; fail: @@ -135,6 +170,14 @@ fail: return NULL; } +static void +set_timeout(struct timeval* timeout, int value, int explicit_value) +{ + int v = explicit_value != 0 ? explicit_value : value; + timeout->tv_sec = v / 1000; + timeout->tv_usec = (v % 1000) * 1000; +} + static int redis_init(struct module_env* env, struct cachedb_env* cachedb_env) { @@ -149,57 +192,98 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env) goto fail; } moddata->numctxs = env->cfg->num_threads; - moddata->ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*)); - if(!moddata->ctxs) { - log_err("out of memory"); - goto fail; - } - /* note: server_host is a shallow reference to configured string. - * we don't have to free it in this module. */ + /* note: server_host and similar string configuration options are + * shallow references to configured strings; we don't have to free them + * in this module. */ moddata->server_host = env->cfg->redis_server_host; + moddata->replica_server_host = env->cfg->redis_replica_server_host; + moddata->server_port = env->cfg->redis_server_port; + moddata->replica_server_port = env->cfg->redis_replica_server_port; + moddata->server_path = env->cfg->redis_server_path; + moddata->replica_server_path = env->cfg->redis_replica_server_path; + moddata->server_password = env->cfg->redis_server_password; - moddata->command_timeout.tv_sec = env->cfg->redis_timeout / 1000; - moddata->command_timeout.tv_usec = - (env->cfg->redis_timeout % 1000) * 1000; - moddata->connect_timeout.tv_sec = env->cfg->redis_timeout / 1000; - moddata->connect_timeout.tv_usec = - (env->cfg->redis_timeout % 1000) * 1000; - if(env->cfg->redis_command_timeout != 0) { - moddata->command_timeout.tv_sec = - env->cfg->redis_command_timeout / 1000; - moddata->command_timeout.tv_usec = - (env->cfg->redis_command_timeout % 1000) * 1000; + moddata->replica_server_password = env->cfg->redis_replica_server_password; + + set_timeout(&moddata->command_timeout, + env->cfg->redis_timeout, + env->cfg->redis_command_timeout); + set_timeout(&moddata->replica_command_timeout, + env->cfg->redis_replica_timeout, + env->cfg->redis_replica_command_timeout); + set_timeout(&moddata->connect_timeout, + env->cfg->redis_timeout, + env->cfg->redis_connect_timeout); + set_timeout(&moddata->replica_connect_timeout, + env->cfg->redis_replica_timeout, + env->cfg->redis_replica_connect_timeout); + + moddata->logical_db = env->cfg->redis_logical_db; + moddata->replica_logical_db = env->cfg->redis_replica_logical_db; + + moddata->ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*)); + if(!moddata->ctxs) { + log_err("out of memory"); + goto fail; } - if(env->cfg->redis_connect_timeout != 0) { - moddata->connect_timeout.tv_sec = - env->cfg->redis_connect_timeout / 1000; - moddata->connect_timeout.tv_usec = - (env->cfg->redis_connect_timeout % 1000) * 1000; + if((moddata->replica_server_host && moddata->replica_server_host[0]!=0) + || (moddata->replica_server_path && moddata->replica_server_path[0]!=0)) { + /* There is a replica configured, allocate ctxs */ + moddata->replica_ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*)); + if(!moddata->replica_ctxs) { + log_err("out of memory"); + goto fail; + } } - moddata->logical_db = env->cfg->redis_logical_db; for(i = 0; i < moddata->numctxs; i++) { - redisContext* ctx = redis_connect(moddata); + redisContext* ctx = redis_connect( + moddata->server_host, + moddata->server_port, + moddata->server_path, + moddata->server_password, + moddata->logical_db, + moddata->connect_timeout, + moddata->command_timeout); if(!ctx) { - log_err("redis_init: failed to init redis"); - goto fail; + log_err("redis_init: failed to init redis " + "(for thread %d)", i); + /* And continue, the context can be established + * later, just like after a disconnect. */ } moddata->ctxs[i] = ctx; } + if(moddata->replica_ctxs) { + for(i = 0; i < moddata->numctxs; i++) { + redisContext* ctx = redis_connect( + moddata->replica_server_host, + moddata->replica_server_port, + moddata->replica_server_path, + moddata->replica_server_password, + moddata->replica_logical_db, + moddata->replica_connect_timeout, + moddata->replica_command_timeout); + if(!ctx) { + log_err("redis_init: failed to init redis " + "replica (for thread %d)", i); + /* And continue, the context can be established + * later, just like after a disconnect. */ + } + moddata->replica_ctxs[i] = ctx; + } + } cachedb_env->backend_data = moddata; - if(env->cfg->redis_expire_records) { + if(env->cfg->redis_expire_records && + moddata->ctxs[env->alloc->thread_num] != NULL) { redisReply* rep = NULL; int redis_reply_type = 0; - /** check if setex command is supported */ + /** check if set with ex command is supported */ rep = redis_command(env, cachedb_env, - "SETEX __UNBOUND_REDIS_CHECK__ 1 none", NULL, 0); + "SET __UNBOUND_REDIS_CHECK__ none EX 1", NULL, 0, 1); if(!rep) { /** init failed, no response from redis server*/ - log_err("redis_init: failed to init redis, the " - "redis-expire-records option requires the SETEX command " - "(redis >= 2.0.0)"); - goto fail; + goto set_with_ex_fail; } redis_reply_type = rep->type; freeReplyObject(rep); @@ -207,15 +291,18 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env) case REDIS_REPLY_STATUS: break; default: - /** init failed, setex command not supported */ - log_err("redis_init: failed to init redis, the " - "redis-expire-records option requires the SETEX command " - "(redis >= 2.0.0)"); - goto fail; + /** init failed, set_with_ex command not supported */ + goto set_with_ex_fail; } + moddata->set_with_ex_available = 1; } return 1; +set_with_ex_fail: + log_err("redis_init: failure during redis_init, the " + "redis-expire-records option requires the SET with EX command " + "(redis >= 2.6.2)"); + return 1; fail: moddata_clean(&moddata); return 0; @@ -246,9 +333,9 @@ redis_deinit(struct module_env* env, struct cachedb_env* cachedb_env) */ static redisReply* redis_command(struct module_env* env, struct cachedb_env* cachedb_env, - const char* command, const uint8_t* data, size_t data_len) + const char* command, const uint8_t* data, size_t data_len, int write) { - redisContext* ctx; + redisContext* ctx, **ctx_selector; redisReply* rep; struct redis_moddata* d = (struct redis_moddata*) cachedb_env->backend_data; @@ -259,17 +346,38 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env, * assumption throughout the unbound architecture, so we simply assert * it. */ log_assert(env->alloc->thread_num < d->numctxs); - ctx = d->ctxs[env->alloc->thread_num]; + + ctx_selector = !write && d->replica_ctxs + ?d->replica_ctxs + :d->ctxs; + ctx = ctx_selector[env->alloc->thread_num]; /* If we've not established a connection to the server or we've closed * it on a failure, try to re-establish a new one. Failures will be * logged in redis_connect(). */ if(!ctx) { - ctx = redis_connect(d); - d->ctxs[env->alloc->thread_num] = ctx; + if(!write && d->replica_ctxs) { + ctx = redis_connect( + d->replica_server_host, + d->replica_server_port, + d->replica_server_path, + d->replica_server_password, + d->replica_logical_db, + d->replica_connect_timeout, + d->replica_command_timeout); + } else { + ctx = redis_connect( + d->server_host, + d->server_port, + d->server_path, + d->server_password, + d->logical_db, + d->connect_timeout, + d->command_timeout); + } + ctx_selector[env->alloc->thread_num] = ctx; } - if(!ctx) - return NULL; + if(!ctx) return NULL; /* Send the command and get a reply, synchronously. */ rep = (redisReply*)redisCommand(ctx, command, data, data_len); @@ -279,7 +387,7 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env, log_err("redis_command: failed to receive a reply, " "closing connection: %s", ctx->errstr); redisFree(ctx); - d->ctxs[env->alloc->thread_num] = NULL; + ctx_selector[env->alloc->thread_num] = NULL; return NULL; } @@ -309,7 +417,7 @@ redis_lookup(struct module_env* env, struct cachedb_env* cachedb_env, return 0; } - rep = redis_command(env, cachedb_env, cmdbuf, NULL, 0); + rep = redis_command(env, cachedb_env, cmdbuf, NULL, 0, 0); if(!rep) return 0; switch(rep->type) { @@ -346,11 +454,16 @@ redis_store(struct module_env* env, struct cachedb_env* cachedb_env, { redisReply* rep; int n; - int set_ttl = (env->cfg->redis_expire_records && + struct redis_moddata* moddata = (struct redis_moddata*) + cachedb_env->backend_data; + int set_ttl = (moddata->set_with_ex_available && + env->cfg->redis_expire_records && (!env->cfg->serve_expired || env->cfg->serve_expired_ttl > 0)); /* Supported commands: * - "SET " + key + " %b" - * - "SETEX " + key + " " + ttl + " %b" + * - "SET " + key + " %b EX " + ttl + * older redis 2.0.0 was "SETEX " + key + " " + ttl + " %b" + * - "EXPIRE " + key + " 0" */ char cmdbuf[6+(CACHEDB_HASHSIZE/8)*2+11+3+1]; @@ -358,14 +471,22 @@ redis_store(struct module_env* env, struct cachedb_env* cachedb_env, verbose(VERB_ALGO, "redis_store %s (%d bytes)", key, (int)data_len); /* build command to set to a binary safe string */ n = snprintf(cmdbuf, sizeof(cmdbuf), "SET %s %%b", key); + } else if(ttl == 0) { + /* use the EXPIRE command, SET with EX 0 is an invalid time. */ + /* Replies with REDIS_REPLY_INTEGER of 1. */ + verbose(VERB_ALGO, "redis_store expire %s (%d bytes)", + key, (int)data_len); + n = snprintf(cmdbuf, sizeof(cmdbuf), "EXPIRE %s 0", key); + data = NULL; + data_len = 0; } else { /* add expired ttl time to redis ttl to avoid premature eviction of key */ ttl += env->cfg->serve_expired_ttl; verbose(VERB_ALGO, "redis_store %s (%d bytes) with ttl %u", - key, (int)data_len, (uint32_t)ttl); + key, (int)data_len, (unsigned)(uint32_t)ttl); /* build command to set to a binary safe string */ - n = snprintf(cmdbuf, sizeof(cmdbuf), "SETEX %s %u %%b", key, - (uint32_t)ttl); + n = snprintf(cmdbuf, sizeof(cmdbuf), "SET %s %%b EX %u", key, + (unsigned)(uint32_t)ttl); } @@ -374,11 +495,12 @@ redis_store(struct module_env* env, struct cachedb_env* cachedb_env, return; } - rep = redis_command(env, cachedb_env, cmdbuf, data, data_len); + rep = redis_command(env, cachedb_env, cmdbuf, data, data_len, 1); if(rep) { verbose(VERB_ALGO, "redis_store set completed"); if(rep->type != REDIS_REPLY_STATUS && - rep->type != REDIS_REPLY_ERROR) { + rep->type != REDIS_REPLY_ERROR && + rep->type != REDIS_REPLY_INTEGER) { log_err("redis_store: unexpected type of reply (%d)", rep->type); } |