TCP Pacing功能控制TCP的发包速率。
一、Pacing的初始化
TCP协议初始函数tcp_sk_init中,赋值了两个Pacing相关的参数,分别为sysctl_tcp_pacing_ss_ratio和sysctl_tcp_pacing_ca_ratio,都是控制pacing速率的倍数值。前者应用中慢启动阶段,默认值为200,即将速率提升200%;后者应用在拥塞避免阶段,默认值为120,即将速率提升120%。
static int __net_init tcp_sk_init(struct net *net)
{
net->ipv4.sysctl_tcp_pacing_ss_ratio = 200;
net->ipv4.sysctl_tcp_pacing_ca_ratio = 120;
}
$ cat /proc/sys/net/ipv4/tcp_pacing_ss_ratio
200
$ cat /proc/sys/net/ipv4/tcp_pacing_ca_ratio
120
另外,在TCP定时器初始化函数tcp_init_xmit_timers中,内核初始化一个高精度的pacing定时器,超时处理函数设定为tcp_pace_kick。
void tcp_init_xmit_timers(struct sock *sk)
{
inet_csk_init_xmit_timers(sk, &tcp_write_timer, &tcp_delack_timer, &tcp_keepalive_timer);
hrtimer_init(&tcp_sk(sk)->pacing_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS_PINNED);
tcp_sk(sk)->pacing_timer.function = tcp_pace_kick;
}
最后在套接口初始化函数sock_init_data中,初始化三个pacing相关的参数。其中最大的速率sk_max_pacing_rate和当前速率sk_pacing_rate设置为最大的无符号整型值,而sk_pacing_shift设置为10。
void sock_init_data(struct socket *sock, struct sock *sk)
{
sk->sk_max_pacing_rate = ~0U;
sk->sk_pacing_rate = ~0U;
sk->sk_pacing_shift = 10;
}
二、Pacing功能开启
TCP拥塞控制算法BBR需要pacing功能的支持,在其初始化函数bbr_init中,将pacing状态sk_pacing_status设置为SK_PACING_NEEDED使能pacing功能。另外,用户可通过setsockopt系统调用的选项SO_MAX_PACING_RATE设置pacing的最大速率,其隐含的打开套接口的pacing功能。
static void bbr_init(struct sock *sk)
{
struct tcp_sock *tp = tcp_sk(sk);
struct bbr *bbr = inet_csk_ca(sk);
bbr->has_seen_rtt = 0;
bbr_init_pacing_rate_from_rtt(sk);
cmpxchg(&sk->sk_pacing_status, SK_PACING_NONE, SK_PACING_NEEDED);
}
int sock_setsockopt(struct socket *sock, int level, int optname, char __user *optval, unsigned int optlen)
{
switch (optname) {
case SO_MAX_PACING_RATE:
if (val != ~0U)
cmpxchg(&sk->sk_pacing_status, SK_PACING_NONE, SK_PACING_NEEDED);
sk->sk_max_pacing_rate = val;
sk->sk_pacing_rate = min(sk->sk_pacing_rate, sk->sk_max_pacing_rate);
break;
}
}
三、Pacing功能启用
内核网络中的流控算法Fair queue可以很好的完成数据报文的pacing功能,但是当前系统为其网络接口选用了sch_fq的算法。所以,不考虑流控系统,函数tcp_needs_internal_pacing检查是否要在TCP子系统中执行pacing功能。
static bool tcp_needs_internal_pacing(const struct sock *sk)
{
return smp_load_acquire(&sk->sk_pacing_status) == SK_PACING_NEEDED;
}
函数tcp_internal_pacing负责开启TCP自身的pacing功能,前提是由pacing的需求,并且当前速率不为零而且不等于最大无符号整数值。按照当前的速率计算发送skb数据包所需要的时长,以纳秒为单位。启动pacing_timer,超时时间设置为当前数据包以设定的pacing速率发送完成所需的时长。但是,实际情况中,数据可能并不需要如此长的时间。
static void tcp_internal_pacing(struct sock *sk, const struct sk_buff *skb)
{
u64 len_ns;
u32 rate;
if (!tcp_needs_internal_pacing(sk))
return;
rate = sk->sk_pacing_rate;
if (!rate || rate == ~0U)
return;
/* Should account for header sizes as sch_fq does, but lets make things simple. */
len_ns = (u64)skb->len * NSEC_PER_SEC;
do_div(len_ns, rate);
hrtimer_start(&tcp_sk(sk)->pacing_timer, ktime_add_ns(ktime_get(), len_ns), HRTIMER_MODE_ABS_PINNED);
}
以上pacing使能函数tcp_internal_pacing在TCP传输函数tcp_transmit_skb中调用,条件是当前发送的数据包有数据,并非是SYN或者ACK类的控制报文。鉴于此时skb还没有添加网络层以及链路层的头部信息,tcp_internal_pacing在计算数据包发送时长算法中,也仅是TCP头部和数据的长度。但是流控系统的sch_fq算法不同,其可获取到最终的完整数据包长度。
static int tcp_transmit_skb(struct sock *sk, struct sk_buff *skb, int clone_it, gfp_t gfp_mask)
{
if (skb->len != tcp_header_size) {
tcp_event_data_sent(tp, sk);
tp->data_segs_out += tcp_skb_pcount(skb);
tcp_internal_pacing(sk, skb);
}
}
四、Pacing检查
Pacing功能的检查一个是之前的tcp_needs_internal_pacing函数,检查TCP pacing是否开启;另一个条件是pacing定时器是否启动,由函数hrtimer_active实现。两个条件同时成立,表明pacing正在工作,暂停数据包的发送。
static bool tcp_pacing_check(const struct sock *sk)
{
return tcp_needs_internal_pacing(sk) && hrtimer_active(&tcp_sk(sk)->pacing_timer);
}
参见以下的TCP发送队列处理函数tcp_write_xmit和重传队列处理函数tcp_xmit_retransmit_queue,在tcp_pacing_check返回真表明pacing已在工作时,退出发送处理流程。
static bool tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle, int push_one, gfp_t gfp)
{
max_segs = tcp_tso_segs(sk, mss_now);
while ((skb = tcp_send_head(sk))) {
if (tcp_pacing_check(sk))
break;
}
void tcp_xmit_retransmit_queue(struct sock *sk)
{
rtx_head = tcp_rtx_queue_head(sk);
skb = tp->retransmit_skb_hint ?: rtx_head;
max_segs = tcp_tso_segs(sk, tcp_current_mss(sk));
skb_rbtree_walk_from(skb) {
if (tcp_pacing_check(sk))
break;
}
五、Pacing处理
由于tcp_pacing_check函数和tcp_internal_pacing函数配合,已经实现了TCP数据包的pacing功能。在pacing定时器超时之后,pacing超时处理函数tcp_pace_kick其实并不需要做什么处理了。
但是在实现中,tcp_pace_kick函数将处理被TSQ(TCP Small Queue)功能设置为阻塞状态的套接口。关于TSQ参见 https://blog.csdn.net/sinat_20184565/article/details/89341370。
static bool tcp_small_queue_check(struct sock *sk, const struct sk_buff *skb, unsigned int factor)
{
limit = max(2 * skb->truesize, sk->sk_pacing_rate >> sk->sk_pacing_shift);
limit = min_t(u32, limit, sock_net(sk)->ipv4.sysctl_tcp_limit_output_bytes);
limit <<= factor;
}
由于在TSQ检查时,可能由于当前的pacing速率sk_pacing_rate过高,TSQ限制了数据报文的发送,将套接口设置为阻塞状态。所以,在tcp_pace_kick函数中处理TSQ队列,必要时调用TSQ的tasklet处理。
enum hrtimer_restart tcp_pace_kick(struct hrtimer *timer)
{
struct tcp_sock *tp = container_of(timer, struct tcp_sock, pacing_timer);
struct sock *sk = (struct sock *)tp;
for (oval = READ_ONCE(sk->sk_tsq_flags);; oval = nval) {
struct tsq_tasklet *tsq;
bool empty;
if (oval & TSQF_QUEUED)
break;
nval = (oval & ~TSQF_THROTTLED) | TSQF_QUEUED | TCPF_TSQ_DEFERRED;
nval = cmpxchg(&sk->sk_tsq_flags, oval, nval);
if (nval != oval)
continue;
if (!refcount_inc_not_zero(&sk->sk_wmem_alloc))
break;
/* queue this socket to tasklet queue */
tsq = this_cpu_ptr(&tsq_tasklet);
empty = list_empty(&tsq->head);
list_add(&tp->tsq_node, &tsq->head);
if (empty)
tasklet_schedule(&tsq->tasklet);
break;
}
return HRTIMER_NORESTART;
}
六、Pacing速率
速率更新的基础函数为tcp_update_pacing_rate。如下,当前pacing速率的计算由三个变量组成。当前发送MSS缓存值mss_cache乘以拥塞窗口cwnd的结果,除以平滑往返时间srtt,及最大可发送的数据长度除以srtt得到当前pacing速率。拥塞窗口函数中是取自发送拥塞窗口值snd_cwnd与已发出数据包数量packets_out两者之中的最大值。对于处在慢启动阶段的套接口,将得到的速率值默认增加200%倍(sysctl_tcp_pacing_ss_ratio);反之对于处在拥塞避免阶段的套接口,将速率默认增加120%倍(sysctl_tcp_pacing_ca_ratio)。最终的pacing速率不能大于限定的最大值sk_max_pacing_rate。
static void tcp_update_pacing_rate(struct sock *sk)
{
/* set sk_pacing_rate to 200 % of current rate (mss * cwnd / srtt) */
rate = (u64)tp->mss_cache * ((USEC_PER_SEC / 100) << 3);
/* current rate is (cwnd * mss) / srtt
* In Slow Start [1], set sk_pacing_rate to 200 % the current rate.
* In Congestion Avoidance phase, set it to 120 % the current rate.
*
* [1] : Normal Slow Start condition is (tp->snd_cwnd < tp->snd_ssthresh)
* If snd_cwnd >= (tp->snd_ssthresh / 2), we are approaching end of slow start and should slow down.
*/
if (tp->snd_cwnd < tp->snd_ssthresh / 2)
rate *= sock_net(sk)->ipv4.sysctl_tcp_pacing_ss_ratio;
else
rate *= sock_net(sk)->ipv4.sysctl_tcp_pacing_ca_ratio;
rate *= max(tp->snd_cwnd, tp->packets_out);
if (likely(tp->srtt_us))
do_div(rate, tp->srtt_us);
/* WRITE_ONCE() is needed because sch_fq fetches sk_pacing_rate
* without any lock. We want to make sure compiler wont store intermediate values in this location.
*/
WRITE_ONCE(sk->sk_pacing_rate, min_t(u64, rate, sk->sk_max_pacing_rate));
}
Pacing速率更新的入口有两个。一个位于TCP服务端接收到客户端的三次握手的ACK报文之后,初始化pacing速率。前提是TCP当前采用的拥塞避免算法没有实现cong_control回调函数,目前仅有BBR算法实现了此回调。
int tcp_rcv_state_process(struct sock *sk, struct sk_buff *skb)
{
switch (sk->sk_state) {
case TCP_SYN_RECV:
if (!inet_csk(sk)->icsk_ca_ops->cong_control)
tcp_update_pacing_rate(sk);
}
static struct tcp_congestion_ops tcp_bbr_cong_ops __read_mostly = {
.flags = TCP_CONG_NON_RESTRICTED,
.name = "bbr",
.cong_control = bbr_main,
};
如果采用BBR算法,将不在tcp_rcv_state_process函数中初始化pacing速率。BBR拥塞算法在cong_control回调(bbr_main)中设置pacing速率,如下的tcp_cong_control函数,如果cong_control有值,执行完之后就结束函数。只有在采用除BBR算法之外的其它拥塞算法时(cong_control为空指针),才会往后执行,调用pacing速率更新函数。tcp_cong_control函数在处理ACK确认报文的最后被调用。
static void tcp_cong_control(struct sock *sk, u32 ack, u32 acked_sacked, int flag, const struct rate_sample *rs)
{
const struct inet_connection_sock *icsk = inet_csk(sk);
if (icsk->icsk_ca_ops->cong_control) {
icsk->icsk_ca_ops->cong_control(sk, rs);
return;
}
if (tcp_in_cwnd_reduction(sk)) {
tcp_cwnd_reduction(sk, acked_sacked, flag); /* Reduce cwnd if state mandates */
} else if (tcp_may_raise_cwnd(sk, flag)) {
tcp_cong_avoid(sk, ack, acked_sacked); /* Advance cwnd if state allows */
}
tcp_update_pacing_rate(sk);
}
static int tcp_ack(struct sock *sk, const struct sk_buff *skb, int flag)
{
tcp_cong_control(sk, ack, delivered, flag, sack_state.rate);
tcp_xmit_recovery(sk, rexmit);
return 1;
}
七、BBR调整pacing速率
在拥塞控制算法BBR中,将cong_control回调初始化为指向bbr_main函数的指针,其调用bbr_set_pacing_rate函数更新pacing速率。
static void bbr_main(struct sock *sk, const struct rate_sample *rs)
{
struct bbr *bbr = inet_csk_ca(sk);
bbr_update_model(sk, rs);
bw = bbr_bw(sk);
bbr_set_pacing_rate(sk, bw, bbr->pacing_gain);
}
BBR中核心的pacing速率计算函数如下的bbr_rate_bytes_per_sec。其中rate参数为BBR算法估算出的当前带宽值,其乘以MTU值,再乘以一个增益值gain,类似于之前接收的函数tcp_update_pacing_rate中计算pacing速率的过程,差别在与这里使用的报文长度为MTU(不包括链路层长度),并且将之前的拥塞窗口换成了带宽值,而且将之前的递增倍率(sysctl_tcp_pacing_ss_ratio/sysctl_tcp_pacing_ca_ratio)换成了BBR计算的增益值gain。
函数bbr_bw_to_pacing_rate确保pacing值不超过最大的限定值sk_max_pacing_rate。
static u64 bbr_rate_bytes_per_sec(struct sock *sk, u64 rate, int gain)
{
rate *= tcp_mss_to_mtu(sk, tcp_sk(sk)->mss_cache);
rate *= gain;
rate >>= BBR_SCALE;
rate *= USEC_PER_SEC;
return rate >> BW_SCALE;
}
/* Convert a BBR bw and gain factor to a pacing rate in bytes per second. */
static u32 bbr_bw_to_pacing_rate(struct sock *sk, u32 bw, int gain)
{
u64 rate = bw;
rate = bbr_rate_bytes_per_sec(sk, rate, gain);
rate = min_t(u64, rate, sk->sk_max_pacing_rate);
return rate;
}
在主回调函数bbr_main中,bbr_set_pacing_rate调用以上bbr_bw_to_pacing_rate获取到pacing的速率值。通常情况下has_seen_rtt变量已经置位,在初始化函数bbr_init中已调用过函数bbr_init_pacing_rate_from_rtt。如果带宽已占满,或者计算的pacing速率大于当前使用的速率sk_pacing_rate,更新当前速率。
为了在保证较少队列的同时维持网络的高利用率和低延时,计算所得的pacing速率略低于估算带宽的百分之一左右,为达到此目的,在计算pacing速率时,使用了链路的MTU值,未将链路层头部数据长度包含在内。
static void bbr_set_pacing_rate(struct sock *sk, u32 bw, int gain)
{
struct tcp_sock *tp = tcp_sk(sk);
struct bbr *bbr = inet_csk_ca(sk);
u32 rate = bbr_bw_to_pacing_rate(sk, bw, gain);
if (unlikely(!bbr->has_seen_rtt && tp->srtt_us))
bbr_init_pacing_rate_from_rtt(sk);
if (bbr_full_bw_reached(sk) || rate > sk->sk_pacing_rate)
sk->sk_pacing_rate = rate;
}
函数bbr_init_pacing_rate_from_rtt已在br_init初始化时调用,带宽的值由发送拥塞窗口乘以BW_UNIT,在除以rtt_us值得到,通过以上的bbr_bw_to_pacing_rate函数计算pacing速率,增益值使用bbr_high_gain。如果平滑往返时间为零,rtt_us使用缺省的USEC_PER_MSEC(1000),反之,使用srtt_us值除以8(BBR_SCALE)的值。
#define BW_SCALE 24
#define BW_UNIT (1 << BW_SCALE)
#define BBR_SCALE 8 /* scaling factor for fractions in BBR (e.g. gains) */
#define BBR_UNIT (1 << BBR_SCALE)
static const int bbr_high_gain = BBR_UNIT * 2885 / 1000 + 1;
/* Initialize pacing rate to: high_gain * init_cwnd / RTT. */
static void bbr_init_pacing_rate_from_rtt(struct sock *sk)
{
struct tcp_sock *tp = tcp_sk(sk);
struct bbr *bbr = inet_csk_ca(sk);
u64 bw;
u32 rtt_us;
if (tp->srtt_us) { /* any RTT sample yet? */
rtt_us = max(tp->srtt_us >> 3, 1U);
bbr->has_seen_rtt = 1;
} else { /* no RTT sample yet */
rtt_us = USEC_PER_MSEC; /* use nominal default RTT */
}
bw = (u64)tp->snd_cwnd * BW_UNIT;
do_div(bw, rtt_us);
sk->sk_pacing_rate = bbr_bw_to_pacing_rate(sk, bw, bbr_high_gain);
}