在实现缓存排序功能之前,必须先明白这一功能的合理性。不妨思考一下,既然可以在中排序,为什么还要把排序功能放在缓存中实现呢?这里简单总结了两个原因:首先,排序会增加数据库的负载,难以支撑高并发的应用;其次,在缓存中排序不会遇到表锁定的问题。恰好提供了排序功能,使我们可以方便地实现缓存排序。
中用于实现排序功能的是SORT命令。该命令提供了多种参数,可以对列表,集合和有序集合进行排序。SORT命令格式如下:
- SORT key [BY pattern] [LIMIT offset count] [GET pattern [GET pattern ...]] [ASC | DESC] [ALPHA] [STORE destination]
SORT key [BY pattern] [LIMIT offset count] [GET pattern [GET pattern ...]] [ASC | DESC] [ALPHA] [STORE destination]BY参数用于指定排序字段,功能类似于SQL中的order by。对于列表和集合而言,仅按照它们的值进行排序往往没有实际意义。以函数Cache2Hash返回的集合为例(实际上返回的是集合键),该集合中存储的是一系列完整的哈希键,只按照这些键进行排序,结果无非是按照数字或字典顺序排列,其用处显然不大。这是因为真正存储行数据的是哈希结构本身,而非哈希键。假设集合键为"resultset.hash:123456",集合中每个哈希键对应的哈希结构中都有一个名为“timestamp”的字段,现在要把集合中的所有哈希键按照timestamp字段进行排序,这时,只需执行以下命令:
- SORT resultset.hash:123456 BY *->timestamp
SORT resultset.hash:123456 BY *->timestamp从上例可以看出,BY的真正威力在于它可以让SORT命令按照一个指定的外部键的外部字段进行排序。SORT用集合resultset.hash:123456中的每个值(即每个哈希键)替换BY参数后的第一个“*”,并依据“->”后面给出的字段获取其值,最后根据这些字段值对哈希键进行排序。
LIMIT参数用于限制排序以后返回元素的数量,功能类似于SQL中的limit。该参数接受另外两个参数,即offset和count,LIMIT offset count表示跳过前offset个元素,返回之后的连续count个元素。可见,LIMIT参数可以用于实现分页功能。
GET参数用于返回指定的字段值。以集合resultset.hash:123456为例,使用BY参数对集合中的所有哈希键按照哈希结构中的timestamp字段排序后,SORT命令返回所有排序之后的哈希键。如果某个请求需要不是键而是某些字段值,这时就要使用GET参数,使SORT命令返回指定字段值。假设除timestamp字段以外,集合中每个哈希键对应的哈希结构中还有一个名为“id”的字段,通过以下命令可以使SORT返回按照timestamp排序以后的每个哈希键对应的哈希结构中的timestamp和id值:
- SORT resultset.hash:123456 BY *->timestamp GET *->timestamp GET *->id
SORT resultset.hash:123456 BY *->timestamp GET *->timestamp GET *->idSORT用集合resultset.hash:123456中的每个值(即每个哈希键)替换GET参数之后的第一个“*”,并将其作为返回值。值得注意的是,利用GET #能够得到集合中的哈希键本身。
ASC和DESC参数用于指定排序顺序(默认为ASC,即从低到高),ALPHA参数用于按照字典顺序排列非数字元素。
STORE参数用于将SORT命令的返回值,即排序结果存入一个指定的列表。加上STORE参数后,SORT命令的返回值就变为排序结果的个数。
下面的代码实现了按照哈希的某个字段对集合中的哈希键排序,并将结果存入列表的过程:
- // 该函数对集合中的所有HASH键进行排序,排序依据是HASH键所对应的HASH中的某个字段,
- // 排序结果被存入一个LIST结构,LIST键应当包含结果集标识符和排序字段标识符,
- // 形如“sorted:123456:1234”
- string SortHash(sql::Connection *mysql_connection,
- redisContext *redis_connection,
- const string &resultset_id,
- const string &sort_field,
- int offset, int count, int order, int ttl) {
- // 只考虑存储HASH键的SET
- string redis_row_set_key = "resultset.hash:" + resultset_id;
- redisReply *reply;
- // 检测SET是否存在
- reply = static_cast<redisReply*>(redisCommand(redis_connection,
- "EXISTS %s",
- redis_row_set_key.c_str()));
- if (reply->integer == 0) {
- freeReplyObject(reply);
- throw runtime_error("FAILURE - no resultsets");
- } else {
- freeReplyObject(reply);
- }
- string field_md5 = md5(sort_field); // 利用MD5排除排序字段中空格造成的影响
- // 将排序结果存入该LIST
- string redis_sorted_list_key = "sorted:" + resultset_id + ":" + field_md5;
- string by("*->" + sort_field); //确定排序字段
- string ord = (order == 1) ? "ASC" : "DESC"; //order==1时按照升序排列;否则为降序
- stringstream ofsstream, cntstream;
- ofsstream << offset;
- cntstream << count;
- // 执行排序命令,并把排序结果存入LIST
- reply = static_cast<redisReply*>(redisCommand(
- redis_connection,
- "SORT %s BY %s LIMIT %s %s GET %s ALPHA STORE %s",
- redis_row_set_key.c_str(),
- by.c_str(),
- ofsstream.str().c_str(),
- cntstream.str().c_str(),
- "#",
- redis_sorted_list_key.c_str()));
- freeReplyObject(reply);
- stringstream ttlstream;
- ttlstream << ttl;
- // 设置LIST的过期时间
- reply = static_cast<redisReply*>(redisCommand(redis_connection,
- "EXPIRE %s %s",
- redis_sorted_list_key.c_str(),
- ttlstream.str().c_str()));
- freeReplyObject(reply);
- return redis_sorted_list_key; // 返回LIST键,以便于其他函数获取该LIST中的内容
// 该函数对集合中的所有HASH键进行排序,排序依据是HASH键所对应的HASH中的某个字段,// 排序结果被存入一个LIST结构,LIST键应当包含结果集标识符和排序字段标识符,// 形如“sorted:123456:1234”string SortHash(sql::Connection *mysql_connection, redisContext *redis_connection, const string &resultset_id, const string &sort_field, int offset, int count, int order, int ttl) { // 只考虑存储HASH键的SET string redis_row_set_key = "resultset.hash:" + resultset_id; redisReply *reply; // 检测SET是否存在 reply = static_cast显然,对结果集中的哈希键进行排序要比对字符串键排序更加直观和方便。借助于排序函数,可以方便地实现在Redis中查询排序后的结果集,代码如下:(redisCommand(redis_connection, "EXISTS %s", redis_row_set_key.c_str())); if (reply->integer == 0) { freeReplyObject(reply); throw runtime_error("FAILURE - no resultsets"); } else { freeReplyObject(reply); } string field_md5 = md5(sort_field); // 利用MD5排除排序字段中空格造成的影响 // 将排序结果存入该LIST string redis_sorted_list_key = "sorted:" + resultset_id + ":" + field_md5; string by("*->" + sort_field); //确定排序字段 string ord = (order == 1) ? "ASC" : "DESC"; //order==1时按照升序排列;否则为降序 stringstream ofsstream, cntstream; ofsstream << offset; cntstream << count; // 执行排序命令,并把排序结果存入LIST reply = static_cast (redisCommand( redis_connection, "SORT %s BY %s LIMIT %s %s GET %s ALPHA STORE %s", redis_row_set_key.c_str(), by.c_str(), ofsstream.str().c_str(), cntstream.str().c_str(), "#", redis_sorted_list_key.c_str())); freeReplyObject(reply); stringstream ttlstream; ttlstream << ttl; // 设置LIST的过期时间 reply = static_cast (redisCommand(redis_connection, "EXPIRE %s %s", redis_sorted_list_key.c_str(), ttlstream.str().c_str())); freeReplyObject(reply); return redis_sorted_list_key; // 返回LIST键,以便于其他函数获取该LIST中的内容
- // 该函数根据sql语句和排序参数,在Redis中查询相应的结果集并进行排序,最后返回
- // 排序之后的HASH键
- vector<string> GetSortedCache(sql::Connection *mysql_connection,
- redisContext *redis_connection,
- const string &sql, const string &sort_field,
- int offset, int count, int order, int ttl) {
- vector<string> redis_row_key_vector;
- redisReply *reply;
- string resultset_id = md5(sql); // 结果集标识符
- string field_md5 = md5(sort_field); // 排序字段标识符
- // 尝试获取LIST中的所有HASH键
- string redis_sorted_list_key = "sorted:" + resultset_id + ":" + field_md5;
- // 尝试获取LIST中的所有HASH键
- reply = static_cast<redisReply*>(redisCommand(redis_connection,
- "LRANGE %s %s %s",
- redis_sorted_list_key.c_str(),
- "0",
- "-1"));
- if (reply->type == REDIS_REPLY_ARRAY) {
- // 如果LIST不存在,调用Cache2Hash函数从Mysql中拉取数据到Redis,然后调用SortHash函数
- // 对结果集进行排序并将排序后的HASH键存入LIST
- if (reply->elements == 0) {
- freeReplyObject(reply);
- sql::Statement *stmt = mysql_connection->createStatement();
- sql::ResultSet *resultset = stmt->executeQuery(sql);
- Cache2Hash(mysql_connection, redis_connection, resultset,
- resultset_id, ttl);
- redis_sorted_list_key = SortHash(mysql_connection, redis_connection,
- resultset_id, sort_field, offset,
- count, order, ttl);
- // 再次尝试获取LIST中的所有HASH键
- reply = static_cast<redisReply*>(redisCommand(
- redis_connection,
- "LRANGE %s %s %s",
- redis_sorted_list_key.c_str(),
- "0",
- "-1"));
- delete resultset;
- delete stmt;
- }
- // 将LIST中的所有HASH键存入redis_row_key_vector中
- string redis_row_key;
- for (int i = 0; i < reply->elements; ++i) {
- redis_row_key = reply->element[i]->str;
- redis_row_key_vector.push_back(redis_row_key);
- }
- freeReplyObject(reply);
- } else {
- freeReplyObject(reply);
- throw runtime_error("FAILURE - LRANGE error");
- }
- return redis_row_key_vector;
- }
// 该函数根据sql语句和排序参数,在Redis中查询相应的结果集并进行排序,最后返回// 排序之后的HASH键vector这样,在Redis中对结果集进行简单排序操作的功能就实现了。GetSortedCache(sql::Connection *mysql_connection, redisContext *redis_connection, const string &sql, const string &sort_field, int offset, int count, int order, int ttl) { vector redis_row_key_vector; redisReply *reply; string resultset_id = md5(sql); // 结果集标识符 string field_md5 = md5(sort_field); // 排序字段标识符 // 尝试获取LIST中的所有HASH键 string redis_sorted_list_key = "sorted:" + resultset_id + ":" + field_md5; // 尝试获取LIST中的所有HASH键 reply = static_cast (redisCommand(redis_connection, "LRANGE %s %s %s", redis_sorted_list_key.c_str(), "0", "-1")); if (reply->type == REDIS_REPLY_ARRAY) { // 如果LIST不存在,调用Cache2Hash函数从Mysql中拉取数据到Redis,然后调用SortHash函数 // 对结果集进行排序并将排序后的HASH键存入LIST if (reply->elements == 0) { freeReplyObject(reply); sql::Statement *stmt = mysql_connection->createStatement(); sql::ResultSet *resultset = stmt->executeQuery(sql); Cache2Hash(mysql_connection, redis_connection, resultset, resultset_id, ttl); redis_sorted_list_key = SortHash(mysql_connection, redis_connection, resultset_id, sort_field, offset, count, order, ttl); // 再次尝试获取LIST中的所有HASH键 reply = static_cast (redisCommand( redis_connection, "LRANGE %s %s %s", redis_sorted_list_key.c_str(), "0", "-1")); delete resultset; delete stmt; } // 将LIST中的所有HASH键存入redis_row_key_vector中 string redis_row_key; for (int i = 0; i < reply->elements; ++i) { redis_row_key = reply->element[i]->str; redis_row_key_vector.push_back(redis_row_key); } freeReplyObject(reply); } else { freeReplyObject(reply); throw runtime_error("FAILURE - LRANGE error"); } return redis_row_key_vector;}