Redis::Distributed is the Ruby implementation of Redis client-side partitioning.
Partitioning (also known as sharding) is the process of taking the dataset that would originally be held in a single Redis server and splitting it over multiple Redis servers. Partitioning allows you to distribute writes/reads over a set of nodes – horizontal scaling.
Client side partitioning requires additional functionality on the client side whenever commands are issued. Most commands require two steps:
- Finding the node that may hold the data;
- Sending the command to that node.
Below is a pseudo implementation for SET:
class Redis
  class Distributed
    def set(key, value, options = {})
      node = node_for(key)
      node.set(key, value, options)
    end
  end
end

r = Redis::Distributed.new(["redis://host1:6379", "redis://host2:6379"])
r.set("key", "value")
The majority of Redis commands interacting with a single key work as described above in Redis::Distributed. Examples: SET, GET, SETEX.
Some Redis commands are not implemented in Redis::Distributed because they interact with multiple keys and are supposed to be atomic. Since the data is partitioned, there is no guarantee that all the key arguments will be stored on the same node, and sending the operation to different nodes would break its atomicity. MSET is one command that is not implemented in Redis::Distributed for this reason:
“MSET is atomic, so all given keys are set at once. It is not possible for clients to see that some of the keys were updated while others are unchanged.” (Redis docs)
All commands that raise the CannotDistribute exception fall under this scenario.
class Redis
  class Distributed
    # Set multiple keys to multiple values.
    def mset(*args)
      raise CannotDistribute, :mset
    end
  end
end
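From the caller's point of view (illustrative):

r = Redis::Distributed.new(["redis://host1:6379", "redis://host2:6379"])
r.mset("key1", "value1", "key2", "value2")
# => raises Redis::Distributed::CannotDistribute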
Whenever atomicity is not a concern, commands involving multiple keys send requests to every node that holds any of the given keys.
class Redis
  class Distributed
    # Delete a key.
    def del(*args)
      keys_per_node = args.group_by { |key| node_for(key) }

      keys_per_node.inject(0) do |sum, (node, keys)|
        sum + node.del(*keys)
      end
    end
  end
end
If you don’t need atomicity guarantees for the commands that raise CannotDistribute, you may roll your own implementation using an approach similar to the one outlined above.
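For example, a non-atomic multi-set could group the key/value pairs by node and issue one MSET per node (a sketch; the mset_non_atomic name is made up here and is not part of Redis::Distributed):

class Redis
  class Distributed
    # Non-atomic multi-set: group the key/value pairs by node and send
    # one MSET per node. Keys on different nodes are not set atomically.
    def mset_non_atomic(*args)
      pairs_per_node = args.each_slice(2).group_by { |key, _value| node_for(key) }

      pairs_per_node.each do |node, pairs|
        node.mset(*pairs.flatten)
      end

      "OK"
    end
  end
end

r.mset_non_atomic("key1", "value1", "key2", "value2")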
Some Redis commands that take no keys as arguments are applied to all nodes. PING is an example:
class Redis
  class Distributed
    # Ping the server.
    def ping
      on_each_node :ping
    end
  end
end
Some multi-key commands are only run if all of their keys map to the same node.
class Redis
  class Distributed
    # Rename a key.
    def rename(old_name, new_name)
      ensure_same_node(:rename, [old_name, new_name]) do |node|
        node.rename(old_name, new_name)
      end
    end
  end
end
A few commands are simply not implemented:
class Redis
  class Distributed
    # Listen for messages published to channels matching the given patterns.
    def psubscribe(*channels, &block)
      raise NotImplementedError
    end
  end
end
If you are considering using Redis::Distributed, I’d suggest looking at the source code to check whether the commands you intend to run are supported.
An important consideration in partitioning is the algorithm used to determine on which node each key resides. A naive implementation might hash the key and apply the modulo of the number of nodes (n): partition = hash(key) % n.
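As a sketch of that naive scheme (illustrative only; Zlib.crc32 stands in for an arbitrary hash function, and the node URLs are made up):

require "zlib"

NODE_URLS = ["redis://host1:6379", "redis://host2:6379", "redis://host3:6379"]

# Naive partitioning: hash the key and take the modulo of the node count.
def naive_node_for(key)
  partition = Zlib.crc32(key) % NODE_URLS.size
  NODE_URLS[partition]
end

naive_node_for("user:42") # => always the same URL, until the node count changes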
The problem with this naive implementation is that when a node is added to or removed from the cluster, the majority of keys need to be moved to a different node. That situation is undesirable if the cluster of Redis nodes is being used as a cache: we want to avoid a cold cache where lots of computationally expensive operations have to be re-run in the app servers and databases. To mitigate this, Redis client-side partitioning uses a consistent hashing algorithm. With consistent hashing, on average only K/N keys need to change their location, where N is the number of nodes and K is the total number of keys.
Conceptually, consistent hashing works by mapping all possible hashes of keys into a point in a ring/circle. The cluster’s nodes are also mapped to points along the ring. In order to find out to which node a key belongs we take the following steps:
- Hash the key
- Place the hash in the ring
- From the position of the hash, scan clockwise along the ring until a node is found.
The hashing algorithm is consistent because the hashes are always mapped to the same point in the ring, and adding/removing nodes only affects a small fraction of the keys.
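A toy version of such a ring could look like the sketch below (simplified for illustration; real implementations, including the HashRing used by Redis::Distributed, place each node at many points on the ring to balance keys more evenly):

require "digest"

# Simplified consistent hash ring: each node occupies a single point on
# the ring; a key is served by the first node found clockwise from the
# key's own point.
class SimpleRing
  def initialize(nodes = [])
    @ring = {} # ring position (integer) => node
    nodes.each { |node| add_node(node) }
  end

  def add_node(node)
    @ring[position_for(node.to_s)] = node
  end

  def get_node(key)
    position = position_for(key)
    # First node at or after the key's position, wrapping around the ring.
    node_position = @ring.keys.sort.find { |p| p >= position } || @ring.keys.min
    @ring[node_position]
  end

  private

  # Map an arbitrary string to an integer position on the ring.
  def position_for(value)
    Digest::MD5.hexdigest(value)[0, 8].to_i(16)
  end
end

ring = SimpleRing.new(["node-a", "node-b", "node-c"])
ring.get_node("user:42") # => "node-a", "node-b" or "node-c"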
Redis::Distributed uses a custom implementation of consistent hashing. The functionality around interacting with nodes is implemented in the HashRing class. HashRing stores a collection of (normal) Redis client objects and returns the appropriate client for the affected key. You may pass your own implementation of consistent hashing to the constructor of Redis::Distributed.
class Redis
  class Distributed
    def initialize(node_configs, options = {})
      # A custom consistent hashing implementation can be passed in;
      # HashRing is the default implementation.
      @ring = options[:ring] || HashRing.new

      # node_configs is the configuration meant to be passed to the
      # normal Redis client, like the URL.
      @node_configs = node_configs.dup
      @default_options = options.dup
      node_configs.each { |node_config| add_node(node_config) }
    end

    def add_node(options)
      options = { :url => options } if options.is_a?(String)
      options = @default_options.merge(options)

      # A regular Redis client object built from the configuration is
      # handed to the hash ring.
      @ring.add_node Redis.new(options)
    end

    def node_for(key)
      # The hash ring gives us the node client we need.
      @ring.get_node(key_tag(key.to_s) || key.to_s)
    end
  end
end
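Based on the constructor above, a custom ring can be supplied through the :ring option. CustomRing here is hypothetical; it just needs to respond to add_node (receiving a Redis client) and get_node (receiving a key and returning one of those clients):

nodes = ["redis://host1:6379", "redis://host2:6379"]
r = Redis::Distributed.new(nodes, ring: CustomRing.new)
r.set("key", "value") # CustomRing#get_node decides which node receives the SET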
Redis::Distributed does not natively mitigate the issues that arise when a node ceases to be available. That is something you will have to take care of by ensuring that each of your nodes is part of a primary/replica group which handles failover automatically. For example, in AWS that can be achieved with a Redis (cluster mode disabled) cluster for each node that is part of the “hash ring”.
Redis::Distributed also does not natively redistribute keys between nodes when there are changes to the cluster. This works acceptably when Redis is used as a cache, because the lack of a key is “just” a cache miss that eventually leads to the data being copied back into the cache. When Redis is used as a “regular” datastore, you may have to manually redistribute keys between nodes during cluster changes.
But what about Twemproxy? Twemproxy is an alternative partitioning solution developed at Twitter. It works by putting a proxy server between the Redis client and the Redis servers. The Redis client sends Redis commands to the proxy server, and the proxy relays each command to the appropriate Redis server. The client does not even know about partitioning; the complexity is moved to the proxy server.
But what about Redis Cluster? Redis Cluster is an alternative partitioning solution which is a hybrid between client-side partitioning and query routing. Clients are still responsible for forwarding requests to the correct node, but the Redis nodes are aware of the cluster configuration and are able to tell the client which node holds the data. Redis Cluster does not use consistent hashing, and it has built-in measures to redistribute keys between nodes when the cluster configuration changes. Redis Cluster is the recommended way to get automatic sharding and high availability. You may consider using Redis::Distributed in cases where you can’t use Redis Cluster.
The Redis documentation on partitioning is an awesome resource to learn more about this topic.
I can send you my posts straight to your e-mail inbox if you sign up for my newsletter.