code show: 网络访问超时优化

在这篇blog中,我们将来探讨一个优化的案例,这个案例的背景为:在网络通信中,通常会有需求是从A发送了消息给B,然后同步或异步等待B的响应,但不可能无限期的等下去,因此会需要一个超时机制,通常在基于NIO这样的机制中,发送和接收会是异步方式,通常实现时会采用类似如下的方法,代码简单示例如下。

在这篇blog中,我们将来探讨一个优化的案例,这个案例的背景为:在网络通信中,通常会有需求是从A发送了消息给B,然后同步或异步等待B的响应,但不可能无限期的等下去,因此会需要一个超时机制,通常在基于NIO这样的机制中,发送和接收会是异步方式,通常实现时会采用类似如下的方法,代码简单示例如下:

invoke(){

send(request);

}

onMessageReceived(){

// 接收到响应后的处理

}

在不需要超时的情况下,这样就OK了,在有超时的情况下,则需要启动一个线程来扫描所有连接上(连接太多,所以不太可能每个连接就启一个线程来扫)的所有请求(每个请求的超时时间可能都不一样,有些是1s、有些是3s),这里就出现了一个问题,到底是多久扫描一次呢?简单的实现的话就可以是固定时间间隔扫描一次,这样扫描就显得有些傻了,因为其实很多请求根本就还没到超时时间,而随着连接数、请求数的增加,这里的效率就成问题了,高级点的话就演变为将最近超时的放到最顶端,类似Timer或DelayedQueue的实现,但这样的方式都存在一个问题,就是由于需要排序,所以在往queue里插入和删除对象的时候都需要加锁,这无疑会产生很大的影响,so无法接受,期望能做到的是:

1、单线程扫描请求是否超时;

2、在加入需要扫描的请求时无需加锁;

3、超时扫描等到达了最近有超时的请求后才启动,否则沉默;

4、当请求响应已回来后,可删除需要扫描的相应请求,避免在超时时间段内堆积太多扫描的请求,消耗内存。

我们之前想到的一个方法为

invoke(){

addToScanQueue(request.getId(),request.getTimeout());

send(request);

}

addToScanQueue{

// 判断是否有对应request.getTimeout的queue,如果没有则创建,并往queue里塞入这次需要扫描的请求的id,超时时间,塞入后检查其超时时间和扫描线程的睡眠时间,如超时时间小于睡眠时间,则唤醒扫描线程

}

扫描线程{

// 扫描所有的queue,并取出第一个对象,看看有没有超时,直到取到一个没超时的,比较所有queue中没超时的,看谁时间短,以短的为下次触发扫描的时间点

}

上面这个方法有效的借助了request.getTimeout种类不多的场景以及FIFO的特性,基本上可以做到插入无锁,而且扫描基本上可以等到有需要扫描的才扫描,但有个问题…就是已经有了响应的请求怎么从这里删除呢,一方面queue不好删,另一方面要删多数就要加锁了,如果转为用mark机制的话,由于大部分请求其实都不会超时的,这里会堆积很多的对象,消耗内存,纠结呀…

有同学有兴趣尝试下改造上面的方式吗,既然是code show,感兴趣的同学可直接在回复中贴上代码,:)

