aboutsummaryrefslogtreecommitdiff
path: root/cachedb/redis.c
diff options
context:
space:
mode:
Diffstat (limited to 'cachedb/redis.c')
-rw-r--r--cachedb/redis.c280
1 files changed, 201 insertions, 79 deletions
diff --git a/cachedb/redis.c b/cachedb/redis.c
index 68c033535a69..3dfa95859eb8 100644
--- a/cachedb/redis.c
+++ b/cachedb/redis.c
@@ -52,19 +52,38 @@
#include "hiredis/hiredis.h"
struct redis_moddata {
- redisContext** ctxs; /* thread-specific redis contexts */
- int numctxs; /* number of ctx entries */
- const char* server_host; /* server's IP address or host name */
- int server_port; /* server's TCP port */
- const char* server_path; /* server's unix path, or "", NULL if unused */
- const char* server_password; /* server's AUTH password, or "", NULL if unused */
- struct timeval command_timeout; /* timeout for commands */
- struct timeval connect_timeout; /* timeout for connect */
- int logical_db; /* the redis logical database to use */
+ /* thread-specific redis contexts */
+ redisContext** ctxs;
+ redisContext** replica_ctxs;
+ /* number of ctx entries */
+ int numctxs;
+ /* server's IP address or host name */
+ const char* server_host;
+ const char* replica_server_host;
+ /* server's TCP port */
+ int server_port;
+ int replica_server_port;
+ /* server's unix path, or "", NULL if unused */
+ const char* server_path;
+ const char* replica_server_path;
+ /* server's AUTH password, or "", NULL if unused */
+ const char* server_password;
+ const char* replica_server_password;
+ /* timeout for commands */
+ struct timeval command_timeout;
+ struct timeval replica_command_timeout;
+ /* timeout for connection setup */
+ struct timeval connect_timeout;
+ struct timeval replica_connect_timeout;
+ /* the redis logical database to use */
+ int logical_db;
+ int replica_logical_db;
+ /* if the SET with EX command is supported */
+ int set_with_ex_available;
};
static redisReply* redis_command(struct module_env*, struct cachedb_env*,
- const char*, const uint8_t*, size_t);
+ const char*, const uint8_t*, size_t, int);
static void
moddata_clean(struct redis_moddata** moddata) {
@@ -78,21 +97,30 @@ moddata_clean(struct redis_moddata** moddata) {
}
free((*moddata)->ctxs);
}
+ if((*moddata)->replica_ctxs) {
+ int i;
+ for(i = 0; i < (*moddata)->numctxs; i++) {
+ if((*moddata)->replica_ctxs[i])
+ redisFree((*moddata)->replica_ctxs[i]);
+ }
+ free((*moddata)->replica_ctxs);
+ }
free(*moddata);
*moddata = NULL;
}
static redisContext*
-redis_connect(const struct redis_moddata* moddata)
+redis_connect(const char* host, int port, const char* path,
+ const char* password, int logical_db,
+ const struct timeval connect_timeout,
+ const struct timeval command_timeout)
{
redisContext* ctx;
- if(moddata->server_path && moddata->server_path[0]!=0) {
- ctx = redisConnectUnixWithTimeout(moddata->server_path,
- moddata->connect_timeout);
+ if(path && path[0]!=0) {
+ ctx = redisConnectUnixWithTimeout(path, connect_timeout);
} else {
- ctx = redisConnectWithTimeout(moddata->server_host,
- moddata->server_port, moddata->connect_timeout);
+ ctx = redisConnectWithTimeout(host, port, connect_timeout);
}
if(!ctx || ctx->err) {
const char *errstr = "out of memory";
@@ -101,13 +129,13 @@ redis_connect(const struct redis_moddata* moddata)
log_err("failed to connect to redis server: %s", errstr);
goto fail;
}
- if(redisSetTimeout(ctx, moddata->command_timeout) != REDIS_OK) {
- log_err("failed to set redis timeout");
+ if(redisSetTimeout(ctx, command_timeout) != REDIS_OK) {
+ log_err("failed to set redis timeout, %s", ctx->errstr);
goto fail;
}
- if(moddata->server_password && moddata->server_password[0]!=0) {
+ if(password && password[0]!=0) {
redisReply* rep;
- rep = redisCommand(ctx, "AUTH %s", moddata->server_password);
+ rep = redisCommand(ctx, "AUTH %s", password);
if(!rep || rep->type == REDIS_REPLY_ERROR) {
log_err("failed to authenticate with password");
freeReplyObject(rep);
@@ -115,18 +143,25 @@ redis_connect(const struct redis_moddata* moddata)
}
freeReplyObject(rep);
}
- if(moddata->logical_db > 0) {
+ if(logical_db > 0) {
redisReply* rep;
- rep = redisCommand(ctx, "SELECT %d", moddata->logical_db);
+ rep = redisCommand(ctx, "SELECT %d", logical_db);
if(!rep || rep->type == REDIS_REPLY_ERROR) {
log_err("failed to set logical database (%d)",
- moddata->logical_db);
+ logical_db);
freeReplyObject(rep);
goto fail;
}
freeReplyObject(rep);
}
- verbose(VERB_OPS, "Connection to Redis established");
+ if(verbosity >= VERB_OPS) {
+ char port_str[6+1];
+ port_str[0] = ' ';
+ (void)snprintf(port_str+1, sizeof(port_str)-1, "%d", port);
+ verbose(VERB_OPS, "Connection to Redis established (%s%s)",
+ path&&path[0]!=0?path:host,
+ path&&path[0]!=0?"":port_str);
+ }
return ctx;
fail:
@@ -135,6 +170,14 @@ fail:
return NULL;
}
+static void
+set_timeout(struct timeval* timeout, int value, int explicit_value)
+{
+ int v = explicit_value != 0 ? explicit_value : value;
+ timeout->tv_sec = v / 1000;
+ timeout->tv_usec = (v % 1000) * 1000;
+}
+
static int
redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
{
@@ -149,57 +192,98 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
goto fail;
}
moddata->numctxs = env->cfg->num_threads;
- moddata->ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
- if(!moddata->ctxs) {
- log_err("out of memory");
- goto fail;
- }
- /* note: server_host is a shallow reference to configured string.
- * we don't have to free it in this module. */
+ /* note: server_host and similar string configuration options are
+ * shallow references to configured strings; we don't have to free them
+ * in this module. */
moddata->server_host = env->cfg->redis_server_host;
+ moddata->replica_server_host = env->cfg->redis_replica_server_host;
+
moddata->server_port = env->cfg->redis_server_port;
+ moddata->replica_server_port = env->cfg->redis_replica_server_port;
+
moddata->server_path = env->cfg->redis_server_path;
+ moddata->replica_server_path = env->cfg->redis_replica_server_path;
+
moddata->server_password = env->cfg->redis_server_password;
- moddata->command_timeout.tv_sec = env->cfg->redis_timeout / 1000;
- moddata->command_timeout.tv_usec =
- (env->cfg->redis_timeout % 1000) * 1000;
- moddata->connect_timeout.tv_sec = env->cfg->redis_timeout / 1000;
- moddata->connect_timeout.tv_usec =
- (env->cfg->redis_timeout % 1000) * 1000;
- if(env->cfg->redis_command_timeout != 0) {
- moddata->command_timeout.tv_sec =
- env->cfg->redis_command_timeout / 1000;
- moddata->command_timeout.tv_usec =
- (env->cfg->redis_command_timeout % 1000) * 1000;
+ moddata->replica_server_password = env->cfg->redis_replica_server_password;
+
+ set_timeout(&moddata->command_timeout,
+ env->cfg->redis_timeout,
+ env->cfg->redis_command_timeout);
+ set_timeout(&moddata->replica_command_timeout,
+ env->cfg->redis_replica_timeout,
+ env->cfg->redis_replica_command_timeout);
+ set_timeout(&moddata->connect_timeout,
+ env->cfg->redis_timeout,
+ env->cfg->redis_connect_timeout);
+ set_timeout(&moddata->replica_connect_timeout,
+ env->cfg->redis_replica_timeout,
+ env->cfg->redis_replica_connect_timeout);
+
+ moddata->logical_db = env->cfg->redis_logical_db;
+ moddata->replica_logical_db = env->cfg->redis_replica_logical_db;
+
+ moddata->ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
+ if(!moddata->ctxs) {
+ log_err("out of memory");
+ goto fail;
}
- if(env->cfg->redis_connect_timeout != 0) {
- moddata->connect_timeout.tv_sec =
- env->cfg->redis_connect_timeout / 1000;
- moddata->connect_timeout.tv_usec =
- (env->cfg->redis_connect_timeout % 1000) * 1000;
+ if((moddata->replica_server_host && moddata->replica_server_host[0]!=0)
+ || (moddata->replica_server_path && moddata->replica_server_path[0]!=0)) {
+ /* There is a replica configured, allocate ctxs */
+ moddata->replica_ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
+ if(!moddata->replica_ctxs) {
+ log_err("out of memory");
+ goto fail;
+ }
}
- moddata->logical_db = env->cfg->redis_logical_db;
for(i = 0; i < moddata->numctxs; i++) {
- redisContext* ctx = redis_connect(moddata);
+ redisContext* ctx = redis_connect(
+ moddata->server_host,
+ moddata->server_port,
+ moddata->server_path,
+ moddata->server_password,
+ moddata->logical_db,
+ moddata->connect_timeout,
+ moddata->command_timeout);
if(!ctx) {
- log_err("redis_init: failed to init redis");
- goto fail;
+ log_err("redis_init: failed to init redis "
+ "(for thread %d)", i);
+ /* And continue, the context can be established
+ * later, just like after a disconnect. */
}
moddata->ctxs[i] = ctx;
}
+ if(moddata->replica_ctxs) {
+ for(i = 0; i < moddata->numctxs; i++) {
+ redisContext* ctx = redis_connect(
+ moddata->replica_server_host,
+ moddata->replica_server_port,
+ moddata->replica_server_path,
+ moddata->replica_server_password,
+ moddata->replica_logical_db,
+ moddata->replica_connect_timeout,
+ moddata->replica_command_timeout);
+ if(!ctx) {
+ log_err("redis_init: failed to init redis "
+ "replica (for thread %d)", i);
+ /* And continue, the context can be established
+ * later, just like after a disconnect. */
+ }
+ moddata->replica_ctxs[i] = ctx;
+ }
+ }
cachedb_env->backend_data = moddata;
- if(env->cfg->redis_expire_records) {
+ if(env->cfg->redis_expire_records &&
+ moddata->ctxs[env->alloc->thread_num] != NULL) {
redisReply* rep = NULL;
int redis_reply_type = 0;
- /** check if setex command is supported */
+ /** check if set with ex command is supported */
rep = redis_command(env, cachedb_env,
- "SETEX __UNBOUND_REDIS_CHECK__ 1 none", NULL, 0);
+ "SET __UNBOUND_REDIS_CHECK__ none EX 1", NULL, 0, 1);
if(!rep) {
/** init failed, no response from redis server*/
- log_err("redis_init: failed to init redis, the "
- "redis-expire-records option requires the SETEX command "
- "(redis >= 2.0.0)");
- goto fail;
+ goto set_with_ex_fail;
}
redis_reply_type = rep->type;
freeReplyObject(rep);
@@ -207,15 +291,18 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
case REDIS_REPLY_STATUS:
break;
default:
- /** init failed, setex command not supported */
- log_err("redis_init: failed to init redis, the "
- "redis-expire-records option requires the SETEX command "
- "(redis >= 2.0.0)");
- goto fail;
+ /** init failed, set_with_ex command not supported */
+ goto set_with_ex_fail;
}
+ moddata->set_with_ex_available = 1;
}
return 1;
+set_with_ex_fail:
+ log_err("redis_init: failure during redis_init, the "
+ "redis-expire-records option requires the SET with EX command "
+ "(redis >= 2.6.2)");
+ return 1;
fail:
moddata_clean(&moddata);
return 0;
@@ -246,9 +333,9 @@ redis_deinit(struct module_env* env, struct cachedb_env* cachedb_env)
*/
static redisReply*
redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
- const char* command, const uint8_t* data, size_t data_len)
+ const char* command, const uint8_t* data, size_t data_len, int write)
{
- redisContext* ctx;
+ redisContext* ctx, **ctx_selector;
redisReply* rep;
struct redis_moddata* d = (struct redis_moddata*)
cachedb_env->backend_data;
@@ -259,17 +346,38 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
* assumption throughout the unbound architecture, so we simply assert
* it. */
log_assert(env->alloc->thread_num < d->numctxs);
- ctx = d->ctxs[env->alloc->thread_num];
+
+ ctx_selector = !write && d->replica_ctxs
+ ?d->replica_ctxs
+ :d->ctxs;
+ ctx = ctx_selector[env->alloc->thread_num];
/* If we've not established a connection to the server or we've closed
* it on a failure, try to re-establish a new one. Failures will be
* logged in redis_connect(). */
if(!ctx) {
- ctx = redis_connect(d);
- d->ctxs[env->alloc->thread_num] = ctx;
+ if(!write && d->replica_ctxs) {
+ ctx = redis_connect(
+ d->replica_server_host,
+ d->replica_server_port,
+ d->replica_server_path,
+ d->replica_server_password,
+ d->replica_logical_db,
+ d->replica_connect_timeout,
+ d->replica_command_timeout);
+ } else {
+ ctx = redis_connect(
+ d->server_host,
+ d->server_port,
+ d->server_path,
+ d->server_password,
+ d->logical_db,
+ d->connect_timeout,
+ d->command_timeout);
+ }
+ ctx_selector[env->alloc->thread_num] = ctx;
}
- if(!ctx)
- return NULL;
+ if(!ctx) return NULL;
/* Send the command and get a reply, synchronously. */
rep = (redisReply*)redisCommand(ctx, command, data, data_len);
@@ -279,7 +387,7 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
log_err("redis_command: failed to receive a reply, "
"closing connection: %s", ctx->errstr);
redisFree(ctx);
- d->ctxs[env->alloc->thread_num] = NULL;
+ ctx_selector[env->alloc->thread_num] = NULL;
return NULL;
}
@@ -309,7 +417,7 @@ redis_lookup(struct module_env* env, struct cachedb_env* cachedb_env,
return 0;
}
- rep = redis_command(env, cachedb_env, cmdbuf, NULL, 0);
+ rep = redis_command(env, cachedb_env, cmdbuf, NULL, 0, 0);
if(!rep)
return 0;
switch(rep->type) {
@@ -346,11 +454,16 @@ redis_store(struct module_env* env, struct cachedb_env* cachedb_env,
{
redisReply* rep;
int n;
- int set_ttl = (env->cfg->redis_expire_records &&
+ struct redis_moddata* moddata = (struct redis_moddata*)
+ cachedb_env->backend_data;
+ int set_ttl = (moddata->set_with_ex_available &&
+ env->cfg->redis_expire_records &&
(!env->cfg->serve_expired || env->cfg->serve_expired_ttl > 0));
/* Supported commands:
* - "SET " + key + " %b"
- * - "SETEX " + key + " " + ttl + " %b"
+ * - "SET " + key + " %b EX " + ttl
+ * older redis 2.0.0 was "SETEX " + key + " " + ttl + " %b"
+ * - "EXPIRE " + key + " 0"
*/
char cmdbuf[6+(CACHEDB_HASHSIZE/8)*2+11+3+1];
@@ -358,14 +471,22 @@ redis_store(struct module_env* env, struct cachedb_env* cachedb_env,
verbose(VERB_ALGO, "redis_store %s (%d bytes)", key, (int)data_len);
/* build command to set to a binary safe string */
n = snprintf(cmdbuf, sizeof(cmdbuf), "SET %s %%b", key);
+ } else if(ttl == 0) {
+ /* use the EXPIRE command, SET with EX 0 is an invalid time. */
+ /* Replies with REDIS_REPLY_INTEGER of 1. */
+ verbose(VERB_ALGO, "redis_store expire %s (%d bytes)",
+ key, (int)data_len);
+ n = snprintf(cmdbuf, sizeof(cmdbuf), "EXPIRE %s 0", key);
+ data = NULL;
+ data_len = 0;
} else {
/* add expired ttl time to redis ttl to avoid premature eviction of key */
ttl += env->cfg->serve_expired_ttl;
verbose(VERB_ALGO, "redis_store %s (%d bytes) with ttl %u",
- key, (int)data_len, (uint32_t)ttl);
+ key, (int)data_len, (unsigned)(uint32_t)ttl);
/* build command to set to a binary safe string */
- n = snprintf(cmdbuf, sizeof(cmdbuf), "SETEX %s %u %%b", key,
- (uint32_t)ttl);
+ n = snprintf(cmdbuf, sizeof(cmdbuf), "SET %s %%b EX %u", key,
+ (unsigned)(uint32_t)ttl);
}
@@ -374,11 +495,12 @@ redis_store(struct module_env* env, struct cachedb_env* cachedb_env,
return;
}
- rep = redis_command(env, cachedb_env, cmdbuf, data, data_len);
+ rep = redis_command(env, cachedb_env, cmdbuf, data, data_len, 1);
if(rep) {
verbose(VERB_ALGO, "redis_store set completed");
if(rep->type != REDIS_REPLY_STATUS &&
- rep->type != REDIS_REPLY_ERROR) {
+ rep->type != REDIS_REPLY_ERROR &&
+ rep->type != REDIS_REPLY_INTEGER) {
log_err("redis_store: unexpected type of reply (%d)",
rep->type);
}