Index: net/netisr.c
===================================================================
--- net/netisr.c	(revision 190341)
+++ net/netisr.c	(working copy)
@@ -1,4 +1,5 @@
 /*-
+ * Copyright (c) 2009, Ivan Voras
  * Copyright (c) 2001,2002,2003 Jonathan Lemon
  * Copyright (c) 1997, Stefan Esser
  * All rights reserved.
@@ -37,12 +38,15 @@
 #include
 #include
 #include
+#include <sys/condvar.h>
+#include <sys/kthread.h>
 #include
 #include
 #include
 #include
 #include
 #include
+#include <sys/mutex.h>
 #include
 #include
 #include
@@ -55,9 +59,15 @@
 
 #include
 #include
+#include <netinet/in.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+#include <netinet/udp.h>
+
 volatile unsigned int netisr;	/* scheduling bits for network */
 
 struct netisr {
+	int ni_index;
 	netisr_t *ni_handler;
 	struct ifqueue *ni_queue;
 	int ni_flags;
@@ -82,6 +92,7 @@
 	netisrs[num].ni_handler = handler;
 	netisrs[num].ni_queue = inq;
 	netisrs[num].ni_flags = flags;
+	netisrs[num].ni_index = num;
 }
 
 void
@@ -96,6 +107,7 @@
 	if (ni->ni_queue != NULL)
 		IF_DRAIN(ni->ni_queue);
 	ni->ni_queue = NULL;
+	ni->ni_index = 0;
 }
 
 struct isrstat {
@@ -112,7 +124,7 @@
 
 static int	netisr_direct = 1;
 SYSCTL_INT(_net_isr, OID_AUTO, direct, CTLFLAG_RW,
-    &netisr_direct, 0, "enable direct dispatch");
+    &netisr_direct, 0, "Enable direct dispatch");
 TUNABLE_INT("net.isr.direct", &netisr_direct);
 
 SYSCTL_INT(_net_isr, OID_AUTO, count, CTLFLAG_RD,
@@ -128,7 +140,96 @@
 SYSCTL_INT(_net_isr, OID_AUTO, swi_count, CTLFLAG_RD,
     &isrstat.isrs_swi_count, 0, "");
 
+static int mtdispatch_n_threads = 2;
+TUNABLE_INT("net.isr.mtdispatch_n_threads", &mtdispatch_n_threads);
+SYSCTL_INT(_net_isr, OID_AUTO, mtdispatch_n_threads, CTLFLAG_RD,
+    &mtdispatch_n_threads, 0, "Number of input dispatch threads");
+
+static unsigned int mtdispatch_stat_packets = 0;
+SYSCTL_UINT(_net_isr, OID_AUTO, mtdispatch_stat_packets, CTLFLAG_RD,
+    &mtdispatch_stat_packets, 0,
+    "Packets processed in dispatch threads (approx)");
+
+static int mtdispatch_debug = 0;
+TUNABLE_INT("net.isr.mtdispatch_debug", &mtdispatch_debug);
+SYSCTL_INT(_net_isr, OID_AUTO, mtdispatch_debug, CTLFLAG_RW,
+    &mtdispatch_debug, 0, "Debug multithreaded netisr dispatch?");
+
+static unsigned int mtdispatch_max_queued = 0;
+SYSCTL_UINT(_net_isr, OID_AUTO, mtdispatch_max_queued, CTLFLAG_RD,
+    &mtdispatch_max_queued, 0,
+    "Max number of packets queued per thread so far (approx)");
+
+static unsigned int mtdispatch_stat_dropped = 0;
+SYSCTL_UINT(_net_isr, OID_AUTO, mtdispatch_stat_dropped, CTLFLAG_RD,
+    &mtdispatch_stat_dropped, 0, "Packets dropped in mtdispatch");
+
+static inline int dispatch_queue(struct netisr *ni, struct mbuf *m);
+
+/* A (netisr, mbuf) pair; one slot of the per-thread circular buffer */
+struct dispatch_mi {
+	struct netisr	*ni;
+	struct mbuf	*m;
+};
+
+/* Per-thread dispatch state */
+struct dispatch_thread_sc {
+	unsigned int	id;
+	unsigned int	active;
+	struct dispatch_mi *mbcb;	/* Circular buffer of queued packets */
+	unsigned int	mbcb_size;	/* Number of slots in mbcb */
+	unsigned int	mbcb_start;	/* Consumer counter (free-running) */
+	unsigned int	mbcb_end;	/* Producer counter (free-running) */
+	struct mtx	cqmtx;		/* Protects the mbcb fields */
+	struct cv	cqcv;		/* Signalled when packets are queued */
+	struct thread	*td;
+};
+
+static struct dispatch_thread_sc *dispatch_threads = NULL;
+
+MALLOC_DEFINE(M_NETISR_DISPATCH, "netisr_dispatch",
+    "netisr mtdispatch private data");
+
+/*
+ * NetISR dispatch thread body: sleep until packets are queued in this
+ * thread's circular buffer, then dequeue them one by one and run the
+ * protocol handler on each, with the buffer unlocked while a handler runs.
+ */
+static void
+netisr_dispatch_thread(void *arg)
+{
+	struct dispatch_thread_sc *dt = arg;
+	struct dispatch_mi mi;
+	unsigned int mbcb_idx;
+
+	mtx_lock(&dt->cqmtx);
+	while (1) {
+		if (mtdispatch_debug)
+			printf("** netisr_dispatch_thread %d: wait at %d, %d\n",
+			    dt->id, dt->mbcb_start, dt->mbcb_end);
+		cv_wait(&dt->cqcv, &dt->cqmtx);
+		if (mtdispatch_debug)
+			printf("** netisr_dispatch_thread %d: go at %d, %d\n",
+			    dt->id, dt->mbcb_start, dt->mbcb_end);
+		while (dt->mbcb_start != dt->mbcb_end) {
+			/* Bring out and process the mbufs */
+			mbcb_idx = dt->mbcb_start % dt->mbcb_size;
+			if (mtdispatch_debug)
+				printf("** netisr_dispatch_thread %d: packet "
				    "at %d\n", dt->id, mbcb_idx);
+			bcopy(&dt->mbcb[mbcb_idx], &mi, sizeof(mi));
+			bzero(&dt->mbcb[mbcb_idx], sizeof(dt->mbcb[mbcb_idx]));
+			KASSERT(mi.m != NULL && mi.ni != NULL,
+			    ("Something's wrong with netisr dispatch circular "
+			    "queue: m=%p, ni=%p, mbcb_idx=%d", mi.m, mi.ni,
+			    mbcb_idx));
+			dt->mbcb_start++;
+			/*
+			 * Process the mbuf while the mbcb is unlocked, to
+			 * allow more packets to be queued.
+			 */
+			mtx_unlock(&dt->cqmtx);
+			if (mtdispatch_debug)
+				printf("** netisr_dispatch_thread %d: handling "
+				    "packet %p from %p\n", dt->id, mi.m, dt);
+			mi.ni->ni_handler(mi.m);	/* Godspeed, little packet */
+			mtdispatch_stat_packets++;
+			mtx_lock(&dt->cqmtx);
+		}
+	}
+}
+
 /*
  * Process all packets currently present in a netisr queue.  Used to
  * drain an existing set of packets waiting for processing when we
  * begin direct dispatch, to avoid processing packets out of order.
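[Illustration, not part of the patch.] The mbcb fields above implement a
classic single-consumer ring: mbcb_start and mbcb_end are free-running
unsigned counters, a slot lives at counter % mbcb_size, the buffer is empty
when the counters are equal and full when they differ by mbcb_size. A
minimal userspace model of that discipline, with made-up names and a
single-threaded driver:

#include <assert.h>
#include <stdio.h>

/*
 * The size must be a power of two so that "counter % SIZE" stays
 * consistent when the 32-bit counters wrap; the patch's 32768 is one.
 */
#define RING_SIZE 8

struct ring {
	int slots[RING_SIZE];
	unsigned int start;	/* Consumer counter, grows forever */
	unsigned int end;	/* Producer counter, grows forever */
};

/* Enqueue; fails when full, the point at which dispatch_queue() drops. */
static int
ring_put(struct ring *r, int v)
{
	if (r->end - r->start >= RING_SIZE)
		return (-1);
	r->slots[r->end++ % RING_SIZE] = v;
	return (0);
}

/* Dequeue; fails when empty, where netisr_dispatch_thread() cv_wait()s. */
static int
ring_get(struct ring *r, int *v)
{
	if (r->start == r->end)
		return (-1);
	*v = r->slots[r->start++ % RING_SIZE];
	return (0);
}

int
main(void)
{
	struct ring r = { .start = 0, .end = 0 };
	int i, v;

	for (i = 0; i < RING_SIZE; i++)
		assert(ring_put(&r, i) == 0);
	assert(ring_put(&r, 99) == -1);		/* Full: would be a drop */
	while (ring_get(&r, &v) == 0)
		printf("got %d\n", v);
	return (0);
}

Because unsigned subtraction is well defined, "end - start" keeps giving
the correct queue depth even after the counters overflow, which is why the
patch never reduces the counters modulo the size except when indexing.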
@@ -142,7 +243,10 @@
 		IF_DEQUEUE(ni->ni_queue, m);
 		if (m == NULL)
 			break;
-		ni->ni_handler(m);
+		if (mtdispatch_n_threads > 0)
+			dispatch_queue(ni, m);
+		else
+			ni->ni_handler(m);
 	}
 }
 
@@ -184,6 +288,103 @@
 }
 
 /*
+ * Hash an (ni, mbuf) pair to a dispatch thread. The intention is for this
+ * hash to distribute the mbufs uniformly across dispatch threads while
+ * keeping all packets of an individual flow on the same thread.
+ *
+ * m_pullup() may replace the mbuf chain, so the caller's pointer is passed
+ * by reference; if *mp is NULL on return, the chain was freed by a failed
+ * m_pullup() and must not be touched again.
+ */
+static unsigned int
+dispatch_hash_mbuf(struct netisr *ni, struct mbuf **mp)
+{
+	struct ip *ip;
+	struct tcphdr *tcp;
+	struct udphdr *udp;
+	struct mbuf *m = *mp;
+	unsigned int hash;
+	static unsigned int dispatch_rr = 0;
+
+	KASSERT(mtdispatch_n_threads > 0, ("dispatch_hash_mbuf: no threads?"));
+	if (mtdispatch_n_threads == 1)
+		return (0);
+
+	M_ASSERTPKTHDR(m);
+
+	/*
+	 * For TCP and UDP: hash the source address and the ports, so that
+	 * the same thread handles all packets of an individual flow. This is
+	 * a layering violation. Packets too short to carry IP and TCP
+	 * headers fall through to round-robin dispatch.
+	 */
+	if (ni->ni_index == NETISR_IP &&
+	    m->m_pkthdr.len >= sizeof(struct ip) + sizeof(struct tcphdr)) {
+		if (m->m_len < sizeof(struct ip) + sizeof(struct tcphdr) &&
+		    (m = *mp = m_pullup(m,
+		    sizeof(struct ip) + sizeof(struct tcphdr))) == NULL)
+			goto roundrobin;	/* Chain freed; *mp is NULL */
+		ip = mtod(m, struct ip *);
+		/* XXX: What about IPv6? */
+		hash = ntohl(ip->ip_src.s_addr);
+		if (ip->ip_p == IPPROTO_TCP) {
+			tcp = (struct tcphdr *)((caddr_t)ip + (ip->ip_hl << 2));
+			hash += ntohs(tcp->th_sport) + ntohs(tcp->th_dport);
+			if (mtdispatch_debug)
+				printf("** dispatch_hash_mbuf: TCP at %p hash "
+				    "of %x+%u=%u\n", m,
+				    ntohl(ip->ip_src.s_addr),
+				    ntohs(tcp->th_sport) +
+				    ntohs(tcp->th_dport), hash);
+		} else if (ip->ip_p == IPPROTO_UDP) {
+			udp = (struct udphdr *)((caddr_t)ip + (ip->ip_hl << 2));
+			hash += ntohs(udp->uh_sport) + ntohs(udp->uh_dport);
+		}
+		return (hash % mtdispatch_n_threads);
+	}
+
+roundrobin:
+	if (++dispatch_rr >= mtdispatch_n_threads)
+		dispatch_rr = 0;
+	return (dispatch_rr);
+}
+
+/*
+ * Queue the (ni, m) pair to a dispatch thread.
+ */
+static inline int
+dispatch_queue(struct netisr *ni, struct mbuf *m)
+{
+	unsigned int dt_hash;
+	unsigned int mbcb_idx;
+	struct dispatch_thread_sc *dt;
+	struct dispatch_mi *mi;
+
+	dt_hash = dispatch_hash_mbuf(ni, &m);
+	if (m == NULL) {
+		/* A failed m_pullup() already freed the chain */
+		isrstat.isrs_drop++;
+		mtdispatch_stat_dropped++;
+		return (ENOBUFS);
+	}
+
+	if (mtdispatch_debug)
+		printf("** dispatch_queue: start with nni=%d, ni=%p, m=%p -> "
+		    "to thread %d\n", ni->ni_index, ni, m, dt_hash);
+
+	dt = &dispatch_threads[dt_hash];
+	mtx_lock(&dt->cqmtx);
+
+	if (dt->mbcb_end - dt->mbcb_start >= dt->mbcb_size) {
+		/*
+		 * XXX: No more room to queue the packet. There should be a
+		 * way to resize the mbcb.
+		 */
+		isrstat.isrs_drop++;
+		mtdispatch_stat_dropped++;
+		m_freem(m);
+		printf("dispatch_queue: Dropped packet, mbcb length=%u\n",
+		    dt->mbcb_end - dt->mbcb_start);
+		mtx_unlock(&dt->cqmtx);
+		cv_signal(&dt->cqcv);
+		return (ENOBUFS);
+	}
+
+	mbcb_idx = dt->mbcb_end++ % dt->mbcb_size;
+	if (mtdispatch_debug)
+		printf("** dispatch_queue: queueing packet %p at thread %d "
+		    "index %d\n", m, dt_hash, mbcb_idx);
+	mi = &dt->mbcb[mbcb_idx];
+	mi->m = m;
+	mi->ni = ni;
+
+	if (dt->mbcb_end - dt->mbcb_start > mtdispatch_max_queued)
+		mtdispatch_max_queued = dt->mbcb_end - dt->mbcb_start;
+
+	mtx_unlock(&dt->cqmtx);
+	cv_signal(&dt->cqcv);
+	return (0);
+}
+
+/*
  * Same as above, but always queue.
  * This is either used in places where we are not confident that
  * direct dispatch is possible, or where queueing is required.
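[Illustration, not part of the patch.] The practical effect of
dispatch_hash_mbuf() is that the chosen thread index depends only on fields
that are constant for the lifetime of a flow, so a given TCP connection or
UDP stream is never processed on two dispatch threads concurrently, and
per-flow packet ordering is preserved by the FIFO ring. A userspace sketch
of the same arithmetic, with made-up addresses and ports:

#include <stdio.h>

/*
 * Sketch of the flow-affinity hash in dispatch_hash_mbuf():
 * hash = source address + source port + destination port, reduced
 * modulo the number of dispatch threads.
 */
static unsigned int
flow_hash(unsigned int src_addr, unsigned short sport, unsigned short dport,
    unsigned int n_threads)
{
	return ((src_addr + sport + dport) % n_threads);
}

int
main(void)
{
	const unsigned int n_threads = 2;

	/* Every packet of one flow hashes to the same thread... */
	printf("flow A, packet 1 -> thread %u\n",
	    flow_hash(0xc0a80102, 49152, 80, n_threads));
	printf("flow A, packet 2 -> thread %u\n",
	    flow_hash(0xc0a80102, 49152, 80, n_threads));
	/* ...while a different flow can land on another thread. */
	printf("flow B           -> thread %u\n",
	    flow_hash(0xc0a80103, 49152, 80, n_threads));
	return (0);
}

Non-IP packets, and packets the hash cannot classify, go round-robin
instead, which balances load but gives up the per-flow ordering guarantee.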
@@ -198,6 +399,11 @@
 	KASSERT(!(num < 0 || num >= (sizeof(netisrs)/sizeof(*netisrs))),
 	    ("bad isr %d", num));
 	ni = &netisrs[num];
+
+	/* Queue mbufs on dispatch threads if possible */
+	if (mtdispatch_n_threads > 0)
+		return (dispatch_queue(ni, m));
+
 	if (ni->ni_queue == NULL) {
 		isrstat.isrs_drop++;
 		m_freem(m);
@@ -247,7 +453,33 @@
 start_netisr(void *dummy)
 {
+	int i;
+	const unsigned int mbcb_initial_length = 32768;	/* Slots per thread */
+
 	if (swi_add(NULL, "net", swi_net, NULL, SWI_NET, INTR_MPSAFE, &net_ih))
-		panic("start_netisr");
+		panic("start_netisr: swi_add");
+
+	dispatch_threads =
+	    malloc(mtdispatch_n_threads * sizeof(struct dispatch_thread_sc),
+	    M_NETISR_DISPATCH, M_WAITOK | M_ZERO);
+
+	if (mtdispatch_debug)
+		printf("** start_netisr: starting %d dispatch threads.\n",
+		    mtdispatch_n_threads);
+	/* Initialize and start dispatch threads */
+	for (i = 0; i < mtdispatch_n_threads; i++) {
+		dispatch_threads[i].id = i;
+		mtx_init(&dispatch_threads[i].cqmtx, "cqmtx", NULL, MTX_DEF);
+		cv_init(&dispatch_threads[i].cqcv, "cqcv");
+		dispatch_threads[i].mbcb_size = mbcb_initial_length;
+		dispatch_threads[i].mbcb =
+		    malloc(dispatch_threads[i].mbcb_size *
+		    sizeof(struct dispatch_mi), M_NETISR_DISPATCH,
+		    M_WAITOK | M_ZERO);
+		if (kthread_add(netisr_dispatch_thread, &dispatch_threads[i],
+		    NULL, &dispatch_threads[i].td, 0, 0, "netd%d", i) != 0)
+			panic("start_netisr: kthread_add failed for "
+			    "netisr_dispatch_thread %d", i);
+	}
 }
 
 SYSINIT(start_netisr, SI_SUB_SOFTINTR, SI_ORDER_FIRST, start_netisr, NULL);
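[Usage note, not part of the patch.] Since net.isr.mtdispatch_n_threads is
a read-only sysctl backed by a loader tunable, the thread count has to be
chosen at boot time, for example in /boot/loader.conf:

	net.isr.mtdispatch_n_threads="4"
	net.isr.mtdispatch_debug="1"

Setting the thread count to 0 bypasses mtdispatch and restores the old
direct/queued paths; net.isr.mtdispatch_debug is CTLFLAG_RW and can also be
toggled at runtime with sysctl(8). Note the memory cost of the fixed-size
rings: 32768 slots times sizeof(struct dispatch_mi), which is two pointers
(16 bytes on LP64), comes to 512 KiB per dispatch thread.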