diff --git a/configure.ac b/configure.ac index 9bdffea..a70232f 100644 --- a/configure.ac +++ b/configure.ac @@ -43,6 +43,7 @@ AC_SEARCH_LIBS([clock_gettime], [rt]) AC_SEARCH_LIBS([timer_create], [rt]) AC_SEARCH_LIBS([pcap_open_live], [pcap]) +OVS_CHECK_THREADED OVS_CHECK_COVERAGE OVS_CHECK_NDEBUG OVS_CHECK_NETLINK diff --git a/lib/automake.mk b/lib/automake.mk index 8180517..f24ce5b 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -37,6 +37,7 @@ lib_libopenvswitch_a_SOURCES = \ lib/daemon.c \ lib/daemon.h \ lib/dhcp.h \ + lib/dispatch.h \ lib/dummy.c \ lib/dummy.h \ lib/dhparams.h \ diff --git a/lib/dispatch.h b/lib/dispatch.h new file mode 100644 index 0000000..80ac9c7 --- /dev/null +++ b/lib/dispatch.h @@ -0,0 +1,9 @@ +#include +#include "ofpbuf.h" + +#ifndef DISPATCH_H +#define DISPATCH_H 1 + +typedef void (*pkt_handler)(u_char *user, struct ofpbuf* buf); + +#endif /* DISPATCH_H */ diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index 144b6b6..3aff320 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -32,6 +32,15 @@ #include #include +#ifdef THREADED +#include +#include + +#include "socket-util.h" +#include "fatal-signal.h" +#include "dispatch.h" +#endif + #include "csum.h" #include "dpif.h" #include "dpif-provider.h" @@ -55,6 +64,14 @@ #include "vlog.h" VLOG_DEFINE_THIS_MODULE(dpif_netdev); +/* Pthread lock macros, nops in the non-threaded case. */ +#ifdef THREADED +#define LOCK(mutex) pthread_mutex_lock(mutex) +#define UNLOCK(mutex) pthread_mutex_unlock(mutex) +#else +#define LOCK(mutex) +#define UNLOCK(mutex) +#endif /* Configuration parameters. */ enum { MAX_PORTS = 256 }; /* Maximum number of ports. */ @@ -87,6 +104,22 @@ struct dp_netdev { int open_cnt; bool destroyed; +#ifdef THREADED + /* The pipe is used to signal the presence of a packet on the queue. + * - dpif_netdev_recv_wait() waits on p[0] + * - dpif_netdev_recv() extract from queue and read p[0] + * - dp_netdev_output_control() send to queue and write p[1] + */ + + int pipe[2]; /* signal a packet on the queue */ + int rpipe[2]; /* signal a packet on the queue (reverse direction) */ + struct pollfd *pipe_fd; + + pthread_mutex_t table_mutex; /* mutex for the flow table */ + pthread_mutex_t port_list_mutex; /* port list mutex */ + + /* The access to this queue is protected by the table_mutex mutex */ +#endif struct dp_netdev_queue queues[N_QUEUES]; struct hmap flow_table; /* Flow table. */ @@ -107,6 +140,9 @@ struct dp_netdev_port { struct list node; /* Element in dp_netdev's 'port_list'. */ struct netdev *netdev; char *type; /* Port type as requested by user. */ +#ifdef THREADED + struct pollfd *poll_fd; /* To manage the poll loop in the thread. */ +#endif }; /* A flow in dp_netdev's 'flow_table'. */ @@ -132,6 +168,11 @@ struct dpif_netdev { unsigned int dp_serial; }; +#ifdef THREADED +/* XXX global Descriptor of the thread that manages the datapaths. */ +pthread_t thread_p; +#endif + /* All netdev-based datapaths. */ static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs); @@ -209,6 +250,34 @@ create_dp_netdev(const char *name, const struct dpif_class *class, dp->class = class; dp->name = xstrdup(name); dp->open_cnt = 0; +#ifdef THREADED + error = pipe(dp->pipe); + if (error) { + VLOG_ERR("Unable to create datapath thread pipe: %s", strerror(errno)); + return errno; + } + if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) { + VLOG_ERR("Unable to set nonblocking on datapath thread pipe: %s", + strerror(errno)); + return errno; + } + error = pipe(dp->rpipe); + if (error) { + VLOG_ERR("Unable to create datapath thread reverse pipe: %s", strerror(errno)); + return errno; + } + if (set_nonblocking(dp->rpipe[0]) || set_nonblocking(dp->rpipe[1])) { + VLOG_ERR("Unable to set nonblocking on datapath thread pipe: %s", + strerror(errno)); + return errno; + } + dp->pipe_fd = NULL; + VLOG_DBG("Datapath thread pipes created (%d, %d) (%d, %d)", + dp->pipe[0], dp->pipe[1], dp->rpipe[0], dp->rpipe[1]); + + pthread_mutex_init(&dp->table_mutex, NULL); + pthread_mutex_init(&dp->port_list_mutex, NULL); +#endif for (i = 0; i < N_QUEUES; i++) { dp->queues[i].head = dp->queues[i].tail = 0; } @@ -226,6 +295,38 @@ create_dp_netdev(const char *name, const struct dpif_class *class, return 0; } +#ifdef THREADED +static void * dp_thread_body(void *args OVS_UNUSED); + +/* This is the function that is called in response of a fatal signal (e.g. + * SIGTERM) */ +static void +dpif_netdev_exit_hook(void *aux OVS_UNUSED) +{ + if (pthread_cancel(thread_p) == 0) { + pthread_join(thread_p, NULL); + } +} + +static int +dpif_netdev_init(void) +{ + static int error = -1; + + if (error < 0) { + fatal_signal_add_hook(dpif_netdev_exit_hook, NULL, NULL, true); + error = pthread_create(&thread_p, NULL, dp_thread_body, NULL); + if (error != 0) { + VLOG_ERR("Unable to create datapath thread: %s", strerror(errno)); + error = errno; + } else { + VLOG_DBG("Datapath thread started"); + } + } + return error; +} +#endif + static int dpif_netdev_open(const struct dpif_class *class, const char *name, bool create, struct dpif **dpifp) @@ -252,9 +353,14 @@ dpif_netdev_open(const struct dpif_class *class, const char *name, } *dpifp = create_dpif_netdev(dp); +#ifdef THREADED + dpif_netdev_init(); +#endif return 0; } +/* table_mutex must be locked in THREADED mode. + */ static void dp_netdev_purge_queues(struct dp_netdev *dp) { @@ -276,11 +382,19 @@ dp_netdev_free(struct dp_netdev *dp) struct dp_netdev_port *port, *next; dp_netdev_flow_flush(dp); + LOCK(&dp->port_list_mutex); LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) { do_del_port(dp, port->port_no); } + UNLOCK(&dp->port_list_mutex); + LOCK(&dp->table_mutex); dp_netdev_purge_queues(dp); hmap_destroy(&dp->flow_table); +#ifdef THREADED + UNLOCK(&dp->table_mutex); + pthread_mutex_destroy(&dp->table_mutex); + pthread_mutex_destroy(&dp->port_list_mutex); +#endif free(dp->name); free(dp); } @@ -309,7 +423,9 @@ static int dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats) { struct dp_netdev *dp = get_dp_netdev(dpif); + LOCK(&dp->table_mutex); stats->n_flows = hmap_count(&dp->flow_table); + UNLOCK(&dp->table_mutex); stats->n_hit = dp->n_hit; stats->n_missed = dp->n_missed; stats->n_lost = dp->n_lost; @@ -357,13 +473,18 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, port->port_no = port_no; port->netdev = netdev; port->type = xstrdup(type); +#ifdef THREADED + port->poll_fd = NULL; +#endif error = netdev_get_mtu(netdev, &mtu); if (!error) { max_mtu = mtu; } + LOCK(&dp->port_list_mutex); list_push_back(&dp->port_list, &port->node); + UNLOCK(&dp->port_list_mutex); dp->ports[port_no] = port; dp->serial++; @@ -432,7 +553,16 @@ static int dpif_netdev_port_del(struct dpif *dpif, uint16_t port_no) { struct dp_netdev *dp = get_dp_netdev(dpif); - return port_no == OVSP_LOCAL ? EINVAL : do_del_port(dp, port_no); + int error; + + if (port_no == OVSP_LOCAL) { + return EINVAL; + } else { + LOCK(&dp->port_list_mutex); + error = do_del_port(dp, port_no); + UNLOCK(&dp->port_list_mutex); + } + return error; } static bool @@ -460,15 +590,19 @@ get_port_by_name(struct dp_netdev *dp, { struct dp_netdev_port *port; + LOCK(&dp->port_list_mutex); LIST_FOR_EACH (port, node, &dp->port_list) { if (!strcmp(netdev_get_name(port->netdev), devname)) { *portp = port; + UNLOCK(&dp->port_list_mutex); return 0; } } + UNLOCK(&dp->port_list_mutex); return ENOENT; } +/* In THREADED mode, must be called with port_list_mutex held. */ static int do_del_port(struct dp_netdev *dp, uint16_t port_no) { @@ -543,7 +677,9 @@ dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED) static void dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow) { + LOCK(&dp->table_mutex); hmap_remove(&dp->flow_table, &flow->node); + UNLOCK(&dp->table_mutex); free(flow->actions); free(flow); } @@ -632,7 +768,11 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_) } static struct dp_netdev_flow * -dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key) +#ifdef THREADED +dp_netdev_lookup_flow_locked(struct dp_netdev *dp, const struct flow *key) +#else +dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key) +#endif { struct dp_netdev_flow *flow; @@ -644,6 +784,19 @@ dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key) return NULL; } +#ifdef THREADED +static struct dp_netdev_flow * +dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key) +{ + struct dp_netdev_flow *flow; + + LOCK(&dp->table_mutex); + flow = dp_netdev_lookup_flow_locked(dp, key); + UNLOCK(&dp->table_mutex); + return flow; +} +#endif + static void get_dpif_flow_stats(struct dp_netdev_flow *flow, struct dpif_flow_stats *stats) { @@ -740,7 +893,9 @@ dp_netdev_flow_add(struct dp_netdev *dp, const struct flow *key, return error; } + LOCK(&dp->table_mutex); hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0)); + UNLOCK(&dp->table_mutex); return 0; } @@ -760,6 +915,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) struct dp_netdev_flow *flow; struct flow key; int error; + int n_flows; error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key); if (error) { @@ -769,7 +925,10 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) flow = dp_netdev_lookup_flow(dp, &key); if (!flow) { if (put->flags & DPIF_FP_CREATE) { - if (hmap_count(&dp->flow_table) < MAX_FLOWS) { + LOCK(&dp->table_mutex); + n_flows = hmap_count(&dp->flow_table); + UNLOCK(&dp->table_mutex); + if (n_flows < MAX_FLOWS) { if (put->stats) { memset(put->stats, 0, sizeof *put->stats); } @@ -855,7 +1014,9 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_, struct dp_netdev_flow *flow; struct hmap_node *node; + LOCK(&dp->table_mutex); node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset); + UNLOCK(&dp->table_mutex); if (!node) { return EOF; } @@ -961,7 +1122,13 @@ static int dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, struct ofpbuf *buf) { - struct dp_netdev_queue *q = find_nonempty_queue(dpif); + struct dp_netdev_queue *q; +#ifdef THREADED + struct dp_netdev *dp = get_dp_netdev(dpif); + char c; + LOCK(&dp->table_mutex); +#endif + q = find_nonempty_queue(dpif); if (q) { struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK]; @@ -971,8 +1138,17 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, ofpbuf_uninit(buf); *buf = u->buf; +#ifdef THREADED + /* Read a byte from the pipe to signal that a packet has been + * received. */ + if (read(dp->pipe[0], &c, 1) < 0) { + VLOG_ERR("Pipe read error (from datapath): %s", strerror(errno)); + } + UNLOCK(&dp->table_mutex); +#endif return 0; } else { + UNLOCK(&dp->table_mutex); return EAGAIN; } } @@ -980,19 +1156,30 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, static void dpif_netdev_recv_wait(struct dpif *dpif) { +#ifdef THREADED + struct dp_netdev *dp = get_dp_netdev(dpif); + + poll_fd_wait(dp->pipe[0], POLLIN); +#else if (find_nonempty_queue(dpif)) { poll_immediate_wake(); } else { /* No messages ready to be received, and dp_wait() will ensure that we * wake up to queue new messages, so there is nothing to do. */ } +#endif } static void dpif_netdev_recv_purge(struct dpif *dpif) { struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif); +#ifdef THREADED + struct dp_netdev *dp = get_dp_netdev(dpif); + LOCK(&dp->table_mutex); +#endif dp_netdev_purge_queues(dpif_netdev->dp); + UNLOCK(&dp->table_mutex); } static void @@ -1016,7 +1203,12 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port, return; } flow_extract(packet, 0, 0, odp_port_to_ofp_port(port->port_no), &key); +#ifdef THREADED + LOCK(&dp->table_mutex); + flow = dp_netdev_lookup_flow_locked(dp, &key); +#else flow = dp_netdev_lookup_flow(dp, &key); +#endif if (flow) { dp_netdev_flow_used(flow, &key, packet); dp_netdev_execute_actions(dp, packet, &key, @@ -1026,8 +1218,20 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port, dp->n_missed++; dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, 0); } + UNLOCK(&dp->table_mutex); } +#ifdef THREADED +static void +dpif_netdev_run(struct dpif *dpif OVS_UNUSED) +{ +} + +static void +dpif_netdev_wait(struct dpif *dpif OVS_UNUSED) +{ +} +#else static void dpif_netdev_run(struct dpif *dpif) { @@ -1066,6 +1270,144 @@ dpif_netdev_wait(struct dpif *dpif) netdev_recv_wait(port->netdev); } } +#endif + +#ifdef THREADED +/* + * pcap callback argument + */ +struct dispatch_arg { + struct dp_netdev *dp; /* update statistics */ + struct dp_netdev_port *port; /* argument to flow identifier function */ +}; + +/* Process a packet. + * + * The port_input function will send immediately if it finds a flow match and + * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP. + * If a flow is not found or for the other actions, the packet is copied. + */ +static void +process_pkt(u_char *user, struct ofpbuf *buf) +{ + struct dispatch_arg *arg = (struct dispatch_arg *)user; + + ofpbuf_padto(buf, ETH_TOTAL_MIN); + dp_netdev_port_input(arg->dp, arg->port, buf); +} + +/* Body of the thread that manages the datapaths */ +static void* +dp_thread_body(void *args OVS_UNUSED) +{ + struct dp_netdev *dp; + struct dp_netdev_port *port; + struct dispatch_arg arg; + int error; + int n_fds; + uint32_t batch = 50; /* max number of pkts processed by the dispatch */ + int processed; /* actual number of pkts processed by the dispatch */ + char readbuf[1024]; + + sigset_t sigmask; + + /*XXX Since the poll involves all ports of all datapaths, the right fds + * size should be MAX_PORTS * max_number_of_datapaths */ + struct pollfd fds[MAX_PORTS + 1]; + + /* mask the fatal signals. In this way the main thread is delegate to + * manage this them. */ + sigemptyset(&sigmask); + sigaddset(&sigmask, SIGTERM); + sigaddset(&sigmask, SIGALRM); + sigaddset(&sigmask, SIGINT); + sigaddset(&sigmask, SIGHUP); + + if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) { + VLOG_ERR("Error setting thread sigmask: %s", strerror(errno)); + } + + for(;;) { + struct shash_node *node; + n_fds = 0; + /* build the structure for poll */ + SHASH_FOR_EACH(node, &dp_netdevs) { + dp = (struct dp_netdev *)node->data; + fds[n_fds].fd = dp->rpipe[0]; + fds[n_fds].events = POLLIN; + dp->pipe_fd = &fds[n_fds]; + n_fds++; + if (n_fds >= sizeof(fds) / sizeof(fds[0])) { + VLOG_ERR("Too many fds for poll adding pipe_fd"); + break; + } + LOCK(&dp->port_list_mutex); + LIST_FOR_EACH (port, node, &dp->port_list) { + /* insert an element in the fds structure */ + fds[n_fds].fd = netdev_get_fd(port->netdev); + fds[n_fds].events = POLLIN; + port->poll_fd = &fds[n_fds]; + n_fds++; + if (n_fds >= sizeof(fds) / sizeof(fds[0])) { + VLOG_ERR("Too many fds for poll adding port fd"); + break; + } + } + UNLOCK(&dp->port_list_mutex); + } + + error = poll(fds, n_fds, 2000); + VLOG_DBG("dp_thread_body poll wakeup with cnt=%d", error); + + if (error < 0) { + if (errno == EINTR) { + /* XXX get this case in detach mode */ + continue; + } + VLOG_ERR("Datapath thread poll() error: %s\n", strerror(errno)); + /* XXX terminating the thread is probably not right */ + break; + } + pthread_testcancel(); + + SHASH_FOR_EACH (node, &dp_netdevs) { + dp = (struct dp_netdev *)node->data; + if (dp->pipe_fd && (dp->pipe_fd->revents & POLLIN)) { + VLOG_DBG("Signalled from main thread"); + while ((error = read(dp->rpipe[0], readbuf, sizeof(readbuf))) > 0) + ; + if (error < 0 && errno != EAGAIN) { + VLOG_ERR("Pipe read error (to datapath): %s", strerror(errno)); + } + } + arg.dp = dp; + LOCK(&dp->port_list_mutex); + LIST_FOR_EACH (port, node, &dp->port_list) { + arg.port = port; + if (port->poll_fd) { + VLOG_DBG("fd %d revents 0x%x", port->poll_fd->fd, port->poll_fd->revents); + } + if (port->poll_fd && (port->poll_fd->revents & POLLIN)) { + /* call the dispatch and process the packet into + * its callback. We process 'batch' packets at time */ + processed = netdev_dispatch(port->netdev, batch, + process_pkt, (u_char *)&arg); + if (processed < 0) { /* pcap returns error */ + static struct vlog_rate_limit rl = + VLOG_RATE_LIMIT_INIT(1, 5); + VLOG_ERR_RL(&rl, + "error receiving data from XXX \n"); + } + } /* end of if poll */ + } /* end of port loop */ + UNLOCK(&dp->port_list_mutex); + } /* end of dp loop */ + } /* for ;; */ + + return NULL; +} + +#endif /* THREADED */ static void dp_netdev_set_dl(struct ofpbuf *packet, const struct ovs_key_ethernet *eth_key) @@ -1081,11 +1423,19 @@ dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet, uint16_t out_port) { struct dp_netdev_port *p = dp->ports[out_port]; + char c = 0; + if (p) { netdev_send(p->netdev, packet); +#ifdef THREADED + if (write(dp->rpipe[1], &c, 1) < 0) { + VLOG_ERR("Pipe write error (to datapath): %s", strerror(errno)); + } +#endif } } +/* In THREADED mode, must be called with table_lock_mutex held. */ static int dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, int queue_no, const struct flow *flow, uint64_t arg) @@ -1095,6 +1445,9 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, struct dpif_upcall *upcall; struct ofpbuf *buf; size_t key_len; +#ifdef THREADED + char c = 0; +#endif if (q->head - q->tail >= MAX_QUEUE_LEN) { dp->n_lost++; @@ -1118,6 +1471,13 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, upcall->key_len = key_len; upcall->userdata = arg; +#ifdef THREADED + /* Write a byte on the pipe to advertise that a packet is ready. */ + if (write(dp->pipe[1], &c, 1) < 0 && errno != EAGAIN) { + VLOG_ERR("Pipe write error (from datapath): %s", strerror(errno)); + } +#endif + return 0; } @@ -1165,7 +1525,9 @@ dp_netdev_action_userspace(struct dp_netdev *dp, userdata_attr = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA); userdata = userdata_attr ? nl_attr_get_u64(userdata_attr) : 0; + LOCK(&dp->table_mutex); dp_netdev_output_userspace(dp, packet, DPIF_UC_ACTION, key, userdata); + UNLOCK(&dp->table_mutex); } static void diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c index f8b1188..9efb55d 100644 --- a/lib/netdev-bsd.c +++ b/lib/netdev-bsd.c @@ -667,6 +667,89 @@ netdev_bsd_recv_wait(struct netdev *netdev_) } } +#ifdef THREADED + +struct dispatch_arg { + pkt_handler h; + u_char *user; +}; + +static void +dispatch_handler(u_char *user, const struct pcap_pkthdr *phdr, const u_char *pdata) +{ + struct ofpbuf buf; + struct dispatch_arg *parg = (struct dispatch_arg*)user; + + ofpbuf_use_stub(&buf, (void*)pdata, phdr->caplen); + buf.size = phdr->caplen; + (*parg->h)(parg->user, &buf); + ofpbuf_uninit(&buf); +} + +static int +netdev_bsd_dispatch_system(struct netdev_bsd *netdev, int batch, pkt_handler h, + u_char *user) +{ + int ret; + struct dispatch_arg arg; + + arg.h = h; + arg.user = user; + ret = pcap_dispatch(netdev->pcap_handle, batch, dispatch_handler, (u_char*)&arg); + return ret; +} + +static int +netdev_bsd_dispatch_tap(struct netdev_bsd *netdev, int batch, pkt_handler h, + u_char *user) +{ + int ret; + int i; + const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX; + OFPBUF_STACK_BUFFER(buf_, size); + + struct ofpbuf buf; + ofpbuf_use_stub(&buf, buf_, size); + for (i = 0; i < batch; i++) { + ret = netdev_bsd_recv_tap(netdev, buf.data, ofpbuf_tailroom(&buf)); + if (ret >= 0) { + buf.size += ret; + h(user, &buf); + } else if (ret != -EAGAIN) { + return -1; + } else { /* ret = EAGAIN */ + break; + } + ofpbuf_clear(&buf); + } + ofpbuf_uninit(&buf); + return i; +} + +static int +netdev_bsd_dispatch(struct netdev *netdev_, int batch, pkt_handler h, + u_char *user) +{ + struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); + struct netdev_dev_bsd * netdev_dev = + netdev_dev_bsd_cast(netdev_get_dev(netdev_)); + + if (!strcmp(netdev_get_type(netdev_), "tap") && + netdev->netdev_fd == netdev_dev->tap_fd) { + return netdev_bsd_dispatch_tap(netdev, batch, h, user); + } else { + return netdev_bsd_dispatch_system(netdev, batch, h, user); + } +} + +static int +netdev_bsd_get_fd(struct netdev *netdev_) +{ + struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); + return netdev->netdev_fd; +} +#endif + /* Discards all packets waiting to be received from 'netdev'. */ static int netdev_bsd_drain(struct netdev *netdev_) @@ -1263,6 +1346,10 @@ const struct netdev_class netdev_bsd_class = { netdev_bsd_recv, netdev_bsd_recv_wait, +#ifdef THREADED + netdev_bsd_dispatch, + netdev_bsd_get_fd, +#endif netdev_bsd_drain, netdev_bsd_send, @@ -1323,6 +1410,10 @@ const struct netdev_class netdev_tap_class = { netdev_bsd_recv, netdev_bsd_recv_wait, +#ifdef THREADED + netdev_bsd_dispatch, + netdev_bsd_get_fd, +#endif netdev_bsd_drain, netdev_bsd_send, diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c index 6aa4084..91ff8b0 100644 --- a/lib/netdev-dummy.c +++ b/lib/netdev-dummy.c @@ -20,6 +20,12 @@ #include +#ifdef THREADED +#include +#include +#include "socket-util.h" +#endif + #include "flow.h" #include "list.h" #include "netdev-provider.h" @@ -51,6 +57,10 @@ struct netdev_dummy { struct list node; /* In netdev_dev_dummy's "devs" list. */ struct list recv_queue; bool listening; +#ifdef THREADED + pthread_mutex_t queue_mutex; + int s_pipe[2]; /* used to signal packet arrivals */ +#endif }; static struct shash dummy_netdev_devs = SHASH_INITIALIZER(&dummy_netdev_devs); @@ -146,6 +156,13 @@ netdev_dummy_close(struct netdev *netdev_) struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); list_remove(&netdev->node); ofpbuf_list_delete(&netdev->recv_queue); +#ifdef THREADED + if (netdev->listening) { + close(netdev->s_pipe[0]); + close(netdev->s_pipe[1]); + pthread_mutex_destroy(&netdev->queue_mutex); + } +#endif free(netdev); } @@ -153,6 +170,27 @@ static int netdev_dummy_listen(struct netdev *netdev_) { struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); +#ifdef THREADED + int error; + + if (netdev->listening) + return 0; + + error = pipe(netdev->s_pipe); + if (error) { + VLOG_ERR("Unable to create dummy pipe: %s", strerror(errno)); + return errno; + } + if (set_nonblocking(netdev->s_pipe[0]) || + set_nonblocking(netdev->s_pipe[1])) { + VLOG_ERR("Unable to set nonblocking on dummy pipe: %s", + strerror(errno)); + close(netdev->s_pipe[0]); + close(netdev->s_pipe[1]); + return errno; + } + pthread_mutex_init(&netdev->queue_mutex, NULL); +#endif netdev->listening = true; return 0; } @@ -163,12 +201,29 @@ netdev_dummy_recv(struct netdev *netdev_, void *buffer, size_t size) struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); struct ofpbuf *packet; size_t packet_size; +#ifdef THREADED + char c; +#endif +#ifdef THREADED + pthread_mutex_lock(&netdev->queue_mutex); +#endif if (list_is_empty(&netdev->recv_queue)) { +#ifdef THREADED + pthread_mutex_unlock(&netdev->queue_mutex); +#endif return -EAGAIN; } +#ifdef THREADED + if (read(netdev->s_pipe[0], &c, 1) < 0) { + VLOG_ERR("Error reading dummy pipe: %s", strerror(errno)); + } +#endif packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue)); +#ifdef THREADED + pthread_mutex_unlock(&netdev->queue_mutex); +#endif if (packet->size > size) { return -EMSGSIZE; } @@ -184,11 +239,60 @@ static void netdev_dummy_recv_wait(struct netdev *netdev_) { struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); - if (!list_is_empty(&netdev->recv_queue)) { + int empty; + +#ifdef THREADED + pthread_mutex_lock(&netdev->queue_mutex); +#endif + empty = list_is_empty(&netdev->recv_queue); +#ifdef THREADED + pthread_mutex_unlock(&netdev->queue_mutex); +#endif + if (!empty) { poll_immediate_wake(); } } +#ifdef THREADED +static int +netdev_dummy_dispatch(struct netdev *netdev_, int batch, pkt_handler h, + u_char *user) +{ + int i; + struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); + struct ofpbuf *packet; + VLOG_DBG("dispatch %d", batch); + + for (i = 0; i < batch; i++) { + char c; + if (read(netdev->s_pipe[0], &c, 1) < 0) { + if (errno == EAGAIN) + break; + VLOG_ERR("%s: error reading from the pipe: %s", + netdev_get_name(netdev_), strerror(errno)); + return -1; + } + pthread_mutex_lock(&netdev->queue_mutex); + if (list_is_empty(&netdev->recv_queue)) { + pthread_mutex_unlock(&netdev->queue_mutex); + return -EAGAIN; + } + packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue)); + pthread_mutex_unlock(&netdev->queue_mutex); + h(user, packet); + ofpbuf_delete(packet); + } + return i; +} + +static int +netdev_dummy_get_fd(struct netdev *netdev_) +{ + struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); + return netdev->s_pipe[0]; +} +#endif + static int netdev_dummy_drain(struct netdev *netdev_) { @@ -326,6 +430,10 @@ static const struct netdev_class dummy_class = { netdev_dummy_listen, netdev_dummy_recv, netdev_dummy_recv_wait, +#ifdef THREADED + netdev_dummy_dispatch, /* dispatch */ + netdev_dummy_get_fd, /* get_fd */ +#endif netdev_dummy_drain, NULL, /* send */ @@ -417,6 +525,9 @@ netdev_dummy_receive(struct unixctl_conn *conn, struct netdev_dev_dummy *dummy_dev; int n_listeners; int i; +#ifdef THREADED + char c = 0; +#endif dummy_dev = shash_find_data(&dummy_netdev_devs, argv[1]); if (!dummy_dev) { @@ -424,6 +535,7 @@ netdev_dummy_receive(struct unixctl_conn *conn, return; } + n_listeners = 0; for (i = 2; i < argc; i++) { struct netdev_dummy *dev; @@ -439,7 +551,16 @@ netdev_dummy_receive(struct unixctl_conn *conn, LIST_FOR_EACH (dev, node, &dummy_dev->devs) { if (dev->listening) { struct ofpbuf *copy = ofpbuf_clone(packet); +#ifdef THREADED + pthread_mutex_lock(&dev->queue_mutex); +#endif list_push_back(&dev->recv_queue, ©->list_node); +#ifdef THREADED + pthread_mutex_unlock(&dev->queue_mutex); + if (write(dev->s_pipe[1], &c, 1) < 0) { + VLOG_ERR("Error writing dummy pipe: %s", strerror(errno)); + } +#endif n_listeners++; } } diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c index 412a92d..7d25ce5 100644 --- a/lib/netdev-linux.c +++ b/lib/netdev-linux.c @@ -893,6 +893,43 @@ netdev_linux_recv_wait(struct netdev *netdev_) } } +#ifdef THREADED +static int +netdev_linux_dispatch(struct netdev *netdev_, int batch, pkt_handler h, + u_char *user) +{ + int ret; + int i; + const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX; + OFPBUF_STACK_BUFFER(buf_, size); + struct ofpbuf buf; + VLOG_DBG("dispatch %d", batch); + + ofpbuf_use_stub(&buf, buf_, size); + for (i = 0; i < batch; i++) { + ret = netdev_linux_recv(netdev_, buf.data, ofpbuf_tailroom(&buf)); + if (ret >= 0) { + buf.size += ret; + h(user, &buf); + } else if (ret != -EAGAIN) { + return -1; + } else { + break; + } + ofpbuf_clear(&buf); + } + ofpbuf_uninit(&buf); + return i; +} + +static int +netdev_linux_get_fd(struct netdev *netdev_) +{ + struct netdev_linux *netdev = netdev_linux_cast(netdev_); + return netdev->fd; +} +#endif + /* Discards all packets waiting to be received from 'netdev'. */ static int netdev_linux_drain(struct netdev *netdev_) @@ -2383,6 +2420,12 @@ netdev_linux_change_seq(const struct netdev *netdev) return netdev_dev_linux_cast(netdev_get_dev(netdev))->change_seq; } +#ifdef THREADED +# define THREADED_METHODS netdev_linux_dispatch, netdev_linux_get_fd, +#else +# define THREADED_METHODS +#endif + #define NETDEV_LINUX_CLASS(NAME, CREATE, GET_STATS, SET_STATS, \ GET_FEATURES, GET_STATUS) \ { \ @@ -2403,6 +2446,7 @@ netdev_linux_change_seq(const struct netdev *netdev) netdev_linux_listen, \ netdev_linux_recv, \ netdev_linux_recv_wait, \ + THREADED_METHODS \ netdev_linux_drain, \ \ netdev_linux_send, \ diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h index 94f60af..743e41c 100644 --- a/lib/netdev-provider.h +++ b/lib/netdev-provider.h @@ -25,6 +25,9 @@ #include "list.h" #include "shash.h" #include "smap.h" +#ifdef THREADED +#include "dispatch.h" +#endif #ifdef __cplusplus extern "C" { @@ -191,6 +194,22 @@ struct netdev_class { * implement packet reception through the 'recv' member function. */ void (*recv_wait)(struct netdev *netdev); +#ifdef THREADED + /* Attempts to receive 'batch' packets from 'netdev' and process them + * through the 'handler' callback. This function is used in the 'THREADED' + * version in order to optimize the forwarding process, since it permits to + * process packets directly in the netdev memory. + * + * Returns the number of packets processed on success; this can be 0 if no + * packets are available to be read. Returns -1 if an error occurred. + */ + int (*dispatch)(struct netdev *netdev, int batch, pkt_handler handler, + u_char *user); + + /* Return the file descriptor of the device */ + int (*get_fd)(struct netdev *netdev); +#endif + /* Discards all packets waiting to be received from 'netdev'. * * May be null if not needed, such as for a network device that does not diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c index db5c3db..987636b 100644 --- a/lib/netdev-vport.c +++ b/lib/netdev-vport.c @@ -899,6 +899,13 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED, return 0; } + +#ifdef THREADED +# define THREADED_NULL NULL, NULL, +#else +# define THREADED_NULL +#endif + #define VPORT_FUNCTIONS(GET_STATUS) \ NULL, \ netdev_vport_run, \ @@ -915,6 +922,7 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED, NULL, /* listen */ \ NULL, /* recv */ \ NULL, /* recv_wait */ \ + THREADED_NULL \ NULL, /* drain */ \ \ netdev_vport_send, /* send */ \ diff --git a/lib/netdev.c b/lib/netdev.c index 394d895..3359b4c 100644 --- a/lib/netdev.c +++ b/lib/netdev.c @@ -424,6 +424,28 @@ netdev_recv_wait(struct netdev *netdev) } } +#ifdef THREADED +/* Attempts to receive and process 'batch' packets from 'netdev'. */ +int +netdev_dispatch(struct netdev *netdev, int batch, pkt_handler h, u_char *user) +{ + int (*dispatch)(struct netdev*, int, pkt_handler, u_char *); + + dispatch = netdev_get_dev(netdev)->netdev_class->dispatch; + return dispatch ? dispatch(netdev, batch, h, user) : 0; +} + +/* Returns the file descriptor */ +int +netdev_get_fd(struct netdev *netdev) +{ + int (*get_fd)(struct netdev *); + + get_fd = netdev_get_dev(netdev)->netdev_class->get_fd; + return get_fd ? get_fd(netdev) : 0; +} +#endif + /* Discards all packets waiting to be received from 'netdev'. */ int netdev_drain(struct netdev *netdev) diff --git a/lib/netdev.h b/lib/netdev.h index d2cc8b5..f55a286 100644 --- a/lib/netdev.h +++ b/lib/netdev.h @@ -21,6 +21,9 @@ #include #include #include "openvswitch/types.h" +#ifdef THREADED +#include "dispatch.h" +#endif #ifdef __cplusplus extern "C" { @@ -107,6 +110,10 @@ int netdev_get_ifindex(const struct netdev *); int netdev_listen(struct netdev *); int netdev_recv(struct netdev *, struct ofpbuf *); void netdev_recv_wait(struct netdev *); +#ifdef THREADED +int netdev_dispatch(struct netdev *, int, pkt_handler, u_char *); +int netdev_get_fd(struct netdev *); +#endif int netdev_drain(struct netdev *); int netdev_send(struct netdev *, const struct ofpbuf *); diff --git a/lib/vlog.c b/lib/vlog.c index 0bd9bd1..affd09e 100644 --- a/lib/vlog.c +++ b/lib/vlog.c @@ -37,6 +37,9 @@ #include "unixctl.h" #include "util.h" #include "worker.h" +#ifdef THREADED +#include +#endif VLOG_DEFINE_THIS_MODULE(vlog); @@ -92,6 +95,10 @@ static int log_fd = -1; /* vlog initialized? */ static bool vlog_inited; +#ifdef THREADED +static pthread_mutex_t vlog_mutex; +#endif + static void format_log_message(const struct vlog_module *, enum vlog_level, enum vlog_facility, unsigned int msg_num, const char *message, va_list, struct ds *) @@ -492,6 +499,9 @@ vlog_init(void) return; } vlog_inited = true; +#ifdef THREADED + pthread_mutex_init(&vlog_mutex, NULL); +#endif /* openlog() is allowed to keep the pointer passed in, without making a * copy. The daemonize code sometimes frees and replaces 'program_name', @@ -707,6 +717,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level, ds_init(&s); ds_reserve(&s, 1024); +#ifdef THREADED + pthread_mutex_lock(&vlog_mutex); +#endif msg_num++; if (log_to_console) { @@ -736,6 +749,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level, vlog_write_file(&s); } +#ifdef THREADED + pthread_mutex_unlock(&vlog_mutex); +#endif ds_destroy(&s); errno = save_errno; } diff --git a/m4/openvswitch.m4 b/m4/openvswitch.m4 index 939f296..086a2cf 100644 --- a/m4/openvswitch.m4 +++ b/m4/openvswitch.m4 @@ -14,6 +14,25 @@ # See the License for the specific language governing permissions and # limitations under the License. +dnl Check for --enable-threaded and updates CFLAGS. +AC_DEFUN([OVS_CHECK_THREADED], + [AC_REQUIRE([AC_PROG_CC]) + AC_ARG_ENABLE( + [threaded], + [AC_HELP_STRING([--enable-threaded], + [Enable threaded version of userspace implementation])], + [case "${enableval}" in + (yes) threaded=true ;; + (no) threaded=false ;; + (*) AC_MSG_ERROR([bad value ${enableval} for --enable-threaded]) ;; + esac], + [threaded=false]) + if $threaded; then + AC_DEFINE([THREADED], [1], + [Define to 1 if the threaded version of userspace + implementation is enabled.]) + fi]) + dnl Checks for --enable-coverage and updates CFLAGS and LDFLAGS appropriately. AC_DEFUN([OVS_CHECK_COVERAGE], [AC_REQUIRE([AC_PROG_CC])