diff --git a/configure.ac b/configure.ac index 5bcf27b..b327949 100644 --- a/configure.ac +++ b/configure.ac @@ -43,6 +43,7 @@ AC_SEARCH_LIBS([clock_gettime], [rt]) AC_SEARCH_LIBS([timer_create], [rt]) AC_SEARCH_LIBS([pcap_open_live], [pcap]) +OVS_CHECK_THREADED OVS_CHECK_COVERAGE OVS_CHECK_NDEBUG OVS_CHECK_NETLINK diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index cade79e..304a4dc 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -32,6 +32,15 @@ #include #include +#ifdef THREADED +#include +#include + +#include "socket-util.h" +#include "fatal-signal.h" +#include "dispatch.h" +#endif + #include "csum.h" #include "dpif.h" #include "dpif-provider.h" @@ -55,6 +64,16 @@ #include "vlog.h" VLOG_DEFINE_THIS_MODULE(dpif_netdev); +/* We could use these macros instead of using #ifdef and #endif every time we + * need to call the pthread_mutex_lock/unlock. +#ifdef THREADED +#define LOCK(mutex) pthread_mutex_lock(mutex) +#define UNLOCK(mutex) pthread_mutex_unlock(mutex) +#else +#define LOCK(mutex) +#define UNLOCK(mutex) +#endif +*/ /* Configuration parameters. */ enum { MAX_PORTS = 256 }; /* Maximum number of ports. */ @@ -82,6 +101,20 @@ struct dp_netdev { int open_cnt; bool destroyed; +#ifdef THREADED + /* The pipe is used to signal the presence of a packet on the queue. + * - dpif_netdev_recv_wait() waits on p[0] + * - dpif_netdev_recv() extract from queue and read p[0] + * - dp_netdev_output_control() send to queue and write p[1] + */ + + int pipe[2]; /* signal a packet on the queue */ + + pthread_mutex_t table_mutex; /* mutex for the flow table */ + pthread_mutex_t port_list_mutex; /* port list mutex */ + + /* The access to this queue is protected by the table_mutex mutex */ +#endif struct dp_netdev_queue queues[N_QUEUES]; struct hmap flow_table; /* Flow table. */ @@ -102,6 +135,9 @@ struct dp_netdev_port { struct list node; /* Element in dp_netdev's 'port_list'. */ struct netdev *netdev; char *type; /* Port type as requested by user. */ +#ifdef THREADED + struct pollfd *poll_fd; /* To manage the poll loop in the thread. */ +#endif }; /* A flow in dp_netdev's 'flow_table'. */ @@ -127,6 +163,11 @@ struct dpif_netdev { unsigned int dp_serial; }; +#ifdef THREADED +/* XXX global Descriptor of the thread that manages the datapaths. */ +pthread_t thread_p; +#endif + /* All netdev-based datapaths. */ static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs); @@ -204,6 +245,22 @@ create_dp_netdev(const char *name, const struct dpif_class *class, dp->class = class; dp->name = xstrdup(name); dp->open_cnt = 0; +#ifdef THREADED + error = pipe(dp->pipe); + if (error) { + VLOG_ERR("Unable to create datapath thread pipe: %s", strerror(errno)); + return errno; + } + if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) { + VLOG_ERR("Unable to set nonblocking on datapath thread pipe: %s", + strerror(errno)); + return errno; + } + VLOG_DBG("Datapath thread pipe created (%d, %d)", dp->pipe[0], dp->pipe[1]); + + pthread_mutex_init(&dp->table_mutex, NULL); + pthread_mutex_init(&dp->port_list_mutex, NULL); +#endif for (i = 0; i < N_QUEUES; i++) { dp->queues[i].head = dp->queues[i].tail = 0; } @@ -221,6 +278,39 @@ create_dp_netdev(const char *name, const struct dpif_class *class, return 0; } +#ifdef THREADED +static void * dp_thread_body(void *args OVS_UNUSED); + +/* This is the function that is called in response of a fatal signal (e.g. + * SIGTERM) */ +static void +dpif_netdev_exit_hook(void *aux OVS_UNUSED) +{ + if (pthread_cancel(thread_p) == 0) { + pthread_join(thread_p, NULL); + } +} + +static int +thread_start(void) +{ + int error; + static int started = 0; + if (started == 0) { + fatal_signal_add_hook(dpif_netdev_exit_hook, NULL, NULL, true); + error = pthread_create(&thread_p, NULL, dp_thread_body, NULL); + if (error != 0) { + VLOG_ERR("Unable to create datapath thread: %s", strerror(errno)); + return errno; + } else { + VLOG_DBG("Datapath thread started"); + } + started = 1; + } + return 0; +} +#endif + static int dpif_netdev_open(const struct dpif_class *class, const char *name, bool create, struct dpif **dpifp) @@ -247,9 +337,14 @@ dpif_netdev_open(const struct dpif_class *class, const char *name, } *dpifp = create_dpif_netdev(dp); +#ifdef THREADED + thread_start(); +#endif return 0; } +/* table_mutex must be locked in THREADED mode. + */ static void dp_netdev_purge_queues(struct dp_netdev *dp) { @@ -273,11 +368,23 @@ dp_netdev_free(struct dp_netdev *dp) struct dp_netdev_port *port, *next; dp_netdev_flow_flush(dp); +#ifdef THREADED + pthread_mutex_lock(&dp->port_list_mutex); +#endif LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) { do_del_port(dp, port->port_no); } +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); + pthread_mutex_lock(&dp->table_mutex); +#endif dp_netdev_purge_queues(dp); hmap_destroy(&dp->flow_table); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); + pthread_mutex_destroy(&dp->table_mutex); + pthread_mutex_destroy(&dp->port_list_mutex); +#endif free(dp->name); free(dp); } @@ -306,7 +413,13 @@ static int dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats) { struct dp_netdev *dp = get_dp_netdev(dpif); +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif stats->n_flows = hmap_count(&dp->flow_table); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif stats->n_hit = dp->n_hit; stats->n_missed = dp->n_missed; stats->n_lost = dp->n_lost; @@ -354,13 +467,22 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, port->port_no = port_no; port->netdev = netdev; port->type = xstrdup(type); +#ifdef THREADED + port->poll_fd = NULL; +#endif error = netdev_get_mtu(netdev, &mtu); if (!error) { max_mtu = mtu; } +#ifdef THREADED + pthread_mutex_lock(&dp->port_list_mutex); +#endif list_push_back(&dp->port_list, &port->node); +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); +#endif dp->ports[port_no] = port; dp->serial++; @@ -448,12 +570,21 @@ get_port_by_name(struct dp_netdev *dp, { struct dp_netdev_port *port; +#ifdef THREADED + pthread_mutex_lock(&dp->port_list_mutex); +#endif LIST_FOR_EACH (port, node, &dp->port_list) { if (!strcmp(netdev_get_name(port->netdev), devname)) { *portp = port; +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); +#endif return 0; } } +#ifdef THREADED + pthread_mutex_unlock(&dp->port_list_mutex); +#endif return ENOENT; } @@ -464,6 +595,7 @@ do_del_port(struct dp_netdev *dp, uint16_t port_no) char *name; int error; + /* XXX why no semaphores?? */ error = get_port_by_number(dp, port_no, &port); if (error) { return error; @@ -531,7 +663,13 @@ dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED) static void dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow) { +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif hmap_remove(&dp->flow_table, &flow->node); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif free(flow->actions); free(flow); } @@ -620,15 +758,24 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_) } static struct dp_netdev_flow * -dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key) +dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key) { struct dp_netdev_flow *flow; +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif HMAP_FOR_EACH_WITH_HASH (flow, node, flow_hash(key, 0), &dp->flow_table) { if (flow_equal(&flow->key, key)) { +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif return flow; } } +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif return NULL; } @@ -729,7 +876,13 @@ add_flow(struct dpif *dpif, const struct flow *key, return error; } +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0)); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif return 0; } @@ -749,6 +902,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) struct dp_netdev_flow *flow; struct flow key; int error; + int n_flows; error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key); if (error) { @@ -758,7 +912,14 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) flow = dp_netdev_lookup_flow(dp, &key); if (!flow) { if (put->flags & DPIF_FP_CREATE) { - if (hmap_count(&dp->flow_table) < MAX_FLOWS) { +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif + n_flows = hmap_count(&dp->flow_table); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif + if (n_flows < MAX_FLOWS) { if (put->stats) { memset(put->stats, 0, sizeof *put->stats); } @@ -843,7 +1004,13 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_, struct dp_netdev_flow *flow; struct hmap_node *node; +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif if (!node) { return EOF; } @@ -949,7 +1116,13 @@ static int dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, struct ofpbuf *buf) { - struct dp_netdev_queue *q = find_nonempty_queue(dpif); + struct dp_netdev_queue *q; +#ifdef THREADED + struct dp_netdev *dp = get_dp_netdev(dpif); + char c; + pthread_mutex_lock(&dp->table_mutex); +#endif + q = find_nonempty_queue(dpif); if (q) { struct dpif_upcall *u = q->upcalls[q->tail++ & QUEUE_MASK]; *upcall = *u; @@ -958,8 +1131,19 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, ofpbuf_uninit(buf); *buf = *upcall->packet; +#ifdef THREADED + /* Read a byte from the pipe to signal that a packet has been + * received. */ + if (read(dp->pipe[0], &c, 1) < 0) { + VLOG_ERR("Error reading from the pipe: %s", strerror(errno)); + } + pthread_mutex_unlock(&dp->table_mutex); +#endif return 0; } else { +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif return EAGAIN; } } @@ -967,19 +1151,32 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, static void dpif_netdev_recv_wait(struct dpif *dpif) { +#ifdef THREADED + struct dp_netdev *dp = get_dp_netdev(dpif); + + poll_fd_wait(dp->pipe[0], POLLIN); +#else if (find_nonempty_queue(dpif)) { poll_immediate_wake(); } else { /* No messages ready to be received, and dp_wait() will ensure that we * wake up to queue new messages, so there is nothing to do. */ } +#endif } static void dpif_netdev_recv_purge(struct dpif *dpif) { struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif); +#ifdef THREADED + struct dp_netdev *dp = get_dp_netdev(dpif); + pthread_mutex_lock(&dp->table_mutex); +#endif dp_netdev_purge_queues(dpif_netdev->dp); +#ifdef THREADED + pthread_mutex_unlock(&dp->table_mutex); +#endif } static void @@ -1015,6 +1212,17 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port, } } +#ifdef THREADED +static void +dpif_netdev_run(struct dpif *dpif OVS_UNUSED) +{ +} + +static void +dpif_netdev_wait(struct dpif *dpif OVS_UNUSED) +{ +} +#else static void dpif_netdev_run(struct dpif *dpif) { @@ -1053,6 +1261,135 @@ dpif_netdev_wait(struct dpif *dpif) netdev_recv_wait(port->netdev); } } +#endif + +#ifdef THREADED +/* + * pcap callback argument + */ +struct dispatch_arg { + struct dp_netdev *dp; /* update statistics */ + struct dp_netdev_port *port; /* argument to flow identifier function */ + struct ofpbuf buf; /* used to process the packet */ +}; + +/* Process a packet. + * + * The port_input function will send immediately if it finds a flow match and + * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP. + * If a flow is not found or for the other actions, the packet is copied. + */ +static void +process_pkt(u_char *arg_p, const struct pkthdr *hdr, const u_char *packet) +{ + struct dispatch_arg *arg = (struct dispatch_arg *)arg_p; + struct ofpbuf *buf = &arg->buf; + + /* set packet size and data pointer */ + buf->size = hdr->caplen; /* XXX Must the size be equal to hdr->len or + * hdr->caplen */ + buf->data = (void*)packet; + + dp_netdev_port_input(arg->dp, arg->port, buf); + + return; +} + +/* Body of the thread that manages the datapaths */ +static void* +dp_thread_body(void *args OVS_UNUSED) +{ + struct dp_netdev *dp; + struct dp_netdev_port *port; + struct dispatch_arg arg; + int error; + int n_fds; + uint32_t batch = 50; /* max number of pkts processed by the dispatch */ + int processed; /* actual number of pkts processed by the dispatch */ + + sigset_t sigmask; + + /*XXX Since the poll involves all ports of all datapaths, the right fds + * size should be MAX_PORTS * max_number_of_datapaths */ + struct pollfd fds[MAX_PORTS]; + + /* mask the fatal signals. In this way the main thread is delegate to + * manage this them. */ + sigemptyset(&sigmask); + sigaddset(&sigmask, SIGTERM); + sigaddset(&sigmask, SIGALRM); + sigaddset(&sigmask, SIGINT); + sigaddset(&sigmask, SIGHUP); + + if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) { + VLOG_ERR("Error setting thread sigmask: %s", errno); + } + + ofpbuf_init(&arg.buf, DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + max_mtu); + for(;;) { + struct shash_node *node; + n_fds = 0; + /* build the structure for poll */ + SHASH_FOR_EACH(node, &dp_netdevs) { + dp = (struct dp_netdev *)node->data; + pthread_mutex_lock(&dp->port_list_mutex); + LIST_FOR_EACH (port, node, &dp->port_list) { + /* insert an element in the fds structure */ + fds[n_fds].fd = netdev_get_fd(port->netdev); + fds[n_fds].events = POLLIN; + port->poll_fd = &fds[n_fds]; + n_fds++; + } + pthread_mutex_unlock(&dp->port_list_mutex); + } + + error = poll(fds, n_fds, 2000); + VLOG_DBG("dp_thread_body poll wakeup with cnt=%d", error); + + if (error < 0) { + if (errno == EINTR) { + /* XXX get this case in detach mode */ + continue; + } + VLOG_ERR("Datapath thread poll() error: %s\n", strerror(errno)); + /* XXX terminating the thread is probably not right */ + break; + } + pthread_testcancel(); + + SHASH_FOR_EACH (node, &dp_netdevs) { + dp = (struct dp_netdev *)node->data; + arg.dp = dp; + pthread_mutex_lock(&dp->port_list_mutex); + LIST_FOR_EACH (port, node, &dp->port_list) { + arg.port = port; + arg.buf.size = 0; + arg.buf.data = (char*)arg.buf.base + DP_NETDEV_HEADROOM; + if (port->poll_fd) { + VLOG_DBG("fd %d revents 0x%x", port->poll_fd->fd, port->poll_fd->revents); + } + if (port->poll_fd && (port->poll_fd->revents & POLLIN)) { + /* call the dispatch and process the packet into + * its callback. We process 'batch' packets at time */ + processed = netdev_dispatch(port->netdev, batch, + process_pkt, (u_char *)&arg); + if (processed < 0) { /* pcap returns error */ + static struct vlog_rate_limit rl = + VLOG_RATE_LIMIT_INIT(1, 5); + VLOG_ERR_RL(&rl, + "error receiving data from XXX \n"); + } + } /* end of if poll */ + } /* end of port loop */ + pthread_mutex_unlock(&dp->port_list_mutex); + } /* end of dp loop */ + } /* for ;; */ + + ofpbuf_uninit(&arg.buf); + return NULL; +} + +#endif /* THREADED */ static void dp_netdev_set_dl(struct ofpbuf *packet, const struct ovs_key_ethernet *eth_key) @@ -1081,6 +1418,9 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, struct dpif_upcall *upcall; struct ofpbuf *buf; size_t key_len; +#ifdef THREADED + char c; +#endif if (q->head - q->tail >= MAX_QUEUE_LEN) { dp->n_lost++; @@ -1101,7 +1441,17 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, upcall->key_len = key_len; upcall->userdata = arg; +#ifdef THREADED + pthread_mutex_lock(&dp->table_mutex); +#endif q->upcalls[q->head++ & QUEUE_MASK] = upcall; +#ifdef THREADED + /* Write a byte on the pipe to advertise that a packet is ready. */ + if (write(dp->pipe[1], &c, 1) < 0) { + VLOG_ERR("Error writing on the pipe: %s", strerror(errno)); + } + pthread_mutex_unlock(&dp->table_mutex); +#endif return 0; } diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c index db844be..24ba1ce 100644 --- a/lib/netdev-bsd.c +++ b/lib/netdev-bsd.c @@ -662,6 +662,66 @@ netdev_bsd_recv_wait(struct netdev *netdev_) } } +#ifdef THREADED +static int +netdev_bsd_dispatch_system(struct netdev_bsd *netdev, int batch, pkt_handler h, + u_char *user) +{ + int ret; + + ret = pcap_dispatch(netdev->pcap_handle, batch, (pcap_handler)h , user); + return ret; +} + +static int +netdev_bsd_dispatch_tap(struct netdev_bsd *netdev, int batch, pkt_handler h, + u_char *user) +{ + int ret; + int i; + u_char buf[VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX]; + struct pkthdr hdr; + + for (i = 0; i < batch; i++) { + ret = netdev_bsd_recv_tap(netdev, buf, sizeof(buf)); + if (ret >= 0) { + /* XXX hdr.len should be set to the effective length of the packet */ + hdr.caplen = ret; + hdr.len = ret; + h(user, &hdr, buf); + } else if (ret != -EAGAIN) { + return -1; + } else { /* ret = EAGAIN */ + break; + } + } + return i; +} + +static int +netdev_bsd_dispatch(struct netdev *netdev_, int batch, pkt_handler h, + u_char *user) +{ + struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); + struct netdev_dev_bsd * netdev_dev = + netdev_dev_bsd_cast(netdev_get_dev(netdev_)); + + if (!strcmp(netdev_get_type(netdev_), "tap") && + netdev->netdev_fd == netdev_dev->tap_fd) { + return netdev_bsd_dispatch_tap(netdev, batch, h, user); + } else { + return netdev_bsd_dispatch_system(netdev, batch, h, user); + } +} + +static int +netdev_bsd_get_fd(struct netdev *netdev_) +{ + struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); + return netdev->netdev_fd; +} +#endif + /* Discards all packets waiting to be received from 'netdev'. */ static int netdev_bsd_drain(struct netdev *netdev_) @@ -1261,6 +1321,10 @@ const struct netdev_class netdev_bsd_class = { netdev_bsd_recv, netdev_bsd_recv_wait, +#ifdef THREADED + netdev_bsd_dispatch, + netdev_bsd_get_fd, +#endif netdev_bsd_drain, netdev_bsd_send, @@ -1321,6 +1385,10 @@ const struct netdev_class netdev_tap_class = { netdev_bsd_recv, netdev_bsd_recv_wait, +#ifdef THREADED + netdev_bsd_dispatch, + netdev_bsd_get_fd, +#endif netdev_bsd_drain, netdev_bsd_send, diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c index b8c23c5..4394f83 100644 --- a/lib/netdev-dummy.c +++ b/lib/netdev-dummy.c @@ -316,6 +316,10 @@ static const struct netdev_class dummy_class = { netdev_dummy_listen, netdev_dummy_recv, netdev_dummy_recv_wait, +#ifdef THREADED + NULL, /* dispatch */ + NULL, /* get_fd */ +#endif netdev_dummy_drain, NULL, /* send */ diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c index efce9a7..09f5a08 100644 --- a/lib/netdev-linux.c +++ b/lib/netdev-linux.c @@ -2383,6 +2383,12 @@ netdev_linux_change_seq(const struct netdev *netdev) return netdev_dev_linux_cast(netdev_get_dev(netdev))->change_seq; } +#ifdef THREADED +#define THREADED_NULL NULL, NULL, +#else +#define THREADED_NULL +#endif + #define NETDEV_LINUX_CLASS(NAME, CREATE, GET_STATS, SET_STATS, \ GET_FEATURES, GET_STATUS) \ { \ @@ -2403,6 +2409,7 @@ netdev_linux_change_seq(const struct netdev *netdev) netdev_linux_listen, \ netdev_linux_recv, \ netdev_linux_recv_wait, \ + THREADED_NULL, \ netdev_linux_drain, \ \ netdev_linux_send, \ diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h index 94f60af..743e41c 100644 --- a/lib/netdev-provider.h +++ b/lib/netdev-provider.h @@ -25,6 +25,9 @@ #include "list.h" #include "shash.h" #include "smap.h" +#ifdef THREADED +#include "dispatch.h" +#endif #ifdef __cplusplus extern "C" { @@ -191,6 +194,22 @@ struct netdev_class { * implement packet reception through the 'recv' member function. */ void (*recv_wait)(struct netdev *netdev); +#ifdef THREADED + /* Attempts to receive 'batch' packets from 'netdev' and process them + * through the 'handler' callback. This function is used in the 'THREADED' + * version in order to optimize the forwarding process, since it permits to + * process packets directly in the netdev memory. + * + * Returns the number of packets processed on success; this can be 0 if no + * packets are available to be read. Returns -1 if an error occurred. + */ + int (*dispatch)(struct netdev *netdev, int batch, pkt_handler handler, + u_char *user); + + /* Return the file descriptor of the device */ + int (*get_fd)(struct netdev *netdev); +#endif + /* Discards all packets waiting to be received from 'netdev'. * * May be null if not needed, such as for a network device that does not diff --git a/lib/netdev.c b/lib/netdev.c index ac98cb5..a128820 100644 --- a/lib/netdev.c +++ b/lib/netdev.c @@ -424,6 +424,28 @@ netdev_recv_wait(struct netdev *netdev) } } +#ifdef THREADED +/* Attempts to receive and process 'batch' packets from 'netdev'. */ +int +netdev_dispatch(struct netdev *netdev, int batch, pkt_handler h, u_char *user) +{ + int (*dispatch)(struct netdev*, int, pkt_handler, u_char *); + + dispatch = netdev_get_dev(netdev)->netdev_class->dispatch; + return dispatch ? dispatch(netdev, batch, h, user) : 0; +} + +/* Returns the file descriptor */ +int +netdev_get_fd(struct netdev *netdev) +{ + int (*get_fd)(struct netdev *); + + get_fd = netdev_get_dev(netdev)->netdev_class->get_fd; + return get_fd ? get_fd(netdev) : 0; +} +#endif + /* Discards all packets waiting to be received from 'netdev'. */ int netdev_drain(struct netdev *netdev) diff --git a/lib/netdev.h b/lib/netdev.h index d2cc8b5..f55a286 100644 --- a/lib/netdev.h +++ b/lib/netdev.h @@ -21,6 +21,9 @@ #include #include #include "openvswitch/types.h" +#ifdef THREADED +#include "dispatch.h" +#endif #ifdef __cplusplus extern "C" { @@ -107,6 +110,10 @@ int netdev_get_ifindex(const struct netdev *); int netdev_listen(struct netdev *); int netdev_recv(struct netdev *, struct ofpbuf *); void netdev_recv_wait(struct netdev *); +#ifdef THREADED +int netdev_dispatch(struct netdev *, int, pkt_handler, u_char *); +int netdev_get_fd(struct netdev *); +#endif int netdev_drain(struct netdev *); int netdev_send(struct netdev *, const struct ofpbuf *); diff --git a/lib/route-table-bsd.c b/lib/route-table-bsd.c index c145091..1c29071 100644 --- a/lib/route-table-bsd.c +++ b/lib/route-table-bsd.c @@ -29,6 +29,8 @@ #include #include +#include "vlog.h" + VLOG_DEFINE_THIS_MODULE(route_table); static int pid; diff --git a/m4/openvswitch.m4 b/m4/openvswitch.m4 index dca9f5f..9de733b 100644 --- a/m4/openvswitch.m4 +++ b/m4/openvswitch.m4 @@ -14,6 +14,25 @@ # See the License for the specific language governing permissions and # limitations under the License. +dnl Check for --enable-threaded and updates CFLAGS. +AC_DEFUN([OVS_CHECK_THREADED], + [AC_REQUIRE([AC_PROG_CC]) + AC_ARG_ENABLE( + [threaded], + [AC_HELP_STRING([--enable-threaded], + [Enable threaded version of userspace implementation])], + [case "${enableval}" in + (yes) coverage=true ;; + (no) coverage=false ;; + (*) AC_MSG_ERROR([bad value ${enableval} for --enable-threaded]) ;; + esac], + [threaded=false]) + if $threaded; then + AC_DEFINE([THREADED], [1], + [Define to 1 if the threaded version of userspace + implementation is enabled.]) + fi]) + dnl Checks for --enable-coverage and updates CFLAGS and LDFLAGS appropriately. AC_DEFUN([OVS_CHECK_COVERAGE], [AC_REQUIRE([AC_PROG_CC])