TCP协议之《Pacing功能》

TCP Pacing功能控制TCP的发包速率。

一、Pacing的初始化

TCP协议初始函数tcp_sk_init中,赋值了两个Pacing相关的参数,分别为sysctl_tcp_pacing_ss_ratio和sysctl_tcp_pacing_ca_ratio,都是控制pacing速率的倍数值。前者应用中慢启动阶段,默认值为200,即将速率提升200%;后者应用在拥塞避免阶段,默认值为120,即将速率提升120%。

static int __net_init tcp_sk_init(struct net *net)
{
    net->ipv4.sysctl_tcp_pacing_ss_ratio = 200;
    net->ipv4.sysctl_tcp_pacing_ca_ratio = 120;
}
 
$ cat /proc/sys/net/ipv4/tcp_pacing_ss_ratio
200
$ cat /proc/sys/net/ipv4/tcp_pacing_ca_ratio
120
另外,在TCP定时器初始化函数tcp_init_xmit_timers中,内核初始化一个高精度的pacing定时器,超时处理函数设定为tcp_pace_kick。

void tcp_init_xmit_timers(struct sock *sk)
{
    inet_csk_init_xmit_timers(sk, &tcp_write_timer, &tcp_delack_timer, &tcp_keepalive_timer);
    hrtimer_init(&tcp_sk(sk)->pacing_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS_PINNED);
    tcp_sk(sk)->pacing_timer.function = tcp_pace_kick;
}
最后在套接口初始化函数sock_init_data中,初始化三个pacing相关的参数。其中最大的速率sk_max_pacing_rate和当前速率sk_pacing_rate设置为最大的无符号整型值,而sk_pacing_shift设置为10。

void sock_init_data(struct socket *sock, struct sock *sk)
{
    sk->sk_max_pacing_rate = ~0U;
    sk->sk_pacing_rate = ~0U;
    sk->sk_pacing_shift = 10;
}

二、Pacing功能开启
TCP拥塞控制算法BBR需要pacing功能的支持,在其初始化函数bbr_init中,将pacing状态sk_pacing_status设置为SK_PACING_NEEDED使能pacing功能。另外,用户可通过setsockopt系统调用的选项SO_MAX_PACING_RATE设置pacing的最大速率,其隐含的打开套接口的pacing功能。

static void bbr_init(struct sock *sk)
{
    struct tcp_sock *tp = tcp_sk(sk);
    struct bbr *bbr = inet_csk_ca(sk);
 
    bbr->has_seen_rtt = 0;
    bbr_init_pacing_rate_from_rtt(sk);
    
    cmpxchg(&sk->sk_pacing_status, SK_PACING_NONE, SK_PACING_NEEDED);
}
int sock_setsockopt(struct socket *sock, int level, int optname, char __user *optval, unsigned int optlen)
{
    switch (optname) {
    case SO_MAX_PACING_RATE:
        if (val != ~0U)
            cmpxchg(&sk->sk_pacing_status, SK_PACING_NONE, SK_PACING_NEEDED);
        sk->sk_max_pacing_rate = val;
        sk->sk_pacing_rate = min(sk->sk_pacing_rate, sk->sk_max_pacing_rate);
        break;
    }
}

三、Pacing功能启用

内核网络中的流控算法Fair queue可以很好的完成数据报文的pacing功能,但是当前系统为其网络接口选用了sch_fq的算法。所以,不考虑流控系统,函数tcp_needs_internal_pacing检查是否要在TCP子系统中执行pacing功能。

static bool tcp_needs_internal_pacing(const struct sock *sk)
{
    return smp_load_acquire(&sk->sk_pacing_status) == SK_PACING_NEEDED;
}
函数tcp_internal_pacing负责开启TCP自身的pacing功能,前提是由pacing的需求,并且当前速率不为零而且不等于最大无符号整数值。按照当前的速率计算发送skb数据包所需要的时长,以纳秒为单位。启动pacing_timer,超时时间设置为当前数据包以设定的pacing速率发送完成所需的时长。但是,实际情况中,数据可能并不需要如此长的时间。

static void tcp_internal_pacing(struct sock *sk, const struct sk_buff *skb)
{
    u64 len_ns;
    u32 rate;
 
    if (!tcp_needs_internal_pacing(sk))
        return;
    rate = sk->sk_pacing_rate;
    if (!rate || rate == ~0U)
        return;
 
    /* Should account for header sizes as sch_fq does, but lets make things simple. */
    len_ns = (u64)skb->len * NSEC_PER_SEC;
    do_div(len_ns, rate);
    hrtimer_start(&tcp_sk(sk)->pacing_timer, ktime_add_ns(ktime_get(), len_ns), HRTIMER_MODE_ABS_PINNED);
}

