diff --git configure.ac configure.ac index 9bdffea..a70232f 100644 --- configure.ac +++ configure.ac @@ -43,6 +43,7 @@ AC_SEARCH_LIBS([clock_gettime], [rt]) AC_SEARCH_LIBS([timer_create], [rt]) AC_SEARCH_LIBS([pcap_open_live], [pcap]) +OVS_CHECK_THREADED OVS_CHECK_COVERAGE OVS_CHECK_NDEBUG OVS_CHECK_NETLINK diff --git lib/automake.mk lib/automake.mk index 8180517..f24ce5b 100644 --- lib/automake.mk +++ lib/automake.mk @@ -37,6 +37,7 @@ lib_libopenvswitch_a_SOURCES = \ lib/daemon.c \ lib/daemon.h \ lib/dhcp.h \ + lib/dispatch.h \ lib/dummy.c \ lib/dummy.h \ lib/dhparams.h \ diff --git lib/dispatch.h lib/dispatch.h new file mode 100644 index 0000000..80ac9c7 --- /dev/null +++ lib/dispatch.h @@ -0,0 +1,9 @@ +#include +#include "ofpbuf.h" + +#ifndef DISPATCH_H +#define DISPATCH_H 1 + +typedef void (*pkt_handler)(u_char *user, struct ofpbuf* buf); + +#endif /* DISPATCH_H */ diff --git lib/dpif-netdev.c lib/dpif-netdev.c index 60fae5f..e1c3b69 100644 --- lib/dpif-netdev.c +++ lib/dpif-netdev.c @@ -32,6 +32,15 @@ #include #include +#ifdef THREADED +#include +#include + +#include "socket-util.h" +#include "fatal-signal.h" +#include "dispatch.h" +#endif + #include "csum.h" #include "dpif.h" #include "dpif-provider.h" @@ -55,6 +64,16 @@ #include "vlog.h" VLOG_DEFINE_THIS_MODULE(dpif_netdev); +/* We could use these macros instead of using #ifdef and #endif every time we + * need to call the pthread_mutex_lock/unlock. +#ifdef THREADED +#define LOCK(mutex) pthread_mutex_lock(mutex) +#define UNLOCK(mutex) pthread_mutex_unlock(mutex) +#else +#define LOCK(mutex) +#define UNLOCK(mutex) +#endif +*/ /* Configuration parameters. */ enum { MAX_PORTS = 256 }; /* Maximum number of ports. */ @@ -82,6 +101,21 @@ struct dp_netdev { int open_cnt; bool destroyed; +#ifdef THREADED + /* The pipe is used to signal the presence of a packet on the queue. + * - dpif_netdev_recv_wait() waits on p[0] + * - dpif_netdev_recv() extract from queue and read p[0] + * - dp_netdev_output_control() send to queue and write p[1] + */ + + int pipe[2]; /* signal a packet on the queue */ + struct pollfd *pipe_fd; + + pthread_mutex_t table_mutex; /* mutex for the flow table */ + pthread_mutex_t port_list_mutex; /* port list mutex */ + + /* The access to this queue is protected by the table_mutex mutex */ +#endif struct dp_netdev_queue queues[N_QUEUES]; struct hmap flow_table; /* Flow table. */ @@ -102,6 +136,9 @@ struct dp_netdev_port { struct list node; /* Element in dp_netdev's 'port_list'. */ struct netdev *netdev; char *type; /* Port type as requested by user. */ +#ifdef THREADED + struct pollfd *poll_fd; /* To manage the poll loop in the thread. */ +#endif }; /* A flow in dp_netdev's 'flow_table'. */ @@ -127,6 +164,11 @@ struct dpif_netdev { unsigned int dp_serial; }; +#ifdef THREADED +/* XXX global Descriptor of the thread that manages the datapaths. */ +pthread_t thread_p; +#endif + /* All netdev-based datapaths. */ static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs); @@ -204,6 +246,23 @@ create_dp_netdev(const char *name, const struct dpif_class *class, dp->class = class; dp->name = xstrdup(name); dp->open_cnt = 0; +#ifdef THREADED + error = pipe(dp->pipe); + if (error) { + VLOG_ERR("Unable to create datapath thread pipe: %s", strerror(errno)); + return errno; + } + if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) { + VLOG_ERR("Unable to set nonblocking on datapath thread pipe: %s", + strerror(errno)); + return errno; + } + dp->pipe_fd = NULL; + VLOG_DBG("Datapath thread pipe created (%d, %d)", dp->pipe[0], dp->pipe[1]); + + pthread_mutex_init(&dp->table_mutex, NULL); + pthread_mutex_init(&dp->port_list_mutex, NULL); +#endif for (i = 0; i < N_QUEUES; i++) { dp->queues[i].head = dp->queues[i].tail = 0; } @@ -221,6 +280,38 @@ create_dp_netdev(const char *name, const struct dpif_class *class, return 0; } +#ifdef THREADED +static void * dp_thread_body(void *args OVS_UNUSED); + +/* This is the function that is called in response of a fatal signal (e.g. + * SIGTERM) */ +static void +dpif_netdev_exit_hook(void *aux OVS_UNUSED) +{ + if (pthread_cancel(thread_p) == 0) { + pthread_join(thread_p, NULL); + } +} + +static int +dpif_netdev_init(void) +{ + static int error = -1; + + if (error < 0) { + fatal_signal_add_hook(dpif_netdev_exit_hook, NULL, NULL, true); + error = pthread_create(&thread_p, NULL, dp_thread_body, NULL); + if (error != 0) { + VLOG_ERR("Unable to create datapath thread: %s", strerror(errno)); + error = errno; + } else { + VLOG_DBG("Datapath thread started"); + } + } + return error; +} +#endif + static int dpif_netdev_open(const struct dpif_class *class, const char *name, bool create, struct dpif **dpifp) @@ -247,9 +338,14 @@ dpif_netdev_open(const struct dpif_class *class, const char *name, } *dpifp = create_dpif_netdev(dp); +#ifdef THREADED + dpif_netdev_init(); +#endif return 0; } +/* table_mutex must be locked in THREADED mode. + */ static void dp_netdev_purge_queues(struct dp_netdev *dp) { @@ -273,11 +369,23 @@ dp_netdev_free(struct dp_netdev *dp) struct dp_netdev_port *port, *next; dp_netdev_flow_flush(dp); +#ifdef THREADED + pthread_mutex_lock(&dp->port_list_mutex); +#endif LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) { do_del_port(dp, port->port_no); } +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); + pthread_mutex_lock(&dp->table_mutex); +#endif dp_netdev_purge_queues(dp); hmap_destroy(&dp->flow_table); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); + pthread_mutex_destroy(&dp->table_mutex); + pthread_mutex_destroy(&dp->port_list_mutex); +#endif free(dp->name); free(dp); } @@ -306,7 +414,13 @@ static int dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats) { struct dp_netdev *dp = get_dp_netdev(dpif); +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif stats->n_flows = hmap_count(&dp->flow_table); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif stats->n_hit = dp->n_hit; stats->n_missed = dp->n_missed; stats->n_lost = dp->n_lost; @@ -354,13 +468,22 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, port->port_no = port_no; port->netdev = netdev; port->type = xstrdup(type); +#ifdef THREADED + port->poll_fd = NULL; +#endif error = netdev_get_mtu(netdev, &mtu); if (!error) { max_mtu = mtu; } +#ifdef THREADED + pthread_mutex_lock(&dp->port_list_mutex); +#endif list_push_back(&dp->port_list, &port->node); +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); +#endif dp->ports[port_no] = port; dp->serial++; @@ -429,7 +552,20 @@ static int dpif_netdev_port_del(struct dpif *dpif, uint16_t port_no) { struct dp_netdev *dp = get_dp_netdev(dpif); - return port_no == OVSP_LOCAL ? EINVAL : do_del_port(dp, port_no); + int error; + + if (port_no == OVSP_LOCAL) { + return EINVAL; + } else { +#ifdef THREADED + pthread_mutex_lock(&dp->port_list_mutex); +#endif + error = do_del_port(dp, port_no); +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); +#endif + } + return error; } static bool @@ -457,15 +593,25 @@ get_port_by_name(struct dp_netdev *dp, { struct dp_netdev_port *port; +#ifdef THREADED + pthread_mutex_lock(&dp->port_list_mutex); +#endif LIST_FOR_EACH (port, node, &dp->port_list) { if (!strcmp(netdev_get_name(port->netdev), devname)) { *portp = port; +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); +#endif return 0; } } +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); +#endif return ENOENT; } +/* In THREADED mode, must be called with port_list_mutex held. */ static int do_del_port(struct dp_netdev *dp, uint16_t port_no) { @@ -540,7 +686,13 @@ dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED) static void dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow) { +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif hmap_remove(&dp->flow_table, &flow->node); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif free(flow->actions); free(flow); } @@ -629,7 +781,11 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_) } static struct dp_netdev_flow * -dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key) +#ifdef THREADED +dp_netdev_lookup_flow_locked(struct dp_netdev *dp, const struct flow *key) +#else +dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key) +#endif { struct dp_netdev_flow *flow; @@ -641,6 +797,19 @@ dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key) return NULL; } +#ifdef THREADED +static struct dp_netdev_flow * +dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key) +{ + struct dp_netdev_flow *flow; + + pthread_mutex_lock(&dp->table_mutex); + flow = dp_netdev_lookup_flow_locked(dp, key); + pthread_mutex_unlock(&dp->table_mutex); + return flow; +} +#endif + static void get_dpif_flow_stats(struct dp_netdev_flow *flow, struct dpif_flow_stats *stats) { @@ -737,7 +906,13 @@ dp_netdev_flow_add(struct dp_netdev *dp, const struct flow *key, return error; } +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0)); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif return 0; } @@ -757,6 +932,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) struct dp_netdev_flow *flow; struct flow key; int error; + int n_flows; error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key); if (error) { @@ -766,7 +942,14 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) flow = dp_netdev_lookup_flow(dp, &key); if (!flow) { if (put->flags & DPIF_FP_CREATE) { - if (hmap_count(&dp->flow_table) < MAX_FLOWS) { +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif + n_flows = hmap_count(&dp->flow_table); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif + if (n_flows < MAX_FLOWS) { if (put->stats) { memset(put->stats, 0, sizeof *put->stats); } @@ -852,7 +1035,13 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_, struct dp_netdev_flow *flow; struct hmap_node *node; +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif if (!node) { return EOF; } @@ -958,7 +1147,13 @@ static int dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, struct ofpbuf *buf) { - struct dp_netdev_queue *q = find_nonempty_queue(dpif); + struct dp_netdev_queue *q; +#ifdef THREADED + struct dp_netdev *dp = get_dp_netdev(dpif); + char c; + pthread_mutex_lock(&dp->table_mutex); +#endif + q = find_nonempty_queue(dpif); if (q) { struct dpif_upcall *u = q->upcalls[q->tail++ & QUEUE_MASK]; *upcall = *u; @@ -967,8 +1162,19 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, ofpbuf_uninit(buf); *buf = *upcall->packet; +#ifdef THREADED + /* Read a byte from the pipe to signal that a packet has been + * received. */ + if (read(dp->pipe[0], &c, 1) < 0) { + VLOG_ERR("Error reading from the pipe: %s", strerror(errno)); + } + pthread_mutex_unlock(&dp->table_mutex); +#endif return 0; } else { +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif return EAGAIN; } } @@ -976,19 +1182,32 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, static void dpif_netdev_recv_wait(struct dpif *dpif) { +#ifdef THREADED + struct dp_netdev *dp = get_dp_netdev(dpif); + + poll_fd_wait(dp->pipe[0], POLLIN); +#else if (find_nonempty_queue(dpif)) { poll_immediate_wake(); } else { /* No messages ready to be received, and dp_wait() will ensure that we * wake up to queue new messages, so there is nothing to do. */ } +#endif } static void dpif_netdev_recv_purge(struct dpif *dpif) { struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif); +#ifdef THREADED + struct dp_netdev *dp = get_dp_netdev(dpif); + pthread_mutex_lock(&dp->table_mutex); +#endif dp_netdev_purge_queues(dpif_netdev->dp); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif } static void @@ -1012,7 +1231,12 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port, return; } flow_extract(packet, 0, 0, port->port_no, &key); +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); + flow = dp_netdev_lookup_flow_locked(dp, &key); +#else flow = dp_netdev_lookup_flow(dp, &key); +#endif if (flow) { dp_netdev_flow_used(flow, &key, packet); dp_netdev_execute_actions(dp, packet, &key, @@ -1022,8 +1246,22 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port, dp->n_missed++; dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, 0); } +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif } +#ifdef THREADED +static void +dpif_netdev_run(struct dpif *dpif OVS_UNUSED) +{ +} + +static void +dpif_netdev_wait(struct dpif *dpif OVS_UNUSED) +{ +} +#else static void dpif_netdev_run(struct dpif *dpif) { @@ -1062,6 +1300,144 @@ dpif_netdev_wait(struct dpif *dpif) netdev_recv_wait(port->netdev); } } +#endif + +#ifdef THREADED +/* + * pcap callback argument + */ +struct dispatch_arg { + struct dp_netdev *dp; /* update statistics */ + struct dp_netdev_port *port; /* argument to flow identifier function */ +}; + +/* Process a packet. + * + * The port_input function will send immediately if it finds a flow match and + * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP. + * If a flow is not found or for the other actions, the packet is copied. + */ +static void +process_pkt(u_char *user, struct ofpbuf *buf) +{ + struct dispatch_arg *arg = (struct dispatch_arg *)user; + + ofpbuf_padto(buf, ETH_TOTAL_MIN); + dp_netdev_port_input(arg->dp, arg->port, buf); +} + +/* Body of the thread that manages the datapaths */ +static void* +dp_thread_body(void *args OVS_UNUSED) +{ + struct dp_netdev *dp; + struct dp_netdev_port *port; + struct dispatch_arg arg; + int error; + int n_fds; + uint32_t batch = 50; /* max number of pkts processed by the dispatch */ + int processed; /* actual number of pkts processed by the dispatch */ + char readbuf[1024]; + + sigset_t sigmask; + + /*XXX Since the poll involves all ports of all datapaths, the right fds + * size should be MAX_PORTS * max_number_of_datapaths */ + struct pollfd fds[MAX_PORTS + 1]; + + /* mask the fatal signals. In this way the main thread is delegate to + * manage this them. */ + sigemptyset(&sigmask); + sigaddset(&sigmask, SIGTERM); + sigaddset(&sigmask, SIGALRM); + sigaddset(&sigmask, SIGINT); + sigaddset(&sigmask, SIGHUP); + + if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) { + VLOG_ERR("Error setting thread sigmask: %s", strerror(errno)); + } + + for(;;) { + struct shash_node *node; + n_fds = 0; + /* build the structure for poll */ + SHASH_FOR_EACH(node, &dp_netdevs) { + dp = (struct dp_netdev *)node->data; + fds[n_fds].fd = dp->pipe[1]; + fds[n_fds].events = POLLIN; + dp->pipe_fd = &fds[n_fds]; + n_fds++; + if (n_fds >= sizeof(fds) / sizeof(fds[0])) { + VLOG_ERR("Too many fds for poll adding pipe_fd"); + break; + } + pthread_mutex_lock(&dp->port_list_mutex); + LIST_FOR_EACH (port, node, &dp->port_list) { + /* insert an element in the fds structure */ + fds[n_fds].fd = netdev_get_fd(port->netdev); + fds[n_fds].events = POLLIN; + port->poll_fd = &fds[n_fds]; + n_fds++; + if (n_fds >= sizeof(fds) / sizeof(fds[0])) { + VLOG_ERR("Too many fds for poll adding port fd"); + break; + } + } + pthread_mutex_unlock(&dp->port_list_mutex); + } + + error = poll(fds, n_fds, 2000); + VLOG_DBG("dp_thread_body poll wakeup with cnt=%d", error); + + if (error < 0) { + if (errno == EINTR) { + /* XXX get this case in detach mode */ + continue; + } + VLOG_ERR("Datapath thread poll() error: %s\n", strerror(errno)); + /* XXX terminating the thread is probably not right */ + break; + } + pthread_testcancel(); + + SHASH_FOR_EACH (node, &dp_netdevs) { + dp = (struct dp_netdev *)node->data; + if (dp->pipe_fd && (dp->pipe_fd->revents & POLLIN)) { + VLOG_DBG("Signalled from main thread"); + while ( (error = read(dp->pipe[1], readbuf, sizeof(readbuf))) > 0) + ; + if (error < 0) { + VLOG_ERR("Error reading from the pipe: %s", strerror(errno)); + } + } + arg.dp = dp; + pthread_mutex_lock(&dp->port_list_mutex); + LIST_FOR_EACH (port, node, &dp->port_list) { + arg.port = port; + if (port->poll_fd) { + VLOG_DBG("fd %d revents 0x%x", port->poll_fd->fd, port->poll_fd->revents); + } + if (port->poll_fd && (port->poll_fd->revents & POLLIN)) { + /* call the dispatch and process the packet into + * its callback. We process 'batch' packets at time */ + processed = netdev_dispatch(port->netdev, batch, + process_pkt, (u_char *)&arg); + if (processed < 0) { /* pcap returns error */ + static struct vlog_rate_limit rl = + VLOG_RATE_LIMIT_INIT(1, 5); + VLOG_ERR_RL(&rl, + "error receiving data from XXX \n"); + } + } /* end of if poll */ + } /* end of port loop */ + pthread_mutex_unlock(&dp->port_list_mutex); + } /* end of dp loop */ + } /* for ;; */ + + return NULL; +} + +#endif /* THREADED */ static void dp_netdev_set_dl(struct ofpbuf *packet, const struct ovs_key_ethernet *eth_key) @@ -1077,11 +1453,19 @@ dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet, uint16_t out_port) { struct dp_netdev_port *p = dp->ports[out_port]; + char c = 0; + if (p) { netdev_send(p->netdev, packet); +#ifdef THREADED + if (write(dp->pipe[0], &c, 1) < 0) { + VLOG_ERR("Pipe write error (to datapath): %s", strerror(errno)); + } +#endif } } +/* In THREADED mode, must be called with table_lock_mutex held. */ static int dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, int queue_no, const struct flow *flow, uint64_t arg) @@ -1090,6 +1474,9 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, struct dpif_upcall *upcall; struct ofpbuf *buf; size_t key_len; +#ifdef THREADED + char c = 0; +#endif if (q->head - q->tail >= MAX_QUEUE_LEN) { dp->n_lost++; @@ -1111,6 +1498,12 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, upcall->userdata = arg; q->upcalls[q->head++ & QUEUE_MASK] = upcall; +#ifdef THREADED + /* Write a byte on the pipe to advertise that a packet is ready. */ + if (write(dp->pipe[1], &c, 1) < 0) { + VLOG_ERR("Pipe write error (from datapath): %s", strerror(errno)); + } +#endif return 0; } @@ -1159,7 +1552,13 @@ dp_netdev_action_userspace(struct dp_netdev *dp, userdata_attr = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA); userdata = userdata_attr ? nl_attr_get_u64(userdata_attr) : 0; +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif dp_netdev_output_userspace(dp, packet, DPIF_UC_ACTION, key, userdata); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif } static void diff --git lib/netdev-bsd.c lib/netdev-bsd.c index f8b1188..eb4281f 100644 --- lib/netdev-bsd.c +++ lib/netdev-bsd.c @@ -667,6 +667,88 @@ netdev_bsd_recv_wait(struct netdev *netdev_) } } +#ifdef THREADED + +struct dispatch_arg { + pkt_handler h; + u_char *user; +}; + +static void +dispatch_handler(u_char *user, const struct pcap_pkthdr *phdr, const u_char *pdata) +{ + struct ofpbuf buf; + struct dispatch_arg *parg = (struct dispatch_arg*)user; + + ofpbuf_use_stub(&buf, (void*)pdata, phdr->caplen); + (*parg->h)(parg->user, &buf); + ofpbuf_uninit(&buf); +} + +static int +netdev_bsd_dispatch_system(struct netdev_bsd *netdev, int batch, pkt_handler h, + u_char *user) +{ + int ret; + struct dispatch_arg arg; + + arg.h = h; + arg.user = user; + ret = pcap_dispatch(netdev->pcap_handle, batch, dispatch_handler, (u_char*)&arg); + return ret; +} + +static int +netdev_bsd_dispatch_tap(struct netdev_bsd *netdev, int batch, pkt_handler h, + u_char *user) +{ + int ret; + int i; + const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX; + OFPBUF_STACK_BUFFER(buf_, size); + + struct ofpbuf buf; + ofpbuf_use_stub(&buf, buf_, size); + for (i = 0; i < batch; i++) { + ret = netdev_bsd_recv_tap(netdev, buf.data, ofpbuf_tailroom(&buf)); + if (ret >= 0) { + buf.size += ret; + h(user, &buf); + } else if (ret != -EAGAIN) { + return -1; + } else { /* ret = EAGAIN */ + break; + } + ofpbuf_clear(&buf); + } + ofpbuf_uninit(&buf); + return i; +} + +static int +netdev_bsd_dispatch(struct netdev *netdev_, int batch, pkt_handler h, + u_char *user) +{ + struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); + struct netdev_dev_bsd * netdev_dev = + netdev_dev_bsd_cast(netdev_get_dev(netdev_)); + + if (!strcmp(netdev_get_type(netdev_), "tap") && + netdev->netdev_fd == netdev_dev->tap_fd) { + return netdev_bsd_dispatch_tap(netdev, batch, h, user); + } else { + return netdev_bsd_dispatch_system(netdev, batch, h, user); + } +} + +static int +netdev_bsd_get_fd(struct netdev *netdev_) +{ + struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); + return netdev->netdev_fd; +} +#endif + /* Discards all packets waiting to be received from 'netdev'. */ static int netdev_bsd_drain(struct netdev *netdev_) @@ -1263,6 +1345,10 @@ const struct netdev_class netdev_bsd_class = { netdev_bsd_recv, netdev_bsd_recv_wait, +#ifdef THREADED + netdev_bsd_dispatch, + netdev_bsd_get_fd, +#endif netdev_bsd_drain, netdev_bsd_send, @@ -1323,6 +1409,10 @@ const struct netdev_class netdev_tap_class = { netdev_bsd_recv, netdev_bsd_recv_wait, +#ifdef THREADED + netdev_bsd_dispatch, + netdev_bsd_get_fd, +#endif netdev_bsd_drain, netdev_bsd_send, diff --git lib/netdev-dummy.c lib/netdev-dummy.c index b8c23c5..4e4801c 100644 --- lib/netdev-dummy.c +++ lib/netdev-dummy.c @@ -20,6 +20,12 @@ #include +#ifdef THREADED +#include +#include +#include "socket-util.h" +#endif + #include "flow.h" #include "list.h" #include "netdev-provider.h" @@ -51,6 +57,10 @@ struct netdev_dummy { struct list node; /* In netdev_dev_dummy's "devs" list. */ struct list recv_queue; bool listening; +#ifdef THREADED + pthread_mutex_t queue_mutex; + int s_pipe[2]; /* used to signal packet arrivals */ +#endif }; static struct shash dummy_netdev_devs = SHASH_INITIALIZER(&dummy_netdev_devs); @@ -124,11 +134,30 @@ netdev_dummy_open(struct netdev_dev *netdev_dev_, struct netdev **netdevp) { struct netdev_dev_dummy *netdev_dev = netdev_dev_dummy_cast(netdev_dev_); struct netdev_dummy *netdev; +#ifdef THREADED + int error; +#endif netdev = xmalloc(sizeof *netdev); netdev_init(&netdev->netdev, netdev_dev_); list_init(&netdev->recv_queue); netdev->listening = false; +#ifdef THREADED + error = pipe(netdev->s_pipe); + if (error) { + VLOG_ERR("Unable to create dummy pipe: %s", strerror(errno)); + free(netdev); + return errno; + } + if (set_nonblocking(netdev->s_pipe[0]) || + set_nonblocking(netdev->s_pipe[1])) { + VLOG_ERR("Unable to set nonblocking on dummy pipe: %s", + strerror(errno)); + free(netdev); + return errno; + } + pthread_mutex_init(&netdev->queue_mutex, NULL); +#endif *netdevp = &netdev->netdev; list_push_back(&netdev_dev->devs, &netdev->node); @@ -141,6 +170,13 @@ netdev_dummy_close(struct netdev *netdev_) struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); list_remove(&netdev->node); ofpbuf_list_delete(&netdev->recv_queue); +#ifdef THREADED + if (netdev->listening) { + close(netdev->s_pipe[0]); + close(netdev->s_pipe[1]); + } + pthread_mutex_destroy(&netdev->queue_mutex); +#endif free(netdev); } @@ -158,12 +194,29 @@ netdev_dummy_recv(struct netdev *netdev_, void *buffer, size_t size) struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); struct ofpbuf *packet; size_t packet_size; +#ifdef THREADED + char c; +#endif +#ifdef THREADED + pthread_mutex_lock(&netdev->queue_mutex); +#endif if (list_is_empty(&netdev->recv_queue)) { +#ifdef THREADED + pthread_mutex_unlock(&netdev->queue_mutex); +#endif return -EAGAIN; } +#ifdef THREADED + if (read(netdev->s_pipe[0], &c, 1) < 0) { + VLOG_ERR("Error reading dummy pipe: %s", strerror(errno)); + } +#endif packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue)); +#ifdef THREADED + pthread_mutex_unlock(&netdev->queue_mutex); +#endif if (packet->size > size) { return -EMSGSIZE; } @@ -179,11 +232,60 @@ static void netdev_dummy_recv_wait(struct netdev *netdev_) { struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); - if (!list_is_empty(&netdev->recv_queue)) { + int empty; + +#ifdef THREADED + pthread_mutex_lock(&netdev->queue_mutex); +#endif + empty = list_is_empty(&netdev->recv_queue); +#ifdef THREADED + pthread_mutex_unlock(&netdev->queue_mutex); +#endif + if (!empty) { poll_immediate_wake(); } } +#ifdef THREADED +static int +netdev_dummy_dispatch(struct netdev *netdev_, int batch, pkt_handler h, + u_char *user) +{ + int i; + struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); + struct ofpbuf *packet; + VLOG_DBG("dispatch %d", batch); + + for (i = 0; i < batch; i++) { + char c; + if (read(netdev->s_pipe[0], &c, 1) < 0) { + if (errno == EAGAIN) + break; + VLOG_ERR("%s: error reading from the pipe: %s", + netdev_get_name(netdev_), strerror(errno)); + return -1; + } + pthread_mutex_lock(&netdev->queue_mutex); + if (list_is_empty(&netdev->recv_queue)) { + pthread_mutex_unlock(&netdev->queue_mutex); + return -EAGAIN; + } + packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue)); + pthread_mutex_unlock(&netdev->queue_mutex); + h(user, packet); + ofpbuf_delete(packet); + } + return i; +} + +static int +netdev_dummy_get_fd(struct netdev *netdev_) +{ + struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); + return netdev->s_pipe[0]; +} +#endif + static int netdev_dummy_drain(struct netdev *netdev_) { @@ -316,6 +418,10 @@ static const struct netdev_class dummy_class = { netdev_dummy_listen, netdev_dummy_recv, netdev_dummy_recv_wait, +#ifdef THREADED + netdev_dummy_dispatch, /* dispatch */ + netdev_dummy_get_fd, /* get_fd */ +#endif netdev_dummy_drain, NULL, /* send */ @@ -407,6 +513,9 @@ netdev_dummy_receive(struct unixctl_conn *conn, struct netdev_dev_dummy *dummy_dev; int n_listeners; int i; +#ifdef THREADED + char c = 0; +#endif dummy_dev = shash_find_data(&dummy_netdev_devs, argv[1]); if (!dummy_dev) { @@ -414,6 +523,7 @@ netdev_dummy_receive(struct unixctl_conn *conn, return; } + n_listeners = 0; for (i = 2; i < argc; i++) { struct netdev_dummy *dev; @@ -429,7 +539,16 @@ netdev_dummy_receive(struct unixctl_conn *conn, LIST_FOR_EACH (dev, node, &dummy_dev->devs) { if (dev->listening) { struct ofpbuf *copy = ofpbuf_clone(packet); +#ifdef THREADED + pthread_mutex_lock(&dev->queue_mutex); +#endif list_push_back(&dev->recv_queue, ©->list_node); +#ifdef THREADED + pthread_mutex_unlock(&dev->queue_mutex); + if (write(dev->s_pipe[1], &c, 1) < 0) { + VLOG_ERR("Error writing dummy pipe: %s", strerror(errno)); + } +#endif n_listeners++; } } diff --git lib/netdev-linux.c lib/netdev-linux.c index 412a92d..aba2164 100644 --- lib/netdev-linux.c +++ lib/netdev-linux.c @@ -893,6 +893,43 @@ netdev_linux_recv_wait(struct netdev *netdev_) } } +#ifdef THREADED +static int +netdev_linux_dispatch(struct netdev *netdev_, int batch, pkt_handler h, + u_char *user) +{ + int ret; + int i; + const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX; + OFPBUF_STACK_BUFFER(buf_, size); + struct ofpbuf buf; + VLOG_DBG("dispatch %d", batch); + + ofpbuf_use_stub(&buf, buf_, size); + for (i = 0; i < batch; i++) { + ret = netdev_linux_recv(netdev_, buf.data, ofpbuf_tailroom(&buf)); + if (ret >= 0) { + buf.size += ret; + h(user, &buf); + } else if (ret != -EAGAIN) { + return -1; + } else { + break; + } + ofpbuf_clear(&buf); + } + ofpbuf_uninit(&buf); + return i; +} + +static int +netdev_linux_get_fd(struct netdev *netdev_) +{ + struct netdev_linux *netdev = netdev_linux_cast(netdev_); + return netdev->fd; +} +#endif + /* Discards all packets waiting to be received from 'netdev'. */ static int netdev_linux_drain(struct netdev *netdev_) @@ -2383,6 +2420,12 @@ netdev_linux_change_seq(const struct netdev *netdev) return netdev_dev_linux_cast(netdev_get_dev(netdev))->change_seq; } +#ifdef THREADED +#define THREADED_NULL NULL, NULL, +#else +#define THREADED_NULL +#endif + #define NETDEV_LINUX_CLASS(NAME, CREATE, GET_STATS, SET_STATS, \ GET_FEATURES, GET_STATUS) \ { \ @@ -2403,6 +2446,7 @@ netdev_linux_change_seq(const struct netdev *netdev) netdev_linux_listen, \ netdev_linux_recv, \ netdev_linux_recv_wait, \ + THREADED_NULL \ netdev_linux_drain, \ \ netdev_linux_send, \ diff --git lib/netdev-provider.h lib/netdev-provider.h index 94f60af..743e41c 100644 --- lib/netdev-provider.h +++ lib/netdev-provider.h @@ -25,6 +25,9 @@ #include "list.h" #include "shash.h" #include "smap.h" +#ifdef THREADED +#include "dispatch.h" +#endif #ifdef __cplusplus extern "C" { @@ -191,6 +194,22 @@ struct netdev_class { * implement packet reception through the 'recv' member function. */ void (*recv_wait)(struct netdev *netdev); +#ifdef THREADED + /* Attempts to receive 'batch' packets from 'netdev' and process them + * through the 'handler' callback. This function is used in the 'THREADED' + * version in order to optimize the forwarding process, since it permits to + * process packets directly in the netdev memory. + * + * Returns the number of packets processed on success; this can be 0 if no + * packets are available to be read. Returns -1 if an error occurred. + */ + int (*dispatch)(struct netdev *netdev, int batch, pkt_handler handler, + u_char *user); + + /* Return the file descriptor of the device */ + int (*get_fd)(struct netdev *netdev); +#endif + /* Discards all packets waiting to be received from 'netdev'. * * May be null if not needed, such as for a network device that does not diff --git lib/netdev-vport.c lib/netdev-vport.c index db5c3db..987636b 100644 --- lib/netdev-vport.c +++ lib/netdev-vport.c @@ -899,6 +899,13 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED, return 0; } + +#ifdef THREADED +# define THREADED_NULL NULL, NULL, +#else +# define THREADED_NULL +#endif + #define VPORT_FUNCTIONS(GET_STATUS) \ NULL, \ netdev_vport_run, \ @@ -915,6 +922,7 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED, NULL, /* listen */ \ NULL, /* recv */ \ NULL, /* recv_wait */ \ + THREADED_NULL \ NULL, /* drain */ \ \ netdev_vport_send, /* send */ \ diff --git lib/netdev.c lib/netdev.c index 394d895..3359b4c 100644 --- lib/netdev.c +++ lib/netdev.c @@ -424,6 +424,28 @@ netdev_recv_wait(struct netdev *netdev) } } +#ifdef THREADED +/* Attempts to receive and process 'batch' packets from 'netdev'. */ +int +netdev_dispatch(struct netdev *netdev, int batch, pkt_handler h, u_char *user) +{ + int (*dispatch)(struct netdev*, int, pkt_handler, u_char *); + + dispatch = netdev_get_dev(netdev)->netdev_class->dispatch; + return dispatch ? dispatch(netdev, batch, h, user) : 0; +} + +/* Returns the file descriptor */ +int +netdev_get_fd(struct netdev *netdev) +{ + int (*get_fd)(struct netdev *); + + get_fd = netdev_get_dev(netdev)->netdev_class->get_fd; + return get_fd ? get_fd(netdev) : 0; +} +#endif + /* Discards all packets waiting to be received from 'netdev'. */ int netdev_drain(struct netdev *netdev) diff --git lib/netdev.h lib/netdev.h index d2cc8b5..f55a286 100644 --- lib/netdev.h +++ lib/netdev.h @@ -21,6 +21,9 @@ #include #include #include "openvswitch/types.h" +#ifdef THREADED +#include "dispatch.h" +#endif #ifdef __cplusplus extern "C" { @@ -107,6 +110,10 @@ int netdev_get_ifindex(const struct netdev *); int netdev_listen(struct netdev *); int netdev_recv(struct netdev *, struct ofpbuf *); void netdev_recv_wait(struct netdev *); +#ifdef THREADED +int netdev_dispatch(struct netdev *, int, pkt_handler, u_char *); +int netdev_get_fd(struct netdev *); +#endif int netdev_drain(struct netdev *); int netdev_send(struct netdev *, const struct ofpbuf *); diff --git lib/vlog.c lib/vlog.c index 0bd9bd1..affd09e 100644 --- lib/vlog.c +++ lib/vlog.c @@ -37,6 +37,9 @@ #include "unixctl.h" #include "util.h" #include "worker.h" +#ifdef THREADED +#include +#endif VLOG_DEFINE_THIS_MODULE(vlog); @@ -92,6 +95,10 @@ static int log_fd = -1; /* vlog initialized? */ static bool vlog_inited; +#ifdef THREADED +static pthread_mutex_t vlog_mutex; +#endif + static void format_log_message(const struct vlog_module *, enum vlog_level, enum vlog_facility, unsigned int msg_num, const char *message, va_list, struct ds *) @@ -492,6 +499,9 @@ vlog_init(void) return; } vlog_inited = true; +#ifdef THREADED + pthread_mutex_init(&vlog_mutex, NULL); +#endif /* openlog() is allowed to keep the pointer passed in, without making a * copy. The daemonize code sometimes frees and replaces 'program_name', @@ -707,6 +717,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level, ds_init(&s); ds_reserve(&s, 1024); +#ifdef THREADED + pthread_mutex_lock(&vlog_mutex); +#endif msg_num++; if (log_to_console) { @@ -736,6 +749,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level, vlog_write_file(&s); } +#ifdef THREADED + pthread_mutex_unlock(&vlog_mutex); +#endif ds_destroy(&s); errno = save_errno; } diff --git m4/openvswitch.m4 m4/openvswitch.m4 index 939f296..086a2cf 100644 --- m4/openvswitch.m4 +++ m4/openvswitch.m4 @@ -14,6 +14,25 @@ # See the License for the specific language governing permissions and # limitations under the License. +dnl Check for --enable-threaded and updates CFLAGS. +AC_DEFUN([OVS_CHECK_THREADED], + [AC_REQUIRE([AC_PROG_CC]) + AC_ARG_ENABLE( + [threaded], + [AC_HELP_STRING([--enable-threaded], + [Enable threaded version of userspace implementation])], + [case "${enableval}" in + (yes) threaded=true ;; + (no) threaded=false ;; + (*) AC_MSG_ERROR([bad value ${enableval} for --enable-threaded]) ;; + esac], + [threaded=false]) + if $threaded; then + AC_DEFINE([THREADED], [1], + [Define to 1 if the threaded version of userspace + implementation is enabled.]) + fi]) + dnl Checks for --enable-coverage and updates CFLAGS and LDFLAGS appropriately. AC_DEFUN([OVS_CHECK_COVERAGE], [AC_REQUIRE([AC_PROG_CC])