《code show: 网络访问超时优化》有14个想法

  1. 一般来说超时都是客户端配置的,对于服务端无论采用oio、nio还是以后的aio,其实客户端是不应该有区别和差异的。
    对于客户端设置超时,常理来说,由于客户端的数量远远多于服务端的数量,并且客户端更多的还是基于多线程的模式。
    我们基于netty实现了一套远程方式的调用,客户端采用的是线程等待方式来进行的。
    客户端waitForResponse(long timeoutSeconds) ,然后在客户端的Channel实现中调用handleResponse()。waitForResponse采用wait或者采用Condition的await,handleResponse采用notify或者Condition的signal,来回响应通过ConcurrentHashmap进行控制,客户端有多少个线程工作ConcurrentHashmap中就最多有这么多个对象,超时在waitForResponse进行控制。这样也会有锁的发生,只不过发生在客户端,对于服务端而言还是无锁的情况。对于这样的场景,锁是无法避免的,关键是减少锁的粒度和无谓的锁开销。
    查看websphere和weblogic的底层通讯,几乎都是采用这种方式,并且都是采用一个连接进行多路复用的方式。
    这样的处理可能有以下问题,客户端超时了,服务端执行完成了,交易是成功还是失败?客户端正在执行超时的处理,这个时候服务端返回执行完成的处理,是继续超时处理还是标记为正常完成?

    public boolean handleResponse(Object rsp) {
    this.rsp = rsp;
    synchronized(this){
    this.notify();
    }
    return true;
    }

    public Object waitForResponse(long timeoutSeconds) throws ReadTimeOutException,ChannelConnectException {
    long timeout = timeoutSeconds*1000;
    long starttime = System.currentTimeMillis();

    synchronized (this) {
    while (this.rsp == null) {

    try {
    if (timeout 0) {
    wait(waitTime);
    }
    }
    }
    catch (InterruptedException e) {
    // ignored
    }

    if (timeout > 0 && ( (System.currentTimeMillis() – starttime) >= timeout)) {
    throw new ReadTimeOutException(“Timeout read data”);
    }
    else {
    continue; // keep looping
    }

    }
    }

    if(rsp instanceof ChannelConnectException){
    throw (ChannelConnectException)rsp;
    }
    return rsp;
    }

  2. 常见的做法:
    1、利用select(timeout)组织大规模的定时器活动,基本的流程跟你的描述一样,但是用select(timeout)代替sleep(timeout),那就无需一个独立的扫描线程,并且可以将请求分配到几个reactor上。
    2、搞一个线程池,请求阻塞,通过CountDownLatch.await等待,回掉线程池负责通知。缺点是回调线程池可能繁忙,拒绝请求。
    3、其他的timer机制,如timerwheel。

  3. @ cauherk
    同步的时候确实可以用wait或await,但是异步的话就不行了,还是得靠扫描线程来做了…
    @伯岩
    要么再详细点?贴伪代码也行,:)
    目前来看Timerwheel比较靠谱…

  4. 1、
    从heap中取出距离现在最近的event的超时时间timeout
    select(timeout) 阻塞或者马上返回
    返回后遍历heap,直到遇到一个不超时的event为止,超时的event都派发出去处理

    heap采用二叉堆实现,也就是优先队列。将sleep替换成select。

    加锁的问题,如果协议设计上有opaque用于映射请求响应,那么就可以用map替代队列,ConcurrnetHashMap的效率还是不用太担心。
    2、
    //回调加入队列
    final SingleRequestCallBack requestCallBack = new SingleRequestCallBack(this, requestCommand, time, listener);
    this.addRequestCallBack(requestCallBack);
    //发送消息
    this.session.write(requestCommand);
    //提价check任务
    this.remotingContext
    .submitTask(new SingleRequestCallBackRunner(listener, requestCallBack, this, time, timeUnit));

    在callback任务中通过CountDownLatch.await阻塞等待响应

    public void run() {
    try {
    this.requestCallBack.getResult(this.timeout, this.timeUnit);
    }
    catch (TimeoutException e) {
    if (this.listener.getExecutor() != null) {
    this.listener.getExecutor().execute(new Runnable() {
    public void run() {
    SingleRequestCallBackRunner.this.listener.onResponse(((DefaultConnection)SingleRequestCallBackRunner.this.connection).createTimeoutCommand(
    SingleRequestCallBackRunner.this.requestCallBack.getRequestCommand(),
    SingleRequestCallBackRunner.this.connection.getRemoteSocketAddress()),
    SingleRequestCallBackRunner.this.connection);
    }
    });
    }
    else {
    this.listener.onResponse(((DefaultConnection)this.connection).createTimeoutCommand(this.requestCallBack.getRequestCommand(),
    this.connection.getRemoteSocketAddress()), this.connection);
    }
    }

    缺点就是回调线程池有限,可能是check任务不能提交,需要单独的扫描线程回收那些没有提交的callback

  5. @伯岩
    第一个方案没怎么看明白。
    第二个方案貌似解决不了问题:
    1、提交check任务,那么check任务是多久扫描一次呢,又回到了需要计算时间的点上了;
    2、callback任务没有办法通过latch.await挂起,除非每个callback都起一个线程,那这肯定是无法接受的。

  6. 伯岩已经在notify中按照第一种方案完成了实现

    简单的说,定时任务被分配到每个reactor上,由reactor线程执行,每次select后都会处理超时的任务。 而select(timeout)的timeout由最近的超时时间决定。

    欢迎来notify观摩。

  7. 请问一个问题,系统CPU有一段时间90%左右,但是running的线程很少,从top的线程模式看cpu均衡到20-30个thread上,但是dump thread处理的running线程很少,
    大部分都是在ReentrantLock.lock上,这个会导致cpu高吗?

  8. dump的时候不一定还能看到现场了,lock不会导致cpu us高,但有可能导致sy高,如果是us高的话,可以考虑借助下intel vtune或jip(jiprof.sourceforge.net)来分析下,sy高的话就得看看能不能降低锁了。

  9. “由于大部分请求其实都不会超时的,这里会堆积很多的对象,消耗内存”
    这个问题是不是可以通过引入二级队列来缓解一下?
    假设一类超时时间为5s的请求,绝大部分情况下1s内就能得响应。
    请求发生时,将请求id及超时时间等信息加入一级队列,一级队列的扫描线程负责清除1s已经得到响应的请求,同时把超过1s还没响应的请求转到二级队列。

  10. 想到jvm 年轻代内存结构的方式,请求第一次放到Eden队列。线程定时扫面,如果请求响应已经完成了,直接删除,没有完成的放入Survivor队列,如果超时就结束。

发表评论

电子邮件地址不会被公开。 必填项已用*标注


*