以上pacing使能函数tcp_internal_pacing在TCP传输函数tcp_transmit_skb中调用,条件是当前发送的数据包有数据,并非是SYN或者ACK类的控制报文。鉴于此时skb还没有添加网络层以及链路层的头部信息,tcp_internal_pacing在计算数据包发送时长算法中,也仅是TCP头部和数据的长度。但是流控系统的sch_fq算法不同,其可获取到最终的完整数据包长度。

static int tcp_transmit_skb(struct sock *sk, struct sk_buff *skb, int clone_it, gfp_t gfp_mask)
{
    if (skb->len != tcp_header_size) {
        tcp_event_data_sent(tp, sk);
        tp->data_segs_out += tcp_skb_pcount(skb);
        tcp_internal_pacing(sk, skb);
    }
}

四、Pacing检查

Pacing功能的检查一个是之前的tcp_needs_internal_pacing函数,检查TCP pacing是否开启;另一个条件是pacing定时器是否启动,由函数hrtimer_active实现。两个条件同时成立,表明pacing正在工作,暂停数据包的发送。

static bool tcp_pacing_check(const struct sock *sk)
{
    return tcp_needs_internal_pacing(sk) && hrtimer_active(&tcp_sk(sk)->pacing_timer);
}
参见以下的TCP发送队列处理函数tcp_write_xmit和重传队列处理函数tcp_xmit_retransmit_queue,在tcp_pacing_check返回真表明pacing已在工作时,退出发送处理流程。

