diff --git configure.ac configure.ac index 558b708..3f43fab 100644 --- configure.ac +++ configure.ac @@ -44,6 +44,7 @@ AC_SEARCH_LIBS([clock_gettime], [rt]) AC_SEARCH_LIBS([timer_create], [rt]) AC_SEARCH_LIBS([pcap_open_live], [pcap]) +OVS_CHECK_THREADED OVS_CHECK_COVERAGE OVS_CHECK_NDEBUG OVS_CHECK_NETLINK diff --git lib/automake.mk lib/automake.mk index 13bbb41..e52ba5e 100644 --- lib/automake.mk +++ lib/automake.mk @@ -37,6 +37,7 @@ lib_libopenvswitch_a_SOURCES = \ lib/daemon.c \ lib/daemon.h \ lib/dhcp.h \ + lib/dispatch.h \ lib/dummy.c \ lib/dummy.h \ lib/dhparams.h \ diff --git lib/dispatch.h lib/dispatch.h new file mode 100644 index 0000000..1bc1167 --- /dev/null +++ lib/dispatch.h @@ -0,0 +1,16 @@ +#include +#include + +#ifndef DISPATCH_H +#define DISPATCH_H 1 + +struct pkthdr { + struct timeval ts; /* time stamp */ + uint32_t caplen; /* length of portion present */ + uint32_t len; /* length this packet (off wire) */ +}; + +typedef void (*pkt_handler)(u_char *user, const struct pkthdr *h, + const u_char *pkt); + +#endif /* DISPATCH_H */ diff --git lib/dpif-netdev.c lib/dpif-netdev.c index 0ec35c1..e085105 100644 --- lib/dpif-netdev.c +++ lib/dpif-netdev.c @@ -32,6 +32,15 @@ #include #include +#ifdef THREADED +#include +#include + +#include "socket-util.h" +#include "fatal-signal.h" +#include "dispatch.h" +#endif + #include "csum.h" #include "dpif.h" #include "dpif-provider.h" @@ -55,6 +64,16 @@ #include "vlog.h" VLOG_DEFINE_THIS_MODULE(dpif_netdev); +/* We could use these macros instead of using #ifdef and #endif every time we + * need to call the pthread_mutex_lock/unlock. +#ifdef THREADED +#define LOCK(mutex) pthread_mutex_lock(mutex) +#define UNLOCK(mutex) pthread_mutex_unlock(mutex) +#else +#define LOCK(mutex) +#define UNLOCK(mutex) +#endif +*/ /* Configuration parameters. */ enum { MAX_PORTS = 256 }; /* Maximum number of ports. */ @@ -82,6 +101,21 @@ struct dp_netdev { int open_cnt; bool destroyed; +#ifdef THREADED + /* The pipe is used to signal the presence of a packet on the queue. + * - dpif_netdev_recv_wait() waits on p[0] + * - dpif_netdev_recv() extract from queue and read p[0] + * - dp_netdev_output_control() send to queue and write p[1] + */ + + int pipe[2]; /* signal a packet on the queue */ + struct pollfd *pipe_fd; + + pthread_mutex_t table_mutex; /* mutex for the flow table */ + pthread_mutex_t port_list_mutex; /* port list mutex */ + + /* The access to this queue is protected by the table_mutex mutex */ +#endif struct dp_netdev_queue queues[N_QUEUES]; struct hmap flow_table; /* Flow table. */ @@ -102,6 +136,9 @@ struct dp_netdev_port { struct list node; /* Element in dp_netdev's 'port_list'. */ struct netdev *netdev; char *type; /* Port type as requested by user. */ +#ifdef THREADED + struct pollfd *poll_fd; /* To manage the poll loop in the thread. */ +#endif }; /* A flow in dp_netdev's 'flow_table'. */ @@ -127,6 +164,11 @@ struct dpif_netdev { unsigned int dp_serial; }; +#ifdef THREADED +/* XXX global Descriptor of the thread that manages the datapaths. */ +pthread_t thread_p; +#endif + /* All netdev-based datapaths. */ static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs); @@ -204,6 +246,23 @@ create_dp_netdev(const char *name, const struct dpif_class *class, dp->class = class; dp->name = xstrdup(name); dp->open_cnt = 0; +#ifdef THREADED + error = pipe(dp->pipe); + if (error) { + VLOG_ERR("Unable to create datapath thread pipe: %s", strerror(errno)); + return errno; + } + if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) { + VLOG_ERR("Unable to set nonblocking on datapath thread pipe: %s", + strerror(errno)); + return errno; + } + dp->pipe_fd = NULL; + VLOG_DBG("Datapath thread pipe created (%d, %d)", dp->pipe[0], dp->pipe[1]); + + pthread_mutex_init(&dp->table_mutex, NULL); + pthread_mutex_init(&dp->port_list_mutex, NULL); +#endif for (i = 0; i < N_QUEUES; i++) { dp->queues[i].head = dp->queues[i].tail = 0; } @@ -221,6 +280,38 @@ create_dp_netdev(const char *name, const struct dpif_class *class, return 0; } +#ifdef THREADED +static void * dp_thread_body(void *args OVS_UNUSED); + +/* This is the function that is called in response of a fatal signal (e.g. + * SIGTERM) */ +static void +dpif_netdev_exit_hook(void *aux OVS_UNUSED) +{ + if (pthread_cancel(thread_p) == 0) { + pthread_join(thread_p, NULL); + } +} + +static int +dpif_netdev_init(void) +{ + static int error = -1; + + if (error < 0) { + fatal_signal_add_hook(dpif_netdev_exit_hook, NULL, NULL, true); + error = pthread_create(&thread_p, NULL, dp_thread_body, NULL); + if (error != 0) { + VLOG_ERR("Unable to create datapath thread: %s", strerror(errno)); + error = errno; + } else { + VLOG_DBG("Datapath thread started"); + } + } + return error; +} +#endif + static int dpif_netdev_open(const struct dpif_class *class, const char *name, bool create, struct dpif **dpifp) @@ -247,9 +338,14 @@ dpif_netdev_open(const struct dpif_class *class, const char *name, } *dpifp = create_dpif_netdev(dp); +#ifdef THREADED + dpif_netdev_init(); +#endif return 0; } +/* table_mutex must be locked in THREADED mode. + */ static void dp_netdev_purge_queues(struct dp_netdev *dp) { @@ -273,11 +369,23 @@ dp_netdev_free(struct dp_netdev *dp) struct dp_netdev_port *port, *next; dp_netdev_flow_flush(dp); +#ifdef THREADED + pthread_mutex_lock(&dp->port_list_mutex); +#endif LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) { do_del_port(dp, port->port_no); } +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); + pthread_mutex_lock(&dp->table_mutex); +#endif dp_netdev_purge_queues(dp); hmap_destroy(&dp->flow_table); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); + pthread_mutex_destroy(&dp->table_mutex); + pthread_mutex_destroy(&dp->port_list_mutex); +#endif free(dp->name); free(dp); } @@ -306,7 +414,13 @@ static int dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats) { struct dp_netdev *dp = get_dp_netdev(dpif); +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif stats->n_flows = hmap_count(&dp->flow_table); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif stats->n_hit = dp->n_hit; stats->n_missed = dp->n_missed; stats->n_lost = dp->n_lost; @@ -354,13 +468,22 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, port->port_no = port_no; port->netdev = netdev; port->type = xstrdup(type); +#ifdef THREADED + port->poll_fd = NULL; +#endif error = netdev_get_mtu(netdev, &mtu); if (!error) { max_mtu = mtu; } +#ifdef THREADED + pthread_mutex_lock(&dp->port_list_mutex); +#endif list_push_back(&dp->port_list, &port->node); +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); +#endif dp->ports[port_no] = port; dp->serial++; @@ -420,7 +543,20 @@ static int dpif_netdev_port_del(struct dpif *dpif, uint16_t port_no) { struct dp_netdev *dp = get_dp_netdev(dpif); - return port_no == OVSP_LOCAL ? EINVAL : do_del_port(dp, port_no); + int error; + + if (port_no == OVSP_LOCAL) { + return EINVAL; + } else { +#ifdef THREADED + pthread_mutex_lock(&dp->port_list_mutex); +#endif + error = do_del_port(dp, port_no); +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); +#endif + } + return error; } static bool @@ -448,15 +584,25 @@ get_port_by_name(struct dp_netdev *dp, { struct dp_netdev_port *port; +#ifdef THREADED + pthread_mutex_lock(&dp->port_list_mutex); +#endif LIST_FOR_EACH (port, node, &dp->port_list) { if (!strcmp(netdev_get_name(port->netdev), devname)) { *portp = port; +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); +#endif return 0; } } +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); +#endif return ENOENT; } +/* In THREADED mode, must be called with port_list_mutex held. */ static int do_del_port(struct dp_netdev *dp, uint16_t port_no) { @@ -531,7 +677,13 @@ dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED) static void dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow) { +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif hmap_remove(&dp->flow_table, &flow->node); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif free(flow->actions); free(flow); } @@ -620,7 +772,11 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_) } static struct dp_netdev_flow * -dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key) +#ifdef THREADED +dp_netdev_lookup_flow_locked(struct dp_netdev *dp, const struct flow *key) +#else +dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key) +#endif { struct dp_netdev_flow *flow; @@ -632,6 +788,19 @@ dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key) return NULL; } +#ifdef THREADED +static struct dp_netdev_flow * +dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key) +{ + struct dp_netdev_flow *flow; + + pthread_mutex_lock(&dp->table_mutex); + flow = dp_netdev_lookup_flow_locked(dp, key); + pthread_mutex_unlock(&dp->table_mutex); + return flow; +} +#endif + static void get_dpif_flow_stats(struct dp_netdev_flow *flow, struct dpif_flow_stats *stats) { @@ -729,7 +898,13 @@ add_flow(struct dpif *dpif, const struct flow *key, return error; } +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0)); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif return 0; } @@ -749,6 +924,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) struct dp_netdev_flow *flow; struct flow key; int error; + int n_flows; error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key); if (error) { @@ -758,7 +934,14 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) flow = dp_netdev_lookup_flow(dp, &key); if (!flow) { if (put->flags & DPIF_FP_CREATE) { - if (hmap_count(&dp->flow_table) < MAX_FLOWS) { +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif + n_flows = hmap_count(&dp->flow_table); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif + if (n_flows < MAX_FLOWS) { if (put->stats) { memset(put->stats, 0, sizeof *put->stats); } @@ -845,7 +1028,13 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_, struct dp_netdev_flow *flow; struct hmap_node *node; +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif if (!node) { return EOF; } @@ -950,14 +1139,31 @@ find_nonempty_queue(struct dpif *dpif) static int dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall) { - struct dp_netdev_queue *q = find_nonempty_queue(dpif); + struct dp_netdev_queue *q; +#ifdef THREADED + struct dp_netdev *dp = get_dp_netdev(dpif); + char c; + pthread_mutex_lock(&dp->table_mutex); +#endif + q = find_nonempty_queue(dpif); if (q) { struct dpif_upcall *u = q->upcalls[q->tail++ & QUEUE_MASK]; *upcall = *u; free(u); +#ifdef THREADED + /* Read a byte from the pipe to signal that a packet has been + * received. */ + if (read(dp->pipe[0], &c, 1) < 0) { + VLOG_ERR("Error reading from the pipe: %s", strerror(errno)); + } + pthread_mutex_unlock(&dp->table_mutex); +#endif return 0; } else { +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif return EAGAIN; } } @@ -965,19 +1171,32 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall) static void dpif_netdev_recv_wait(struct dpif *dpif) { +#ifdef THREADED + struct dp_netdev *dp = get_dp_netdev(dpif); + + poll_fd_wait(dp->pipe[0], POLLIN); +#else if (find_nonempty_queue(dpif)) { poll_immediate_wake(); } else { /* No messages ready to be received, and dp_wait() will ensure that we * wake up to queue new messages, so there is nothing to do. */ } +#endif } static void dpif_netdev_recv_purge(struct dpif *dpif) { struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif); +#ifdef THREADED + struct dp_netdev *dp = get_dp_netdev(dpif); + pthread_mutex_lock(&dp->table_mutex); +#endif dp_netdev_purge_queues(dpif_netdev->dp); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif } static void @@ -1001,7 +1220,12 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port, return; } flow_extract(packet, 0, 0, port->port_no, &key); +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); + flow = dp_netdev_lookup_flow_locked(dp, &key); +#else flow = dp_netdev_lookup_flow(dp, &key); +#endif if (flow) { dp_netdev_flow_used(flow, &key, packet); dp_netdev_execute_actions(dp, packet, &key, @@ -1011,9 +1235,23 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port, dp->n_missed++; dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, 0); } +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif +} + +#ifdef THREADED +static void +dpif_netdev_run(struct dpif *dpif OVS_UNUSED) +{ } static void +dpif_netdev_wait(struct dpif *dpif OVS_UNUSED) +{ +} +#else +static void dpif_netdev_run(struct dpif *dpif) { struct dp_netdev *dp = get_dp_netdev(dpif); @@ -1051,6 +1289,156 @@ dpif_netdev_wait(struct dpif *dpif) netdev_recv_wait(port->netdev); } } +#endif + +#ifdef THREADED +/* + * pcap callback argument + */ +struct dispatch_arg { + struct dp_netdev *dp; /* update statistics */ + struct dp_netdev_port *port; /* argument to flow identifier function */ + struct ofpbuf buf; /* used to process the packet */ +}; + +/* Process a packet. + * + * The port_input function will send immediately if it finds a flow match and + * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP. + * If a flow is not found or for the other actions, the packet is copied. + */ +static void +process_pkt(u_char *arg_p, const struct pkthdr *hdr, const u_char *packet) +{ + struct dispatch_arg *arg = (struct dispatch_arg *)arg_p; + struct ofpbuf *buf = &arg->buf; + + /* set packet size and data pointer */ + buf->size = hdr->caplen; /* XXX Must the size be equal to hdr->len or + * hdr->caplen */ + buf->data = (void*)packet; + + dp_netdev_port_input(arg->dp, arg->port, buf); + + return; +} + +/* Body of the thread that manages the datapaths */ +static void* +dp_thread_body(void *args OVS_UNUSED) +{ + struct dp_netdev *dp; + struct dp_netdev_port *port; + struct dispatch_arg arg; + int error; + int n_fds; + uint32_t batch = 50; /* max number of pkts processed by the dispatch */ + int processed; /* actual number of pkts processed by the dispatch */ + char readbuf[1024]; + + sigset_t sigmask; + + /*XXX Since the poll involves all ports of all datapaths, the right fds + * size should be MAX_PORTS * max_number_of_datapaths */ + struct pollfd fds[MAX_PORTS + 1]; + + /* mask the fatal signals. In this way the main thread is delegate to + * manage this them. */ + sigemptyset(&sigmask); + sigaddset(&sigmask, SIGTERM); + sigaddset(&sigmask, SIGALRM); + sigaddset(&sigmask, SIGINT); + sigaddset(&sigmask, SIGHUP); + + if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) { + VLOG_ERR("Error setting thread sigmask: %s", strerror(errno)); + } + + ofpbuf_init(&arg.buf, DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + max_mtu); + for(;;) { + struct shash_node *node; + n_fds = 0; + /* build the structure for poll */ + SHASH_FOR_EACH(node, &dp_netdevs) { + dp = (struct dp_netdev *)node->data; + fds[n_fds].fd = dp->pipe[1]; + fds[n_fds].events = POLLIN; + dp->pipe_fd = &fds[n_fds]; + n_fds++; + if (n_fds >= sizeof(fds) / sizeof(fds[0])) { + VLOG_ERR("Too many fds for poll adding pipe_fd"); + break; + } + pthread_mutex_lock(&dp->port_list_mutex); + LIST_FOR_EACH (port, node, &dp->port_list) { + /* insert an element in the fds structure */ + fds[n_fds].fd = netdev_get_fd(port->netdev); + fds[n_fds].events = POLLIN; + port->poll_fd = &fds[n_fds]; + n_fds++; + if (n_fds >= sizeof(fds) / sizeof(fds[0])) { + VLOG_ERR("Too many fds for poll adding port fd"); + break; + } + } + pthread_mutex_unlock(&dp->port_list_mutex); + } + + error = poll(fds, n_fds, 2000); + VLOG_DBG("dp_thread_body poll wakeup with cnt=%d", error); + + if (error < 0) { + if (errno == EINTR) { + /* XXX get this case in detach mode */ + continue; + } + VLOG_ERR("Datapath thread poll() error: %s\n", strerror(errno)); + /* XXX terminating the thread is probably not right */ + break; + } + pthread_testcancel(); + + SHASH_FOR_EACH (node, &dp_netdevs) { + dp = (struct dp_netdev *)node->data; + if (dp->pipe_fd && (dp->pipe_fd->revents & POLLIN)) { + VLOG_DBG("Signalled from main thread"); + while ( (error = read(dp->pipe[1], readbuf, sizeof(readbuf))) > 0) + ; + if (error < 0) { + VLOG_ERR("Error reading from the pipe: %s", strerror(errno)); + } + } + arg.dp = dp; + pthread_mutex_lock(&dp->port_list_mutex); + LIST_FOR_EACH (port, node, &dp->port_list) { + arg.port = port; + arg.buf.size = 0; + arg.buf.data = (char*)arg.buf.base + DP_NETDEV_HEADROOM; + if (port->poll_fd) { + VLOG_DBG("fd %d revents 0x%x", port->poll_fd->fd, port->poll_fd->revents); + } + if (port->poll_fd && (port->poll_fd->revents & POLLIN)) { + /* call the dispatch and process the packet into + * its callback. We process 'batch' packets at time */ + processed = netdev_dispatch(port->netdev, batch, + process_pkt, (u_char *)&arg); + if (processed < 0) { /* pcap returns error */ + static struct vlog_rate_limit rl = + VLOG_RATE_LIMIT_INIT(1, 5); + VLOG_ERR_RL(&rl, + "error receiving data from XXX \n"); + } + } /* end of if poll */ + } /* end of port loop */ + pthread_mutex_unlock(&dp->port_list_mutex); + } /* end of dp loop */ + } /* for ;; */ + + ofpbuf_uninit(&arg.buf); + return NULL; +} + +#endif /* THREADED */ static void dp_netdev_set_dl(struct ofpbuf *packet, const struct ovs_key_ethernet *eth_key) @@ -1066,11 +1454,19 @@ dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet, uint16_t out_port) { struct dp_netdev_port *p = dp->ports[out_port]; + char c = 0; + if (p) { netdev_send(p->netdev, packet); +#ifdef THREADED + if (write(dp->pipe[0], &c, 1) < 0) { + VLOG_ERR("Pipe write error (to datapath): %s", strerror(errno)); + } +#endif } } +/* In THREADED mode, must be called with table_lock_mutex held. */ static int dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, int queue_no, const struct flow *flow, uint64_t arg) @@ -1079,6 +1475,9 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, struct dpif_upcall *upcall; struct ofpbuf *buf; size_t key_len; +#ifdef THREADED + char c = 0; +#endif if (q->head - q->tail >= MAX_QUEUE_LEN) { dp->n_lost++; @@ -1100,6 +1499,12 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, upcall->userdata = arg; q->upcalls[q->head++ & QUEUE_MASK] = upcall; +#ifdef THREADED + /* Write a byte on the pipe to advertise that a packet is ready. */ + if (write(dp->pipe[1], &c, 1) < 0) { + VLOG_ERR("Pipe write error (from datapath): %s", strerror(errno)); + } +#endif return 0; } @@ -1148,7 +1553,13 @@ dp_netdev_action_userspace(struct dp_netdev *dp, userdata_attr = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA); userdata = userdata_attr ? nl_attr_get_u64(userdata_attr) : 0; +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif dp_netdev_output_userspace(dp, packet, DPIF_UC_ACTION, key, userdata); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif } static void diff --git lib/netdev-bsd.c lib/netdev-bsd.c index 17d7f72..b399d75 100644 --- lib/netdev-bsd.c +++ lib/netdev-bsd.c @@ -667,6 +667,66 @@ netdev_bsd_recv_wait(struct netdev *netdev_) } } +#ifdef THREADED +static int +netdev_bsd_dispatch_system(struct netdev_bsd *netdev, int batch, pkt_handler h, + u_char *user) +{ + int ret; + + ret = pcap_dispatch(netdev->pcap_handle, batch, (pcap_handler)h , user); + return ret; +} + +static int +netdev_bsd_dispatch_tap(struct netdev_bsd *netdev, int batch, pkt_handler h, + u_char *user) +{ + int ret; + int i; + u_char buf[VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX]; + struct pkthdr hdr; + + for (i = 0; i < batch; i++) { + ret = netdev_bsd_recv_tap(netdev, buf, sizeof(buf)); + if (ret >= 0) { + /* XXX hdr.len should be set to the effective length of the packet */ + hdr.caplen = ret; + hdr.len = ret; + h(user, &hdr, buf); + } else if (ret != -EAGAIN) { + return -1; + } else { /* ret = EAGAIN */ + break; + } + } + return i; +} + +static int +netdev_bsd_dispatch(struct netdev *netdev_, int batch, pkt_handler h, + u_char *user) +{ + struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); + struct netdev_dev_bsd * netdev_dev = + netdev_dev_bsd_cast(netdev_get_dev(netdev_)); + + if (!strcmp(netdev_get_type(netdev_), "tap") && + netdev->netdev_fd == netdev_dev->tap_fd) { + return netdev_bsd_dispatch_tap(netdev, batch, h, user); + } else { + return netdev_bsd_dispatch_system(netdev, batch, h, user); + } +} + +static int +netdev_bsd_get_fd(struct netdev *netdev_) +{ + struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); + return netdev->netdev_fd; +} +#endif + /* Discards all packets waiting to be received from 'netdev'. */ static int netdev_bsd_drain(struct netdev *netdev_) @@ -1263,6 +1323,10 @@ const struct netdev_class netdev_bsd_class = { netdev_bsd_recv, netdev_bsd_recv_wait, +#ifdef THREADED + netdev_bsd_dispatch, + netdev_bsd_get_fd, +#endif netdev_bsd_drain, netdev_bsd_send, @@ -1323,6 +1387,10 @@ const struct netdev_class netdev_tap_class = { netdev_bsd_recv, netdev_bsd_recv_wait, +#ifdef THREADED + netdev_bsd_dispatch, + netdev_bsd_get_fd, +#endif netdev_bsd_drain, netdev_bsd_send, diff --git lib/netdev-dummy.c lib/netdev-dummy.c index 5c6563c..ec91f73 100644 --- lib/netdev-dummy.c +++ lib/netdev-dummy.c @@ -316,6 +316,10 @@ static const struct netdev_class dummy_class = { netdev_dummy_listen, netdev_dummy_recv, netdev_dummy_recv_wait, +#ifdef THREADED + NULL, /* dispatch */ + NULL, /* get_fd */ +#endif netdev_dummy_drain, NULL, /* send */ diff --git lib/netdev-linux.c lib/netdev-linux.c index ae5eb0a..3c9c961 100644 --- lib/netdev-linux.c +++ lib/netdev-linux.c @@ -2366,6 +2366,12 @@ netdev_linux_change_seq(const struct netdev *netdev) return netdev_dev_linux_cast(netdev_get_dev(netdev))->change_seq; } +#ifdef THREADED +#define THREADED_NULL NULL, NULL, +#else +#define THREADED_NULL +#endif + #define NETDEV_LINUX_CLASS(NAME, CREATE, GET_STATS, SET_STATS, \ GET_FEATURES, GET_STATUS) \ { \ @@ -2386,6 +2392,7 @@ netdev_linux_change_seq(const struct netdev *netdev) netdev_linux_listen, \ netdev_linux_recv, \ netdev_linux_recv_wait, \ + THREADED_NULL \ netdev_linux_drain, \ \ netdev_linux_send, \ diff --git lib/netdev-provider.h lib/netdev-provider.h index fc7c9d2..dc323d0 100644 --- lib/netdev-provider.h +++ lib/netdev-provider.h @@ -24,6 +24,9 @@ #include "netdev.h" #include "list.h" #include "shash.h" +#ifdef THREADED +#include "dispatch.h" +#endif #ifdef __cplusplus extern "C" { @@ -190,6 +193,22 @@ struct netdev_class { * implement packet reception through the 'recv' member function. */ void (*recv_wait)(struct netdev *netdev); +#ifdef THREADED + /* Attempts to receive 'batch' packets from 'netdev' and process them + * through the 'handler' callback. This function is used in the 'THREADED' + * version in order to optimize the forwarding process, since it permits to + * process packets directly in the netdev memory. + * + * Returns the number of packets processed on success; this can be 0 if no + * packets are available to be read. Returns -1 if an error occurred. + */ + int (*dispatch)(struct netdev *netdev, int batch, pkt_handler handler, + u_char *user); + + /* Return the file descriptor of the device */ + int (*get_fd)(struct netdev *netdev); +#endif + /* Discards all packets waiting to be received from 'netdev'. * * May be null if not needed, such as for a network device that does not diff --git lib/netdev-vport.c lib/netdev-vport.c index 55209e4..5f428c3 100644 --- lib/netdev-vport.c +++ lib/netdev-vport.c @@ -889,6 +889,13 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED, return 0; } + +#ifdef THREADED +# define THREADED_NULL NULL, NULL, +#else +# define THREADED_NULL +#endif + #define VPORT_FUNCTIONS(GET_STATUS) \ NULL, \ netdev_vport_run, \ @@ -905,6 +912,7 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED, NULL, /* listen */ \ NULL, /* recv */ \ NULL, /* recv_wait */ \ + THREADED_NULL \ NULL, /* drain */ \ \ netdev_vport_send, /* send */ \ diff --git lib/netdev.c lib/netdev.c index 5d2da4b..c61d82a 100644 --- lib/netdev.c +++ lib/netdev.c @@ -423,6 +423,28 @@ netdev_recv_wait(struct netdev *netdev) } } +#ifdef THREADED +/* Attempts to receive and process 'batch' packets from 'netdev'. */ +int +netdev_dispatch(struct netdev *netdev, int batch, pkt_handler h, u_char *user) +{ + int (*dispatch)(struct netdev*, int, pkt_handler, u_char *); + + dispatch = netdev_get_dev(netdev)->netdev_class->dispatch; + return dispatch ? dispatch(netdev, batch, h, user) : 0; +} + +/* Returns the file descriptor */ +int +netdev_get_fd(struct netdev *netdev) +{ + int (*get_fd)(struct netdev *); + + get_fd = netdev_get_dev(netdev)->netdev_class->get_fd; + return get_fd ? get_fd(netdev) : 0; +} +#endif + /* Discards all packets waiting to be received from 'netdev'. */ int netdev_drain(struct netdev *netdev) diff --git lib/netdev.h lib/netdev.h index 24a9b64..49ecb63 100644 --- lib/netdev.h +++ lib/netdev.h @@ -21,6 +21,9 @@ #include #include #include "openvswitch/types.h" +#ifdef THREADED +#include "dispatch.h" +#endif #ifdef __cplusplus extern "C" { @@ -107,6 +110,10 @@ int netdev_get_ifindex(const struct netdev *); int netdev_listen(struct netdev *); int netdev_recv(struct netdev *, struct ofpbuf *); void netdev_recv_wait(struct netdev *); +#ifdef THREADED +int netdev_dispatch(struct netdev *, int, pkt_handler, u_char *); +int netdev_get_fd(struct netdev *); +#endif int netdev_drain(struct netdev *); int netdev_send(struct netdev *, const struct ofpbuf *); diff --git lib/route-table-bsd.c lib/route-table-bsd.c index c145091..1c29071 100644 --- lib/route-table-bsd.c +++ lib/route-table-bsd.c @@ -29,6 +29,8 @@ #include #include +#include "vlog.h" + VLOG_DEFINE_THIS_MODULE(route_table); static int pid; diff --git lib/vlog.c lib/vlog.c index 6dd5ce8..8262c63 100644 --- lib/vlog.c +++ lib/vlog.c @@ -34,6 +34,9 @@ #include "timeval.h" #include "unixctl.h" #include "util.h" +#ifdef THREADED +#include +#endif VLOG_DEFINE_THIS_MODULE(vlog); @@ -89,6 +92,10 @@ static FILE *log_file; /* vlog initialized? */ static bool vlog_inited; +#ifdef THREADED +static pthread_mutex_t vlog_mutex; +#endif + static void format_log_message(const struct vlog_module *, enum vlog_level, enum vlog_facility, unsigned int msg_num, const char *message, va_list, struct ds *) @@ -475,6 +482,9 @@ vlog_init(void) return; } vlog_inited = true; +#ifdef THREADED + pthread_mutex_init(&vlog_mutex, NULL); +#endif /* openlog() is allowed to keep the pointer passed in, without making a * copy. The daemonize code sometimes frees and replaces 'program_name', @@ -682,6 +692,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level, ds_init(&s); ds_reserve(&s, 1024); +#ifdef THREADED + pthread_mutex_lock(&vlog_mutex); +#endif msg_num++; if (log_to_console) { @@ -712,6 +725,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level, fflush(log_file); } +#ifdef THREADED + pthread_mutex_unlock(&vlog_mutex); +#endif ds_destroy(&s); errno = save_errno; } diff --git m4/openvswitch.m4 m4/openvswitch.m4 index 9b2a5ba..6eb312f 100644 --- m4/openvswitch.m4 +++ m4/openvswitch.m4 @@ -14,6 +14,25 @@ # See the License for the specific language governing permissions and # limitations under the License. +dnl Check for --enable-threaded and updates CFLAGS. +AC_DEFUN([OVS_CHECK_THREADED], + [AC_REQUIRE([AC_PROG_CC]) + AC_ARG_ENABLE( + [threaded], + [AC_HELP_STRING([--enable-threaded], + [Enable threaded version of userspace implementation])], + [case "${enableval}" in + (yes) threaded=true ;; + (no) threaded=false ;; + (*) AC_MSG_ERROR([bad value ${enableval} for --enable-threaded]) ;; + esac], + [threaded=false]) + if $threaded; then + AC_DEFINE([THREADED], [1], + [Define to 1 if the threaded version of userspace + implementation is enabled.]) + fi]) + dnl Checks for --enable-coverage and updates CFLAGS and LDFLAGS appropriately. AC_DEFUN([OVS_CHECK_COVERAGE], [AC_REQUIRE([AC_PROG_CC])