diff options
Diffstat (limited to 'contrib/unbound/cachedb/redis.c')
| -rw-r--r-- | contrib/unbound/cachedb/redis.c | 144 | 
1 files changed, 130 insertions, 14 deletions
| diff --git a/contrib/unbound/cachedb/redis.c b/contrib/unbound/cachedb/redis.c index 3dfa95859eb8..9383f1c8576c 100644 --- a/contrib/unbound/cachedb/redis.c +++ b/contrib/unbound/cachedb/redis.c @@ -46,6 +46,8 @@  #include "cachedb/cachedb.h"  #include "util/alloc.h"  #include "util/config_file.h" +#include "util/locks.h" +#include "util/timeval_func.h"  #include "sldns/sbuffer.h"  #ifdef USE_REDIS @@ -75,6 +77,18 @@ struct redis_moddata {  	/* timeout for connection setup */  	struct timeval connect_timeout;  	struct timeval replica_connect_timeout; +	/* the reconnect interval time. */ +	struct timeval reconnect_interval; +	struct timeval replica_reconnect_interval; +	/* reconnect attempts, 0 if connected, counts up failed reconnects. */ +	int reconnect_attempts; +	int replica_reconnect_attempts; +	/* Lock on reconnect_wait time. */ +	lock_basic_type wait_lock; +	lock_basic_type replica_wait_lock; +	/* reconnect wait time, wait until it has passed before reconnect. */ +	struct timeval reconnect_wait; +	struct timeval replica_reconnect_wait;  	/* the redis logical database to use */  	int logical_db;  	int replica_logical_db; @@ -82,6 +96,10 @@ struct redis_moddata {  	int set_with_ex_available;  }; +/** The limit on the number of redis connect attempts. After failure if + * the number is exceeded, the reconnects are throttled by the wait time. */ +#define REDIS_RECONNECT_ATTEMPT_LIMIT 3 +  static redisReply* redis_command(struct module_env*, struct cachedb_env*,  	const char*, const uint8_t*, size_t, int); @@ -105,6 +123,8 @@ moddata_clean(struct redis_moddata** moddata) {  		}  		free((*moddata)->replica_ctxs);  	} +	lock_basic_destroy(&(*moddata)->wait_lock); +	lock_basic_destroy(&(*moddata)->replica_wait_lock);  	free(*moddata);  	*moddata = NULL;  } @@ -113,10 +133,39 @@ static redisContext*  redis_connect(const char* host, int port, const char* path,  	const char* password, int logical_db,  	const struct timeval connect_timeout, -	const struct timeval command_timeout) +	const struct timeval command_timeout, +	const struct timeval* reconnect_interval, +	int* reconnect_attempts, +	struct timeval* reconnect_wait, +	lock_basic_type* wait_lock, +	struct timeval* now_tv, +	const char* infostr)  { +	struct timeval now_val;  	redisContext* ctx; +	/* See if the redis server is down, and reconnect has to wait. */ +	if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) { +		/* Acquire lock to look at timeval, the integer has atomic +		 * integrity. */ +		struct timeval wait_tv; +		if(now_tv) { +			now_val = *now_tv; +		} else { +			if(gettimeofday(&now_val, NULL) < 0) +				log_err("redis: gettimeofday: %s", +					strerror(errno)); +		} +		lock_basic_lock(wait_lock); +		wait_tv = *reconnect_wait; +		lock_basic_unlock(wait_lock); +		if(timeval_smaller(&now_val, &wait_tv)) { +			verbose(VERB_ALGO, "redis %sdown, reconnect wait", +				infostr); +			return NULL; +		} +	} +  	if(path && path[0]!=0) {  		ctx = redisConnectUnixWithTimeout(path, connect_timeout);  	} else { @@ -126,18 +175,18 @@ redis_connect(const char* host, int port, const char* path,  		const char *errstr = "out of memory";  		if(ctx)  			errstr = ctx->errstr; -		log_err("failed to connect to redis server: %s", errstr); +		log_err("failed to connect to redis %sserver: %s", infostr, errstr);  		goto fail;  	}  	if(redisSetTimeout(ctx, command_timeout) != REDIS_OK) { -		log_err("failed to set redis timeout, %s", ctx->errstr); +		log_err("failed to set redis %stimeout, %s", infostr, ctx->errstr);  		goto fail;  	}  	if(password && password[0]!=0) {  		redisReply* rep;  		rep = redisCommand(ctx, "AUTH %s", password);  		if(!rep || rep->type == REDIS_REPLY_ERROR) { -			log_err("failed to authenticate with password"); +			log_err("failed to authenticate %swith password", infostr);  			freeReplyObject(rep);  			goto fail;  		} @@ -147,18 +196,20 @@ redis_connect(const char* host, int port, const char* path,  		redisReply* rep;  		rep = redisCommand(ctx, "SELECT %d", logical_db);  		if(!rep || rep->type == REDIS_REPLY_ERROR) { -			log_err("failed to set logical database (%d)", -				logical_db); +			log_err("failed %sto set logical database (%d)", +				infostr, logical_db);  			freeReplyObject(rep);  			goto fail;  		}  		freeReplyObject(rep);  	} +	*reconnect_attempts = 0;  	if(verbosity >= VERB_OPS) {  		char port_str[6+1];  		port_str[0] = ' ';  		(void)snprintf(port_str+1, sizeof(port_str)-1, "%d", port); -		verbose(VERB_OPS, "Connection to Redis established (%s%s)", +		verbose(VERB_OPS, "Connection to Redis %sestablished (%s%s)", +			infostr,  			path&&path[0]!=0?path:host,  			path&&path[0]!=0?"":port_str);  	} @@ -167,6 +218,25 @@ redis_connect(const char* host, int port, const char* path,  fail:  	if(ctx)  		redisFree(ctx); +	(*reconnect_attempts)++; +	if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) { +		/* Wait for the reconnect interval before trying again. */ +		struct timeval tv; +		if(now_tv) { +			now_val = *now_tv; +		} else { +			if(gettimeofday(&now_val, NULL) < 0) +				log_err("redis: gettimeofday: %s", +					strerror(errno)); +		} +		tv = now_val; +		timeval_add(&tv, reconnect_interval); +		lock_basic_lock(wait_lock); +		*reconnect_wait = tv; +		lock_basic_unlock(wait_lock); +		verbose(VERB_ALGO, "redis %sreconnect wait until %d.%6.6d", +			infostr, (int)tv.tv_sec, (int)tv.tv_usec); +	}  	return NULL;  } @@ -191,6 +261,13 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)  		log_err("out of memory");  		goto fail;  	} +	lock_basic_init(&moddata->wait_lock); +	lock_protect(&moddata->wait_lock, &moddata->reconnect_wait, +		sizeof(moddata->reconnect_wait)); +	lock_basic_init(&moddata->replica_wait_lock); +	lock_protect(&moddata->replica_wait_lock, +		&moddata->replica_reconnect_wait, +		sizeof(moddata->replica_reconnect_wait));  	moddata->numctxs = env->cfg->num_threads;  	/* note: server_host and similar string configuration options are  	 * shallow references to configured strings; we don't have to free them @@ -219,6 +296,8 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)  	set_timeout(&moddata->replica_connect_timeout,  		env->cfg->redis_replica_timeout,  		env->cfg->redis_replica_connect_timeout); +	set_timeout(&moddata->reconnect_interval, 1000, 0); +	set_timeout(&moddata->replica_reconnect_interval, 1000, 0);  	moddata->logical_db = env->cfg->redis_logical_db;  	moddata->replica_logical_db = env->cfg->redis_replica_logical_db; @@ -245,7 +324,13 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)  			moddata->server_password,  			moddata->logical_db,  			moddata->connect_timeout, -			moddata->command_timeout); +			moddata->command_timeout, +			&moddata->reconnect_interval, +			&moddata->reconnect_attempts, +			&moddata->reconnect_wait, +			&moddata->wait_lock, +			env->now_tv, +			"");  		if(!ctx) {  			log_err("redis_init: failed to init redis "  				"(for thread %d)", i); @@ -263,7 +348,13 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)  				moddata->replica_server_password,  				moddata->replica_logical_db,  				moddata->replica_connect_timeout, -				moddata->replica_command_timeout); +				moddata->replica_command_timeout, +				&moddata->replica_reconnect_interval, +				&moddata->replica_reconnect_attempts, +				&moddata->replica_reconnect_wait, +				&moddata->replica_wait_lock, +				env->now_tv, +				"replica ");  			if(!ctx) {  				log_err("redis_init: failed to init redis "  					"replica (for thread %d)", i); @@ -301,7 +392,7 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)  set_with_ex_fail:  	log_err("redis_init: failure during redis_init, the "  		"redis-expire-records option requires the SET with EX command " -		"(redis >= 2.6.2)"); +		"(redis >= 2.6.12)");  	return 1;  fail:  	moddata_clean(&moddata); @@ -364,7 +455,13 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env,  				d->replica_server_password,  				d->replica_logical_db,  				d->replica_connect_timeout, -				d->replica_command_timeout); +				d->replica_command_timeout, +				&d->replica_reconnect_interval, +				&d->replica_reconnect_attempts, +				&d->replica_reconnect_wait, +				&d->replica_wait_lock, +				env->now_tv, +				"replica ");  		} else {  			ctx = redis_connect(  				d->server_host, @@ -373,7 +470,13 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env,  				d->server_password,  				d->logical_db,  				d->connect_timeout, -				d->command_timeout); +				d->command_timeout, +				&d->reconnect_interval, +				&d->reconnect_attempts, +				&d->reconnect_wait, +				&d->wait_lock, +				env->now_tv, +				"");  		}  		ctx_selector[env->alloc->thread_num] = ctx;  	} @@ -405,7 +508,14 @@ redis_lookup(struct module_env* env, struct cachedb_env* cachedb_env,  	char* key, struct sldns_buffer* result_buffer)  {  	redisReply* rep; -	char cmdbuf[4+(CACHEDB_HASHSIZE/8)*2+1]; /* "GET " + key */ +	/* Supported commands: +	 * - "GET " + key +	 */ +#define REDIS_LOOKUP_MAX_BUF_LEN			\ +	4				/* "GET " */	\ +	+(CACHEDB_HASHSIZE/8)*2		/* key hash */	\ +	+ 1				/* \0 */ +	char cmdbuf[REDIS_LOOKUP_MAX_BUF_LEN];  	int n;  	int ret = 0; @@ -465,7 +575,13 @@ redis_store(struct module_env* env, struct cachedb_env* cachedb_env,  	 *   older redis 2.0.0 was "SETEX " + key + " " + ttl + " %b"  	 * - "EXPIRE " + key + " 0"  	 */ -	char cmdbuf[6+(CACHEDB_HASHSIZE/8)*2+11+3+1]; +#define REDIS_STORE_MAX_BUF_LEN				\ +	7			/* "EXPIRE " */		\ +	+(CACHEDB_HASHSIZE/8)*2	/* key hash */		\ +	+ 7			/* " %b EX " */		\ +	+ 20			/* ttl (uint64_t) */	\ +	+ 1			/* \0 */ +	char cmdbuf[REDIS_STORE_MAX_BUF_LEN];  	if (!set_ttl) {  		verbose(VERB_ALGO, "redis_store %s (%d bytes)", key, (int)data_len); | 