static bool tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle, int push_one, gfp_t gfp)
{
    max_segs = tcp_tso_segs(sk, mss_now);
    while ((skb = tcp_send_head(sk))) {
 
        if (tcp_pacing_check(sk))
            break;
}
void tcp_xmit_retransmit_queue(struct sock *sk)
{
    rtx_head = tcp_rtx_queue_head(sk);
    skb = tp->retransmit_skb_hint ?: rtx_head;
    max_segs = tcp_tso_segs(sk, tcp_current_mss(sk));
    skb_rbtree_walk_from(skb) {
 
        if (tcp_pacing_check(sk))
            break;
}

五、Pacing处理

由于tcp_pacing_check函数和tcp_internal_pacing函数配合,已经实现了TCP数据包的pacing功能。在pacing定时器超时之后,pacing超时处理函数tcp_pace_kick其实并不需要做什么处理了。

但是在实现中,tcp_pace_kick函数将处理被TSQ(TCP Small Queue)功能设置为阻塞状态的套接口。关于TSQ参见 https://blog.csdn.net/sinat_20184565/article/details/89341370。

static bool tcp_small_queue_check(struct sock *sk, const struct sk_buff *skb, unsigned int factor)
{
    limit = max(2 * skb->truesize, sk->sk_pacing_rate >> sk->sk_pacing_shift);
    limit = min_t(u32, limit, sock_net(sk)->ipv4.sysctl_tcp_limit_output_bytes);
    limit <<= factor;
}
由于在TSQ检查时,可能由于当前的pacing速率sk_pacing_rate过高,TSQ限制了数据报文的发送,将套接口设置为阻塞状态。所以,在tcp_pace_kick函数中处理TSQ队列,必要时调用TSQ的tasklet处理。

enum hrtimer_restart tcp_pace_kick(struct hrtimer *timer)
{
    struct tcp_sock *tp = container_of(timer, struct tcp_sock, pacing_timer);
    struct sock *sk = (struct sock *)tp;
 
    for (oval = READ_ONCE(sk->sk_tsq_flags);; oval = nval) {
        struct tsq_tasklet *tsq;
        bool empty;
 
        if (oval & TSQF_QUEUED)
            break;
 
        nval = (oval & ~TSQF_THROTTLED) | TSQF_QUEUED | TCPF_TSQ_DEFERRED;
        nval = cmpxchg(&sk->sk_tsq_flags, oval, nval);
        if (nval != oval)
            continue;
 
        if (!refcount_inc_not_zero(&sk->sk_wmem_alloc))
            break;
        /* queue this socket to tasklet queue */
        tsq = this_cpu_ptr(&tsq_tasklet);
        empty = list_empty(&tsq->head);
        list_add(&tp->tsq_node, &tsq->head);
        if (empty)
            tasklet_schedule(&tsq->tasklet);
        break;
    }
    return HRTIMER_NORESTART;
}

六、Pacing速率
速率更新的基础函数为tcp_update_pacing_rate。如下,当前pacing速率的计算由三个变量组成。当前发送MSS缓存值mss_cache乘以拥塞窗口cwnd的结果,除以平滑往返时间srtt,及最大可发送的数据长度除以srtt得到当前pacing速率。拥塞窗口函数中是取自发送拥塞窗口值snd_cwnd与已发出数据包数量packets_out两者之中的最大值。对于处在慢启动阶段的套接口,将得到的速率值默认增加200%倍(sysctl_tcp_pacing_ss_ratio);反之对于处在拥塞避免阶段的套接口,将速率默认增加120%倍(sysctl_tcp_pacing_ca_ratio)。最终的pacing速率不能大于限定的最大值sk_max_pacing_rate。

static void tcp_update_pacing_rate(struct sock *sk)
{
    /* set sk_pacing_rate to 200 % of current rate (mss * cwnd / srtt) */
    rate = (u64)tp->mss_cache * ((USEC_PER_SEC / 100) << 3);
 
    /* current rate is (cwnd * mss) / srtt
     * In Slow Start [1], set sk_pacing_rate to 200 % the current rate.
     * In Congestion Avoidance phase, set it to 120 % the current rate.
     *
     * [1] : Normal Slow Start condition is (tp->snd_cwnd < tp->snd_ssthresh)
     *   If snd_cwnd >= (tp->snd_ssthresh / 2), we are approaching end of slow start and should slow down.
     */
    if (tp->snd_cwnd < tp->snd_ssthresh / 2)
        rate *= sock_net(sk)->ipv4.sysctl_tcp_pacing_ss_ratio;
    else
        rate *= sock_net(sk)->ipv4.sysctl_tcp_pacing_ca_ratio;
 
    rate *= max(tp->snd_cwnd, tp->packets_out);
 
    if (likely(tp->srtt_us))
        do_div(rate, tp->srtt_us);
 
    /* WRITE_ONCE() is needed because sch_fq fetches sk_pacing_rate
     * without any lock. We want to make sure compiler wont store intermediate values in this location.
     */
    WRITE_ONCE(sk->sk_pacing_rate, min_t(u64, rate, sk->sk_max_pacing_rate));
}

Pacing速率更新的入口有两个。一个位于TCP服务端接收到客户端的三次握手的ACK报文之后,初始化pacing速率。前提是TCP当前采用的拥塞避免算法没有实现cong_control回调函数,目前仅有BBR算法实现了此回调。

int tcp_rcv_state_process(struct sock *sk, struct sk_buff *skb)
{
    switch (sk->sk_state) {
    case TCP_SYN_RECV:
        if (!inet_csk(sk)->icsk_ca_ops->cong_control)
            tcp_update_pacing_rate(sk);
}
static struct tcp_congestion_ops tcp_bbr_cong_ops __read_mostly = {
    .flags      = TCP_CONG_NON_RESTRICTED,
    .name       = "bbr",
    .cong_control   = bbr_main,
};
如果采用BBR算法,将不在tcp_rcv_state_process函数中初始化pacing速率。BBR拥塞算法在cong_control回调(bbr_main)中设置pacing速率,如下的tcp_cong_control函数,如果cong_control有值,执行完之后就结束函数。只有在采用除BBR算法之外的其它拥塞算法时(cong_control为空指针),才会往后执行,调用pacing速率更新函数。tcp_cong_control函数在处理ACK确认报文的最后被调用。

static void tcp_cong_control(struct sock *sk, u32 ack, u32 acked_sacked, int flag, const struct rate_sample *rs)
{
    const struct inet_connection_sock *icsk = inet_csk(sk);
 
    if (icsk->icsk_ca_ops->cong_control) {
        icsk->icsk_ca_ops->cong_control(sk, rs);
        return;
    }
    if (tcp_in_cwnd_reduction(sk)) {  
        tcp_cwnd_reduction(sk, acked_sacked, flag);   /* Reduce cwnd if state mandates */
    } else if (tcp_may_raise_cwnd(sk, flag)) {
        tcp_cong_avoid(sk, ack, acked_sacked);        /* Advance cwnd if state allows */
    }
    tcp_update_pacing_rate(sk);
}
static int tcp_ack(struct sock *sk, const struct sk_buff *skb, int flag)
{
    tcp_cong_control(sk, ack, delivered, flag, sack_state.rate);
    tcp_xmit_recovery(sk, rexmit);
    return 1;
}

七、BBR调整pacing速率
在拥塞控制算法BBR中,将cong_control回调初始化为指向bbr_main函数的指针,其调用bbr_set_pacing_rate函数更新pacing速率。

static void bbr_main(struct sock *sk, const struct rate_sample *rs)
{
    struct bbr *bbr = inet_csk_ca(sk);
 
    bbr_update_model(sk, rs);
    bw = bbr_bw(sk);
    bbr_set_pacing_rate(sk, bw, bbr->pacing_gain);
}
BBR中核心的pacing速率计算函数如下的bbr_rate_bytes_per_sec。其中rate参数为BBR算法估算出的当前带宽值,其乘以MTU值,再乘以一个增益值gain,类似于之前接收的函数tcp_update_pacing_rate中计算pacing速率的过程,差别在与这里使用的报文长度为MTU(不包括链路层长度),并且将之前的拥塞窗口换成了带宽值,而且将之前的递增倍率(sysctl_tcp_pacing_ss_ratio/sysctl_tcp_pacing_ca_ratio)换成了BBR计算的增益值gain。

函数bbr_bw_to_pacing_rate确保pacing值不超过最大的限定值sk_max_pacing_rate。

static u64 bbr_rate_bytes_per_sec(struct sock *sk, u64 rate, int gain)
{
    rate *= tcp_mss_to_mtu(sk, tcp_sk(sk)->mss_cache);
    rate *= gain;
    rate >>= BBR_SCALE;
    rate *= USEC_PER_SEC;
    return rate >> BW_SCALE;
}
/* Convert a BBR bw and gain factor to a pacing rate in bytes per second. */
static u32 bbr_bw_to_pacing_rate(struct sock *sk, u32 bw, int gain)
{
    u64 rate = bw;
 
    rate = bbr_rate_bytes_per_sec(sk, rate, gain);
    rate = min_t(u64, rate, sk->sk_max_pacing_rate);
    return rate;
}  

在主回调函数bbr_main中,bbr_set_pacing_rate调用以上bbr_bw_to_pacing_rate获取到pacing的速率值。通常情况下has_seen_rtt变量已经置位,在初始化函数bbr_init中已调用过函数bbr_init_pacing_rate_from_rtt。如果带宽已占满,或者计算的pacing速率大于当前使用的速率sk_pacing_rate,更新当前速率。

为了在保证较少队列的同时维持网络的高利用率和低延时,计算所得的pacing速率略低于估算带宽的百分之一左右,为达到此目的,在计算pacing速率时,使用了链路的MTU值,未将链路层头部数据长度包含在内。

static void bbr_set_pacing_rate(struct sock *sk, u32 bw, int gain)
{
    struct tcp_sock *tp = tcp_sk(sk);
    struct bbr *bbr = inet_csk_ca(sk);
    u32 rate = bbr_bw_to_pacing_rate(sk, bw, gain);
 
    if (unlikely(!bbr->has_seen_rtt && tp->srtt_us))
        bbr_init_pacing_rate_from_rtt(sk);
    if (bbr_full_bw_reached(sk) || rate > sk->sk_pacing_rate)
        sk->sk_pacing_rate = rate;
}
函数bbr_init_pacing_rate_from_rtt已在br_init初始化时调用,带宽的值由发送拥塞窗口乘以BW_UNIT,在除以rtt_us值得到,通过以上的bbr_bw_to_pacing_rate函数计算pacing速率,增益值使用bbr_high_gain。如果平滑往返时间为零,rtt_us使用缺省的USEC_PER_MSEC(1000),反之,使用srtt_us值除以8(BBR_SCALE)的值。
 

#define BW_SCALE 24
#define BW_UNIT (1 << BW_SCALE)
#define BBR_SCALE 8                /* scaling factor for fractions in BBR (e.g. gains) */
#define BBR_UNIT (1 << BBR_SCALE)
static const int bbr_high_gain  = BBR_UNIT * 2885 / 1000 + 1;
 
/* Initialize pacing rate to: high_gain * init_cwnd / RTT. */
static void bbr_init_pacing_rate_from_rtt(struct sock *sk)
{
    struct tcp_sock *tp = tcp_sk(sk);
    struct bbr *bbr = inet_csk_ca(sk);
    u64 bw;
    u32 rtt_us;
 
    if (tp->srtt_us) {      /* any RTT sample yet? */
        rtt_us = max(tp->srtt_us >> 3, 1U);
        bbr->has_seen_rtt = 1;
    } else {             /* no RTT sample yet */
        rtt_us = USEC_PER_MSEC;  /* use nominal default RTT */
    }
    bw = (u64)tp->snd_cwnd * BW_UNIT;
    do_div(bw, rtt_us);
    sk->sk_pacing_rate = bbr_bw_to_pacing_rate(sk, bw, bbr_high_gain);
}

Published by

风君子

独自遨游何稽首 揭天掀地慰生平