Lettuce is a scalable thread-safe Redis client providing synchronous, asynchronous and reactive APIs. Multiple threads may share one connection if they avoid blocking and transactional operations such as BLPOP and MULTI/EXEC. Multiple connections are efficiently managed by the excellent netty NIO framework.
A Redis client connects to the server over TCP, and TCP itself guarantees that data is delivered in order.
Lettuce is designed to operate in a pipelining way. Multiple threads can share one connection. While one Thread may process one command, the other Thread can send a new command... Lettuce is built on top of netty decouple reading from writing and to provide thread-safe connections. The result is, that reading and writing can be handled by different threads and commands are written and read independent of each other but in sequence.
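The following is a minimal sketch of the idea in the quote above, not Lettuce's actual implementation: because replies arrive in the same order the commands were written, a FIFO queue is enough to match each reply to its command, even when several threads share the connection. The class `PipelinedConnectionSketch` and its methods are hypothetical names, and a `StringBuilder` stands in for the socket.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; none of these names come from Lettuce.
final class PipelinedConnectionSketch {

    // Commands awaiting a reply, in the order they were written.
    private final Queue<CompletableFuture<String>> pending = new ArrayDeque<>();

    // Stand-in for the socket: records what was "sent", in order.
    private final StringBuilder wire = new StringBuilder();

    // May be called by any application thread. The lock keeps the
    // write order and the queue order identical.
    synchronized CompletableFuture<String> send(String command) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        wire.append(command).append('\n');
        pending.add(reply);
        return reply;
    }

    // Called by the reader for each reply, in arrival order.
    void onReply(String reply) {
        CompletableFuture<String> head;
        synchronized (this) {
            head = pending.poll();
        }
        if (head == null) {
            throw new IllegalStateException("reply without a pending command");
        }
        // Complete outside the lock so callbacks do not run while holding it.
        head.complete(reply);
    }

    public static void main(String[] args) {
        PipelinedConnectionSketch conn = new PipelinedConnectionSketch();
        // Two commands are written before any reply is read.
        CompletableFuture<String> set = conn.send("SET k v");
        CompletableFuture<String> get = conn.send("GET k");
        conn.onReply("OK");
        conn.onReply("v");
        System.out.println(set.join() + " " + get.join()); // prints "OK v"
    }
}
```

A caller never waits for the previous reply before writing the next command; it only waits on its own future.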
CommandHandler write operations
// Core code of CommandHandler's write path; some code is omitted
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {

    private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();

    // Overrides ChannelDuplexHandler's write method
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // msg is a single Redis command
        if (msg instanceof RedisCommand) {
            writeSingleCommand(ctx, (RedisCommand<?, ?, ?>) msg, promise);
            return;
        }
    }

    private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise) {
        // Register an event listener on the promise
        addToStack(command, promise);
        // Pass the command to the next ChannelHandler (here, CommandEncoder)
        ctx.write(command, promise);
    }

    private void addToStack(RedisCommand<?, ?, ?> command, ChannelPromise promise) {
        try {
            RedisCommand<?, ?, ?> redisCommand = potentiallyWrapLatencyCommand(command);
            if (promise.isVoid()) {
                stack.add(redisCommand);
            } else {
                // The normal path: register a listener on the promise
                promise.addListener(AddToStack.newInstance(stack, redisCommand));
            }
        } catch (Exception e) {
            command.completeExceptionally(e);
            throw e;
        }
    }

    // Event listener
    static class AddToStack implements GenericFutureListener<Future<Void>> {

        private ArrayDeque<Object> stack;
        private RedisCommand<?, ?, ?> command;

        static AddToStack newInstance(ArrayDeque<?> stack, RedisCommand<?, ?, ?> command) {
            AddToStack entry = RECYCLER.get();
            entry.stack = (ArrayDeque<Object>) stack;
            entry.command = command;
            return entry;
        }

        // Called back once the Redis command has been written to the socket buffer
        @Override
        public void operationComplete(Future<Void> future) {
            try {
                if (future.isSuccess()) {
                    // Append the Redis command to the tail of the queue
                    stack.add(command);
                }
            } finally {
                recycle();
            }
        }
    }
}
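The point of registering a listener instead of enqueuing immediately is that only commands that actually reached the socket wait for a reply. The sketch below isolates that behavior with plain JDK types. It is an illustration under stated assumptions, not Lettuce code: `EnqueueOnWriteSketch` is a hypothetical class, and a `CompletableFuture<Void>` stands in for netty's `ChannelPromise`.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; CompletableFuture<Void> stands in for ChannelPromise.
final class EnqueueOnWriteSketch {

    // Commands that were written successfully and now await a reply.
    // Not thread-safe: this sketch assumes the callbacks run on a single
    // thread, as they would on one event loop.
    private final ArrayDeque<String> stack = new ArrayDeque<>();

    void write(String command, CompletableFuture<Void> writePromise) {
        // Enqueue only after the write has completed without error.
        writePromise.whenComplete((ignored, error) -> {
            if (error == null) {
                stack.add(command);
            }
        });
    }

    int awaitingReply() {
        return stack.size();
    }

    String head() {
        return stack.peek();
    }

    public static void main(String[] args) {
        EnqueueOnWriteSketch handler = new EnqueueOnWriteSketch();
        CompletableFuture<Void> ok = new CompletableFuture<>();
        CompletableFuture<Void> failed = new CompletableFuture<>();
        handler.write("SET k v", ok);
        handler.write("GET k", failed);

        failed.completeExceptionally(new IOException("connection reset"));
        ok.complete(null);

        // Only the successfully written command is waiting for a reply.
        System.out.println(handler.awaitingReply() + " " + handler.head()); // prints "1 SET k v"
    }
}
```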
CommandHandler read operations
// Core code of CommandHandler's read path; some code is omitted
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {

    private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();
    private ByteBuf buffer;

    // Overrides channelRead, inherited by ChannelDuplexHandler from ChannelInboundHandlerAdapter
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf input = (ByteBuf) msg;
        try {
            // Append the incoming data to buffer
            buffer.writeBytes(input);
            // Parse the data
            decode(ctx, buffer);
        } finally {
            input.release();
        }
    }

    // Response parsing
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
        while (canDecode(buffer)) {
            if (isPushDecode(buffer)) {
                // Redis message handling in Pub/Sub mode; code omitted
            } else {
                // Regular Redis response handling
                // Read (without removing) the command at the head of the queue, in FIFO order
                RedisCommand<?, ?, ?> command = stack.peek();
                try {
                    // Parse the response
                    if (!decode(ctx, buffer, command)) {
                        decodeBufferPolicy.afterPartialDecode(buffer);
                        return;
                    }
                } catch (Exception e) {
                    // Exception handling omitted
                }
                if (isProtectedMode(command)) {
                    // Omitted
                } else {
                    if (canComplete(command)) {
                        // Remove the Redis command from the queue
                        stack.poll();
                        try {
                            // Complete the command
                            complete(command);
                        } catch (Exception e) {
                            // Exception handling omitted
                        }
                    }
                }
                // Clean up buffer data
                afterDecode(ctx, command);
            }
        }
    }
}
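The peek-then-poll pattern in the read path matters because a TCP read may deliver only part of a reply. The sketch below shows the same pattern in isolation, under simplifying assumptions: replies are limited to RESP simple strings such as `+OK\r\n`, a `StringBuilder` stands in for the `ByteBuf`, and `PartialDecodeSketch` with its methods is a hypothetical name rather than part of Lettuce.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; handles RESP simple strings ("+...\r\n") only.
final class PartialDecodeSketch {

    private final ArrayDeque<String> stack = new ArrayDeque<>(); // commands awaiting replies
    private final StringBuilder buffer = new StringBuilder();    // accumulated, undecoded data
    private final List<String> completed = new ArrayList<>();    // "command -> reply" records

    void expect(String command) {
        stack.add(command);
    }

    // Append the new chunk, then decode as many complete replies as possible.
    void channelRead(String chunk) {
        buffer.append(chunk);
        while (!stack.isEmpty()) {
            int end = buffer.indexOf("\r\n");
            if (end < 0) {
                // Partial reply: the command stays at the head until more data arrives.
                return;
            }
            if (buffer.charAt(0) != '+') {
                throw new IllegalArgumentException("only simple strings are supported in this sketch");
            }
            String reply = buffer.substring(1, end); // drop the '+' type marker
            buffer.delete(0, end + 2);               // discard the consumed reply and its CRLF
            completed.add(stack.poll() + " -> " + reply);
        }
    }

    List<String> completed() {
        return completed;
    }

    public static void main(String[] args) {
        PartialDecodeSketch handler = new PartialDecodeSketch();
        handler.expect("SET a 1");
        handler.expect("SET b 2");
        handler.channelRead("+O");         // incomplete: nothing is completed yet
        handler.channelRead("K\r\n+OK");   // completes the first command only
        handler.channelRead("\r\n");       // completes the second command
        System.out.println(handler.completed());
    }
}
```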
Comparison from the Redis server's perspective
Pipelining is not just a way to reduce the latency cost associated with the round trip time, it actually greatly improves the number of operations you can perform per second in a given Redis server. This is the result of the fact that, without using pipelining, serving each command is very cheap from the point of view of accessing the data structures and producing the reply, but it is very costly from the point of view of doing the socket I/O. This involves calling the read() and write() syscall, that means going from user land to kernel land. The context switch is a huge speed penalty.
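To make the syscall argument concrete, the sketch below counts how many times the underlying stream is written to when commands are flushed one by one versus batched into a single flush. It is a rough model under stated assumptions: `CountingStream` is a hypothetical stand-in for a socket, so the numbers are write invocations rather than measured system calls, and the 64 KiB buffer size is an arbitrary choice.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration; counts stream writes, not real syscalls.
final class PipelineWriteCountSketch {

    // Stand-in for a socket: counts how often the underlying write is invoked.
    static final class CountingStream extends OutputStream {
        int writes;

        @Override
        public void write(int b) {
            writes++;
        }

        @Override
        public void write(byte[] b, int off, int len) {
            writes++;
        }
    }

    // Encode a command as a RESP array of bulk strings.
    static byte[] encode(String... args) {
        StringBuilder sb = new StringBuilder("*").append(args.length).append("\r\n");
        for (String arg : args) {
            int byteLength = arg.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLength).append("\r\n").append(arg).append("\r\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    // Returns the number of writes that reach the "socket".
    static int writesFor(int commands, boolean pipelined) {
        CountingStream socket = new CountingStream();
        OutputStream out = new BufferedOutputStream(socket, 64 * 1024);
        try {
            for (int i = 0; i < commands; i++) {
                out.write(encode("SET", "key" + i, "v"));
                if (!pipelined) {
                    out.flush(); // one write per command
                }
            }
            out.flush(); // pipelined: a single write for the whole batch
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return socket.writes;
    }

    public static void main(String[] args) {
        System.out.println("unpipelined writes: " + writesFor(100, false)); // 100
        System.out.println("pipelined writes:   " + writesFor(100, true));  // 1
    }
}
```

The same reasoning applies on the server side: a batch that arrives in one segment can be consumed with one read() and answered with one write().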
Benchmarking and overall comparison
Jedis connection pool: 50 connections, 200 concurrent threads
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /**
     * Signals a waiting take.
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // Wake up the EventExecutor thread waiting on the notEmpty condition
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
}
// * For brevity, some code is omitted, and the test code for Lettuce and Jedis in all modes is shown together here.
public class RedisClientBenchmark {

    // Field declarations omitted

    // Runs before the benchmark starts
    public void setup() {
        RedisURI redisURI = RedisURI.create(host, port);

        // Lettuce single-connection mode
        lettuceClient4SingleConnection = RedisClient.create(redisURI);
        lettuceClient4SingleConnection.setOptions(ClientOptions.builder()
                .timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(3)).build()).build());
        lettuceSingleConnection = lettuceClient4SingleConnection.connect(ByteArrayCodec.INSTANCE);

        // Lettuce connection-pool mode
        lettuceClient4ConnectionPool = RedisClient.create(redisURI);
        lettuceClient4ConnectionPool.setOptions(ClientOptions.builder()
                .timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(3)).build()).build());
        GenericObjectPoolConfig lettucePoolConfig = new GenericObjectPoolConfig();
        lettucePoolConfig.setMaxTotal(200);
        lettucePoolConfig.setMaxIdle(200);
        lettuceConnectionPool = ConnectionPoolSupport.createGenericObjectPool(
                () -> lettuceClient4ConnectionPool.connect(ByteArrayCodec.INSTANCE), lettucePoolConfig);

        // Lettuce multi-connection mode
        lettuceClient4MultipleConnection = RedisClient.create(redisURI);
        lettuceClient4MultipleConnection.setOptions(ClientOptions.builder()
                .timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(3)).build()).build());
        processors = Runtime.getRuntime().availableProcessors(); // 8 on the author's machine
        lettuceMultipleConnection = new ArrayList<>(processors);
        for (int i = 0; i < processors; ++i) {
            lettuceMultipleConnection.add(lettuceClient4MultipleConnection.connect(ByteArrayCodec.INSTANCE));
        }

        // Jedis connection-pool mode
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(50);
        jedisPoolConfig.setMaxIdle(50);
        jedisConnectionPool = new JedisPool(jedisPoolConfig, host, port, 3000);
    }

    // Runs after the benchmark finishes
    public void tearDown() {
        // Mainly closes connections and connection pools; code omitted
    }

    // Benchmarks set in Lettuce single-connection mode
    // 200 concurrent threads
    public void lettuceSet4SingleConnection() {
        lettuceSingleConnection.sync().set("lettu".getBytes(), "lettu".getBytes());
    }

    // Benchmarks set in Lettuce connection-pool mode
    public void lettuceSet4ConnectionPool() throws Exception {
        try (StatefulRedisConnection<byte[], byte[]> lettuceConnection = lettuceConnectionPool.borrowObject()) {
            lettuceConnection.sync().set("lettu".getBytes(), "lettu".getBytes());
        }
    }

    // Benchmarks set in Lettuce multi-connection mode
    public void lettuceSet4MultipleConnection() {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        lettuceMultipleConnection.get(random.nextInt(0, processors))
                .sync().set("lettu".getBytes(), "lettu".getBytes());
    }

    // Benchmarks set in Jedis connection-pool mode
    public void jedisSet4ConnectionPool() {
        try (Jedis jedisConnection = jedisConnectionPool.getResource()) {
            jedisConnection.set("jedis", "jedis");
        }
    }

    // Run main to start the benchmark
    public static void main(String[] args) throws Exception {
        org.openjdk.jmh.Main.main(args);
    }
}
1. See 《Spring Boot Reference Documentation》: Spring Boot offers basic auto-configuration for the Lettuce and Jedis client libraries and the abstractions on top of them provided by Spring Data Redis... By default, it uses Lettuce
2. See 《Lettuce Wiki - About Lettuce》
3. See 《Redis Clients Handling》: The client socket is put in the non-blocking state since Redis uses multiplexing and non-blocking I/O... Once new data is read from a client, all the queries contained in the current buffers are processed sequentially.
4. See 《Introduction to HTTP/2》: With HTTP/1.x, if the client wants to make multiple parallel requests to improve performance, then multiple TCP connections must be used. This behavior is a direct consequence of the HTTP/1.x delivery model, which ensures that only one response can be delivered at a time per connection... The new binary framing layer in HTTP/2 removes these limitations, and enables full request and response multiplexing, by allowing the client and server to break down an HTTP message into independent frames, interleave them, and then reassemble them on the other end.
5. For details, see 《Using pipelining to speedup Redis queries》
6. See 《Java并发编程实战》 (Java Concurrency in Practice): In a multithreaded program, context switches occur frequently when the thread scheduler temporarily suspends the active thread to run another one. This incurs significant overhead: saving and restoring the execution context, loss of locality, and CPU time spent scheduling threads instead of running them.
7. See 《Netty in Action》: Overall, this model (non-blocking I/O) provides much better resource management than the blocking I/O model: Many connections can be handled with fewer threads, and thus with far less overhead due to memory management and context-switching.
8. See 《深入理解Java虚拟机》 (Understanding the Java Virtual Machine): The biggest performance impact of mutual-exclusion synchronization comes from how blocking is implemented: suspending and resuming a thread both require a transition into kernel mode, and these operations put considerable pressure on the JVM's concurrent performance. The virtual machine's development team also noticed that in many applications a lock on shared data is held only for a very short time, and suspending and resuming threads for such a short period is not worthwhile. ... If the physical machine has more than one processor or processor core, so that two or more threads can run in parallel, the thread that requests the lock later can be asked to "wait a moment" without giving up its processor time, to see whether the thread holding the lock releases it soon. To make the thread wait, it only needs to run a busy loop (spin); this technique is known as a spin lock.
9. See 《Go语言学习指南》 (Learning Go): Switching between goroutines is faster than switching between threads, because it happens entirely within the process (in user space) and avoids the (relatively) slow operating system calls... These advantages allow a Go program to spawn hundreds, thousands, or even tens of thousands of goroutines at the same time.
10. See 《Using pipelining to speedup Redis queries》: When pipelining is used, many commands are usually read with a single read() system call, and multiple replies are delivered with a single write() system call. Consequently, the number of total queries performed per second initially increases almost linearly with longer pipelines, and eventually reaches 10 times the baseline obtained without pipelining.
11. See 《On Sockets and System Calls Minimizing Context Switches for the Socket API》: To avoid needless system entries, system calls like select, poll, and their better optimized variants like epoll or kqueue let the application ask the system whether and when it is possible to carry out a set of operations successfully. Although select greatly reduces the number of system calls, the application needs to use it first to query the system which of the potential writes and reads the system will accept. This still leaves many system calls which cross the protection boundary between the applications and the system.
12. See 《What is io_uring?》: The very name io_uring comes from the fact that the interfaces uses ring buffers as the main interface for kernel-user space communication. While there are system calls involved, they are kept to a minimum......avoiding system calls as much as possible is a fantastic idea in high-performance applications indeed.
13. See the article 《Netty/Incubator/Transport/Native/io_uring 0.0.1.Final released》
14. See 《13 Years Later – Does Redis Need a New Architecture?》: It’s even possible that some of these concepts may make their way into Redis in the future (like io_uring which we have already started looking into, more modern dictionaries, more tactical use of threads, etc.)
15. See 《GitHub Jedis WiKi》: You shouldn't use the same instance from different threads because you'll have strange errors... A single Jedis instance is not threadsafe! To avoid these problems, you should use JedisPool, which is a threadsafe pool of network connections.
16. For details, see 《Redis 漫谈 —— 分布式布隆过滤及内存使用问题分析》 (Notes on Redis: an analysis of distributed Bloom filters and memory usage problems)
17. For details, see section 5.3.2, "Serial thread confinement", of 《Java并发编程实战》 (Java Concurrency in Practice)
18. See 《Lettuce Wiki - Connection Pooling》: Lettuce is thread-safe by design which is sufficient for most cases. All Redis user operations are executed single-threaded. Using multiple connections does not impact the performance of an application in a positive way.
19. Both interact with Redis through a connection pool, and both have 8 active threads: 8 EventLoops for Lettuce and 8 connections for Jedis.
20. See 《The java.util.concurrent Synchronizer Framework》: ...in the absence of cancellation, each component of acquire and release is a constant-time O(1) operation, amortized across threads, disregarding any OS thread scheduling occuring within park......it appears likely that further tuning of blocking (park/unpark) support to reduce context switching and related overhead could provide small but noticeable improvements in this framework.
21. See 《Java Concurrency Utility with JCTools》: The library offers a number of queues to use in a multi-threaded environment, i.e. one or more threads write to a queue and one or more threads read from it in a thread-safe lock-free manner.
22. For details, see 《Possibility of optimizing the command expiring mechanism》
《Spring Boot Reference Documentation》:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/
《Netty in Action》:https://book.douban.com/subject/24700704/
《Lettuce Wiki - About Lettuce》:https://github.com/lettuce-io/lettuce-core/wiki/About-Lettuce
《Redis Clients Handling》:https://redis.io/topics/clients
《Introduction to HTTP/2》:https://developers.google.com/web/fundamentals/performance/http2/#streams_messages_and_frames
《Using pipelining to speedup Redis queries》:https://redis.io/topics/pipelining
《Lettuce Wiki - Pipelining and command flushing》:https://github.com/lettuce-io/lettuce-core/wiki/Pipelining-and-command-flushing
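《深入理解计算机系统》 (Computer Systems: A Programmer's Perspective):https://book.douban.com/subject/27000879/
《Java并发编程实战》 (Java Concurrency in Practice):https://book.douban.com/subject/10484692/
《深入理解Java虚拟机》 (Understanding the Java Virtual Machine):https://book.douban.com/subject/34907497/
《Go语言学习指南》 (Learning Go):https://book.douban.com/subject/35902219/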
《On Sockets and System Calls Minimizing Context Switches for the Socket API》:https://www.usenix.org/system/files/conference/trios14/trios14-paper-hruby.pdf
《What is io_uring?》:https://unixism.net/loti/what_is_io_uring.html#what-is-io-uring
《Netty/Incubator/Transport/Native/io_uring 0.0.1.Final released》:https://netty.io/news/2020/11/16/io_uring-0-0-1-Final.html
《13 Years Later – Does Redis Need a New Architecture?》:https://redis.com/blog/redis-architecture-13-years-later/
《GitHub Jedis WiKi》:https://github.com/redis/jedis/wiki/Getting-started
《Redis 漫谈 —— 分布式布隆过滤及内存使用问题分析》 (Notes on Redis: an analysis of distributed Bloom filters and memory usage problems):https://blog.csdn.net/yangty8927/article/details/92701330
《性能之巅》 (Systems Performance):https://book.douban.com/subject/35934902/
《Java性能权威指南》 (Java Performance: The Definitive Guide):https://book.douban.com/subject/35867531/
《Lettuce Wiki - Connection Pooling》:https://github.com/lettuce-io/lettuce-core/wiki/Connection-Pooling
《The java.util.concurrent Synchronizer Framework》:https://gee.cs.oswego.edu/dl/papers/aqs.pdf
《Java Concurrency Utility with JCTools》:https://www.baeldung.com/java-concurrency-jc-tools
《Possibility of optimizing the command expiring mechanism》:https://github.com/lettuce-io/lettuce-core/discussions/2214
《Spring Data Redis》:https://docs.spring.io/spring-data/data-redis/docs/current/reference/html/