Index: jdk/src/share/classes/sun/nio/ch/KqueueSelectorProvider.java =================================================================== --- jdk/src/share/classes/sun/nio/ch/KqueueSelectorProvider.java (revision 0) +++ jdk/src/share/classes/sun/nio/ch/KqueueSelectorProvider.java (revision 16) @@ -0,0 +1,17 @@ +package sun.nio.ch; + +import java.io.IOException; +import java.nio.channels.*; +import java.nio.channels.spi.*; + +public class KqueueSelectorProvider + extends SelectorProviderImpl +{ + public AbstractSelector openSelector() throws IOException { + return new KqueueSelectorImpl(this); + } + + public Channel inheritedChannel() throws IOException { + return InheritedChannel.getChannel(); + } +} Index: jdk/src/solaris/native/sun/nio/ch/KqueuePort.c =================================================================== --- jdk/src/solaris/native/sun/nio/ch/KqueuePort.c (revision 0) +++ jdk/src/solaris/native/sun/nio/ch/KqueuePort.c (revision 16) @@ -0,0 +1,67 @@ +/* + * Scratched by davidxu@freebsd.org + */ + +#include "jni.h" +#include "jni_util.h" +#include "jvm.h" +#include "jlong.h" +#include "nio_util.h" + +#include "sun_nio_ch_KqueuePort.h" + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif +JNIEXPORT void JNICALL +Java_sun_nio_ch_KqueuePort_socketpair + (JNIEnv *env, jclass cls, jintArray sv) +{ + int sp[2]; + if (socketpair(PF_UNIX, SOCK_STREAM, 0, sp) == -1) { + JNU_ThrowIOExceptionWithLastError(env, "socketpair failed"); + } else { + jint res[2]; + res[0] = (jint)sp[0]; + res[1] = (jint)sp[1]; + (*env)->SetIntArrayRegion(env, sv, 0, 2, &res[0]); + } +} + +JNIEXPORT void JNICALL Java_sun_nio_ch_KqueuePort_interrupt + (JNIEnv *env, jclass cls, jint fd) +{ + int res; + int buf[1]; + buf[0] = 1; + RESTARTABLE(write(fd, buf, 1), res); + if (res < 0) { + JNU_ThrowIOExceptionWithLastError(env, "write failed"); + } +} + +JNIEXPORT void JNICALL Java_sun_nio_ch_KqueuePort_drain1 + (JNIEnv *env, jclass cls, jint fd) +{ + int res; + char buf[1]; + RESTARTABLE(read(fd, buf, 1), res); + if (res < 0) { + JNU_ThrowIOExceptionWithLastError(env, "drain1 failed"); + } +} + +JNIEXPORT void JNICALL Java_sun_nio_ch_KqueuePort_close0 + (JNIEnv *env, jclass cls, jint fd) +{ + int res; + RESTARTABLE(close(fd), res); +} + +#ifdef __cplusplus +} +#endif Index: jdk/src/solaris/native/sun/nio/ch/Kqueue.c =================================================================== --- jdk/src/solaris/native/sun/nio/ch/Kqueue.c (revision 0) +++ jdk/src/solaris/native/sun/nio/ch/Kqueue.c (revision 16) @@ -0,0 +1,174 @@ +/* + * Scratched by davidxu@freebsd.org + */ + +#include "jni.h" +#include "jni_util.h" +#include "jvm.h" +#include "jlong.h" + +#include "sun_nio_ch_Kqueue.h" + +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +static int +restartable_kevent(int kqfd, struct kevent *changelist, int nchanges, + struct kevent *eventlist, int nevents); + +static int +timeout_kevent(int kqfd, struct kevent *changelist, int nchanges, + struct kevent *eventlist, int nevents, int timo); + +JNIEXPORT jint JNICALL Java_sun_nio_ch_Kqueue_kqueue + (JNIEnv *env, jclass cls) +{ + int kqfd = kqueue(); + if (kqfd < 0) { + JNU_ThrowIOExceptionWithLastError(env, "Error opening kqueue"); + return -1; + } + return kqfd; +} + +JNIEXPORT jint JNICALL Java_sun_nio_ch_Kqueue_keventChange + (JNIEnv *env, jclass cls, jint kqfd, jint fd, jshort flags, jshort filter) +{ + struct kevent ev; + struct timespec ts; + + ev.ident = fd; + ev.flags = flags; + ev.filter = filter; + ev.fflags = 0; + ev.data = 0; + ev.udata = NULL; + ts.tv_sec = 0; + ts.tv_nsec = 0; + if (kevent(kqfd, &ev, 1, NULL, 0, &ts) < 0) + JNU_ThrowIOExceptionWithLastError(env, "Error changing kevent"); + return (0); +} + +JNIEXPORT jint JNICALL Java_sun_nio_ch_Kqueue_kevent + (JNIEnv *env, jclass cls, jint kqfd , jlong changelist_addr, jint nchanges, + jlong eventlist_addr, jint nevents, jlong timeout) +{ + struct kevent *changelist = (struct kevent *)jlong_to_ptr(changelist_addr); + struct kevent *eventlist = (struct kevent *)jlong_to_ptr(eventlist_addr); + int result; + + if (timeout < 0) { + result = restartable_kevent(kqfd, changelist, nchanges, + eventlist, nevents); + } else { + result = timeout_kevent(kqfd, changelist, nchanges, eventlist, + nevents, timeout); + } + + if (result < 0) { + JNU_ThrowIOExceptionWithLastError(env, "Error reading driver"); + return -1; + } + return result; +} + +static int +restartable_kevent(int kqfd, struct kevent *changelist, int nchanges, + struct kevent *eventlist, int nevents) +{ + int result; + + for (;;) { + result = kevent(kqfd, changelist, nchanges, eventlist, + nevents, NULL); + if (result == -1 && errno == EINTR) { + continue; + } else { + return result; + } + } +} + +static int +timeout_kevent(int kqfd, struct kevent *changelist, int nchanges, + struct kevent *eventlist, int nevents, int timo) +{ + struct timeval timeout, now, end; + int result; + + timeout.tv_sec = timo / 1000; + timeout.tv_usec = (timo % 1000) * 1000; + gettimeofday(&now, NULL); + timeradd(&now, &timeout, &end); + + for (;;) { + struct timespec ts; + + ts.tv_sec = timeout.tv_sec; + ts.tv_nsec = timeout.tv_usec * 1000; + result = kevent(kqfd, changelist, nchanges, eventlist, nevents, + &ts); + if (result == -1 && (errno == EINTR)) { + gettimeofday(&now, NULL); + if (timercmp(&now, &end, >=)) + return 0; + timersub(&end, &now, &timeout); + } else { + return result; + } + } +} + +JNIEXPORT jint JNICALL Java_sun_nio_ch_Kqueue_keventSize + (JNIEnv *env, jclass cls) +{ + return sizeof(struct kevent); +} + +JNIEXPORT void JNICALL Java_sun_nio_ch_Kqueue_putKevent + (JNIEnv *env, jclass cls, jlong address, jint index, jint fd, jshort flags, jshort filter) +{ + struct kevent *ev = (struct kevent *)jlong_to_ptr(address); + + ev[index].ident = fd; + ev[index].flags = flags; + ev[index].filter = filter; + ev[index].fflags = 0; + ev[index].data = 0; + ev[index].udata = NULL; +} + +JNIEXPORT jshort JNICALL Java_sun_nio_ch_Kqueue_getKeventFilter + (JNIEnv *env, jclass cls, jlong address, jint index) +{ + struct kevent *ev = (struct kevent *)jlong_to_ptr(address); + + return ev[index].filter; +} + +JNIEXPORT jshort JNICALL Java_sun_nio_ch_Kqueue_getKeventFlags + (JNIEnv *env, jclass cls, jlong address, jint index) +{ + struct kevent *ev = (struct kevent *)jlong_to_ptr(address); + + return ev[index].flags; +} + +JNIEXPORT jint JNICALL Java_sun_nio_ch_Kqueue_getKeventIdent + (JNIEnv *env, jclass cls, jlong address, jint index) +{ + struct kevent *ev = (struct kevent *)jlong_to_ptr(address); + + return (int)ev[index].ident; +} + +#ifdef __cplusplus +} +#endif Index: jdk/src/solaris/native/sun/nio/ch/KqueueArrayWrapper.c =================================================================== --- jdk/src/solaris/native/sun/nio/ch/KqueueArrayWrapper.c (revision 0) +++ jdk/src/solaris/native/sun/nio/ch/KqueueArrayWrapper.c (revision 16) @@ -0,0 +1,30 @@ +/* + * Scratched by davidxu@freebsd.org + */ + +#include "jni.h" +#include "jni_util.h" +#include "jvm.h" +#include "jlong.h" +#include "nio_util.h" + +#include "sun_nio_ch_KqueueArrayWrapper.h" + +#ifdef __cplusplus +extern "C" { +#endif + +JNIEXPORT void JNICALL Java_sun_nio_ch_KqueueArrayWrapper_interrupt + (JNIEnv *env, jclass cls, jint fd) +{ + int fakebuf[1]; + fakebuf[0] = 1; + if (write(fd, fakebuf, 1) < 0) { + JNU_ThrowIOExceptionWithLastError(env, + "Write to interrupt fd failed"); + } +} + +#ifdef __cplusplus +} +#endif Index: jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java =================================================================== --- jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java (revision 1) +++ jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java (revision 16) @@ -47,6 +47,10 @@ public static SelectorProvider create() { String osname = AccessController.doPrivileged( new GetPropertyAction("os.name")); + if ("FreeBSD".equals(osname)) { + return new sun.nio.ch.KqueueSelectorProvider(); + } + if ("SunOS".equals(osname)) { return new sun.nio.ch.DevPollSelectorProvider(); } Index: jdk/src/solaris/classes/sun/nio/ch/KqueueSelectorImpl.java =================================================================== --- jdk/src/solaris/classes/sun/nio/ch/KqueueSelectorImpl.java (revision 0) +++ jdk/src/solaris/classes/sun/nio/ch/KqueueSelectorImpl.java (revision 16) @@ -0,0 +1,204 @@ +/* + * scratched by davidxu@freebsd.org + */ + +package sun.nio.ch; + +import java.io.IOException; +import java.nio.channels.*; +import java.nio.channels.spi.*; +import java.util.*; +import sun.misc.*; + + +/** + * An implementation of Selector for FreeBSD. + */ +class KqueueSelectorImpl + extends SelectorImpl +{ + + // File descriptors used for interrupt + protected int fd0; + protected int fd1; + + // The kqueue object + KqueueArrayWrapper kqueueWrapper; + + // The number of valid channels in this Selector's poll array + private int totalChannels; + + // Maps from file descriptors to keys + private HashMap fdToKey; + + // True if this Selector has been closed + private boolean closed = false; + + // Lock for interrupt triggering and clearing + private Object interruptLock = new Object(); + private boolean interruptTriggered = false; + + private BitSet updatedSet; + + /** + * Package private constructor called by factory method in + * the abstract superclass Selector. + */ + KqueueSelectorImpl(SelectorProvider sp) { + super(sp); + int[] fdes = new int[2]; + IOUtil.initPipe(fdes, false); + fd0 = fdes[0]; + fd1 = fdes[1]; + kqueueWrapper = new KqueueArrayWrapper(); + totalChannels = 1; + kqueueWrapper.initInterrupt(fd0, fd1); + updatedSet = new BitSet(); + fdToKey = new HashMap(); + } + + protected int doSelect(long timeout) + throws IOException + { + if (closed) + throw new ClosedSelectorException(); + processDeregisterQueue(); + try { + begin(); + kqueueWrapper.poll(timeout); + } finally { + end(); + } + processDeregisterQueue(); + int numKeysUpdated = updateSelectedKeys(); + if (kqueueWrapper.interrupted()) { + // Clear the wakeup pipe + synchronized (interruptLock) { + kqueueWrapper.clearInterrupted(); + IOUtil.drain(fd0); + interruptTriggered = false; + } + } + return numKeysUpdated; + } + + /** + * Update the keys whose fd's have been selected by the kqueue. + * Add the ready keys to the ready queue. + */ + private int updateSelectedKeys() { + int entries = kqueueWrapper.updated; + int numKeysUpdated = 0; + SelectionKeyImpl ski; + int fd; + int i; + + updatedSet.clear(); + for (i = 0; i < entries; i++) { + fd = kqueueWrapper.getDescriptor(i); + ski = (SelectionKeyImpl) fdToKey.get(new Integer(fd)); + // ski is null in the case of an interrupt + if (ski != null) + ski.nioReadyOps(0); + } + + for (i = 0; i < entries; i++) { + fd = kqueueWrapper.getDescriptor(i); + ski = (SelectionKeyImpl) fdToKey.get(new Integer(fd)); + // ski is null in the case of an interrupt + if (ski != null) { + int rOps = kqueueWrapper.getReventOps(i); + if (selectedKeys.contains(ski)) { + if (ski.channel.translateAndUpdateReadyOps(rOps, ski)) { + if (!updatedSet.get(fd)) { + updatedSet.set(fd); + numKeysUpdated++; + } + } + } else { + ski.channel.translateAndSetReadyOps(rOps, ski); + if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { + selectedKeys.add(ski); + if (!updatedSet.get(fd)) { + updatedSet.set(fd); + numKeysUpdated++; + } + } + } + } + } + return numKeysUpdated; + } + + protected void implClose() throws IOException { + if (!closed) { + closed = true; + FileDispatcherImpl.closeIntFD(fd0); + FileDispatcherImpl.closeIntFD(fd1); + if (kqueueWrapper != null) { + kqueueWrapper.release(fd0); + kqueueWrapper.closeKqueueFD(); + kqueueWrapper = null; + selectedKeys = null; + + // Deregister channels + Iterator i = keys.iterator(); + while (i.hasNext()) { + SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); + deregister(ski); + SelectableChannel selch = ski.channel(); + if (!selch.isOpen() && !selch.isRegistered()) + ((SelChImpl)selch).kill(); + i.remove(); + } + totalChannels = 0; + + } + fd0 = -1; + fd1 = -1; + } + } + + protected void implRegister(SelectionKeyImpl ski) { + int fd = IOUtil.fdVal(ski.channel.getFD()); + fdToKey.put(new Integer(fd), ski); + totalChannels++; + keys.add(ski); + } + + protected void implDereg(SelectionKeyImpl ski) throws IOException { + int i = ski.getIndex(); + assert (i >= 0); + int fd = ski.channel.getFDVal(); + fdToKey.remove(new Integer(fd)); + kqueueWrapper.release(fd); + totalChannels--; + ski.setIndex(-1); + keys.remove(ski); + selectedKeys.remove(ski); + deregister((AbstractSelectionKey)ski); + SelectableChannel selch = ski.channel(); + if (!selch.isOpen() && !selch.isRegistered()) + ((SelChImpl)selch).kill(); + } + + void putEventOps(SelectionKeyImpl sk, int ops) { + int fd = IOUtil.fdVal(sk.channel.getFD()); + kqueueWrapper.setInterest(fd, ops); + } + + public Selector wakeup() { + synchronized (interruptLock) { + if (!interruptTriggered) { + kqueueWrapper.interrupt(); + interruptTriggered = true; + } + } + return this; + } + + static { + Util.load(); + } + +} Index: jdk/src/solaris/classes/sun/nio/ch/Kqueue.java =================================================================== --- jdk/src/solaris/classes/sun/nio/ch/Kqueue.java (revision 0) +++ jdk/src/solaris/classes/sun/nio/ch/Kqueue.java (revision 16) @@ -0,0 +1,49 @@ +/* + * Scratched by davidxu@freebsd.org + */ + +package sun.nio.ch; + +import java.io.IOException; +import sun.misc.Unsafe; + +class Kqueue { + // Kevent filters + static final short EVFILT_READ = -1; + static final short EVFILT_WRITE = -2; + + // Kevent flags + static final short EV_ADD = 0x0001; + static final short EV_DELETE = 0x0002; + static final short EV_ONESHOT = 0x0010; + static final short EV_ERROR = 0x4000; + + private static final Unsafe unsafe = Unsafe.getUnsafe(); + static final int SIZEOF_KEVENT = keventSize(); + + private Kqueue() {} + + /** + * Allocates a poll array to handle up to {@code count} events. + */ + static long allocatePollArray(int count) { + return unsafe.allocateMemory(count * SIZEOF_KEVENT); + } + + /** + * Free a poll array + */ + static void freePollArray(long address) { + unsafe.freeMemory(address); + } + + static native int kqueue(); + static native int keventChange(int kqfd, int fd, short flags, short filter); + static native int kevent(int kqfd, long changeList, int nchanges, long eventList, + int nevents, long timeout); + static native int keventSize(); + static native void putKevent(long address, int index, int fd, short flag, short filter); + static native short getKeventFilter(long address, int index); + static native short getKeventFlags(long address, int index); + static native int getKeventIdent(long address, int index); +} Index: jdk/src/solaris/classes/sun/nio/ch/KqueueArrayWrapper.java =================================================================== --- jdk/src/solaris/classes/sun/nio/ch/KqueueArrayWrapper.java (revision 0) +++ jdk/src/solaris/classes/sun/nio/ch/KqueueArrayWrapper.java (revision 16) @@ -0,0 +1,211 @@ +/* + * Scratched by davidxu@freebsd.org + */ + +package sun.nio.ch; + +import sun.misc.*; +import java.io.IOException; +import java.util.HashMap; +import java.util.Set; +import java.util.Arrays; +import static sun.nio.ch.Kqueue.*; + +class KqueueArrayWrapper { + + // Event masks copied from class AbstractPollArrayWrapper + static final short POLLIN = 0x0001; + static final short POLLOUT = 0x0004; + static final short POLLERR = 0x0008; + static final short POLLHUP = 0x0010; + static final short POLLNVAL = 0x0020; + static final short POLLREMOVE = 0x0800; + + // Zero mask to unregister events from kqueue + static final Integer ZERO_MASK = new Integer(0); + + // Capacity increment of some arrays + static final int capacityIncr = 100; + + KqueueArrayWrapper() { + int allocationSize; + + // initial size of event array + pollKeventSize = capacityIncr * 2; + allocationSize = pollKeventSize * SIZEOF_KEVENT; + pollKeventArray = new AllocatedNativeObject(allocationSize, true); + kqfd = kqueue(); + } + + // Machinery for remembering fd registration changes + private HashMap updateMap = new HashMap(); + private int[] oldMasks = new int[capacityIncr]; + + // kevent array to receive + private AllocatedNativeObject pollKeventArray; + + // current size of pollKeventArray + int pollKeventSize; + + // the pollKeventSize should be larger than this + int nextKeventSize; + + // The fd of the kqueue() + int kqfd; + + // The fd of the interrupt line going out + int outgoingInterruptFD; + + // The fd of the interrupt line coming in + int incomingInterruptFD; + + // The index of the interrupt FD + int interruptedIndex; + + // Number of updated kevent entries + int updated; + + // ensure some array sizes are large enough with a given file handle + void ensureFd(int fd) { + ensureNextEventFd(fd); + if (oldMasks.length < fd+1) + oldMasks = Arrays.copyOf(oldMasks, fd+capacityIncr); + } + + void ensureNextEventFd(int fd) { + // each file handle may have two filters, read and write. + if (nextKeventSize / 2 < fd+1) + nextKeventSize = (fd+1) * 2; + } + + void resizeEventBuffer() { + if (nextKeventSize > pollKeventSize) { + pollKeventArray.free(); + pollKeventSize = nextKeventSize + capacityIncr * 2; + int allocationSize = pollKeventSize * SIZEOF_KEVENT; + pollKeventArray = new AllocatedNativeObject(allocationSize, true); + } + } + + void initInterrupt(int fd0, int fd1) { + outgoingInterruptFD = fd1; + incomingInterruptFD = fd0; + ensureFd(fd0); + keventChange(kqfd, fd0, EV_ADD, EVFILT_READ); + } + + int getReventOps(int i) { + short filter = getKeventFilter(pollKeventArray.address(), i); + short flags = getKeventFlags(pollKeventArray.address(), i); + if ((flags & EV_ERROR) != 0) + return POLLERR; + if (filter == EVFILT_READ) + return POLLIN; + if (filter == EVFILT_WRITE) + return POLLOUT; + return (0); + } + + int getDescriptor(int i) { + return getKeventIdent(pollKeventArray.address(), i); + } + + void setInterest(int fd, int mask) { + if (fd <0) + throw new IllegalArgumentException("file handle less than 0"); + synchronized (updateMap) { + ensureFd(fd); + updateMap.put(new Integer(fd), new Integer(mask)); + } + } + + void release(int fd) { + synchronized (updateMap) { + updateMap.put(new Integer(fd), ZERO_MASK); + } + } + + void closeKqueueFD() throws IOException { + FileDispatcherImpl.closeIntFD(kqfd); + pollKeventArray.free(); + } + + int poll(long timeout) { + int changeCount = updateRegistrations(); + updated = kevent(kqfd, pollKeventArray.address(), changeCount, + pollKeventArray.address(), pollKeventSize, timeout); + for (int i = 0; i < updated; i++) { + if (getDescriptor(i) == incomingInterruptFD) { + interruptedIndex = i; + interrupted = true; + break; + } + } + return updated; + } + + int updateRegistrations() { + int index = 0; + synchronized (updateMap) { + resizeEventBuffer(); + + Set s = updateMap.keySet(); + /* + * Because resizeEventBuffer may reallocate event buffer, + * we must retrieve fresh address here. + */ + long address = pollKeventArray.address(); + + for (Integer fd : s) { + Integer newmask = updateMap.get(fd); + int oldmask = oldMasks[fd]; + if ((oldmask & POLLIN) != 0) { + if ((newmask & POLLIN) == 0) { + putKevent(address, index, fd.intValue(), EV_DELETE, EVFILT_READ); + index++; + } + } else { + if ((newmask & POLLIN) != 0) { + putKevent(address, index, fd.intValue(), EV_ADD, EVFILT_READ); + index++; + } + } + + if ((oldmask & POLLOUT) != 0) { + if ((newmask & POLLOUT) == 0) { + putKevent(address, index, fd.intValue(), EV_DELETE, EVFILT_WRITE); + index++; + } + } else { + if ((newmask & POLLOUT) != 0) { + putKevent(address, index, fd.intValue(), EV_ADD, EVFILT_WRITE); + index++; + } + } + oldMasks[fd] = newmask; + } + updateMap.clear(); + } + return index; + } + + boolean interrupted = false; + + public void interrupt() { + interrupt(outgoingInterruptFD); + } + + public int interruptedIndex() { + return interruptedIndex; + } + + boolean interrupted() { + return interrupted; + } + + void clearInterrupted() { + interrupted = false; + } + + private static native void interrupt(int fd); +} Index: jdk/src/solaris/classes/sun/nio/ch/DefaultAsynchronousChannelProvider.java =================================================================== --- jdk/src/solaris/classes/sun/nio/ch/DefaultAsynchronousChannelProvider.java (revision 1) +++ jdk/src/solaris/classes/sun/nio/ch/DefaultAsynchronousChannelProvider.java (revision 16) @@ -46,6 +46,8 @@ public static AsynchronousChannelProvider create() { String osname = AccessController .doPrivileged(new GetPropertyAction("os.name")); + if (osname.equals("FreeBSD")) + return new FreeBSDAsynchronousChannelProvider(); if (osname.equals("SunOS")) return new SolarisAsynchronousChannelProvider(); if (osname.equals("Linux")) Index: jdk/src/solaris/classes/sun/nio/ch/KqueuePort.java =================================================================== --- jdk/src/solaris/classes/sun/nio/ch/KqueuePort.java (revision 0) +++ jdk/src/solaris/classes/sun/nio/ch/KqueuePort.java (revision 16) @@ -0,0 +1,321 @@ +/* + * Scratched by davidxu@FreeBSD.org + */ + +package sun.nio.ch; + +import java.nio.channels.spi.AsynchronousChannelProvider; +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import static sun.nio.ch.Kqueue.*; + +/** + * AsynchronousChannelGroup implementation based on the FreeBSD kqueue facility. + */ + +final class KqueuePort + extends Port +{ +/* + // Kevent filters + static final short EVFILT_READ = -1; + static final short EVFILT_WRITE = -2; + + // Kevent flags + static final char EV_ADD = 0x0001; + static final char EV_DELETE = 0x0002; + static final char EV_EOF = 0x8000; + static final char EV_ERROR = 0x4000; +*/ + + // maximum number of events to poll at a time + private static final int MAX_KEVENTS = 512; + + // kqueue file descriptor + private final int kqfd; + + // true if kqueue closed + private boolean closed; + + // socket pair used for wakeup + private final int sp[]; + + // number of wakeups pending + private final AtomicInteger wakeupCount = new AtomicInteger(); + + // address of the poll array passed to kevent() + private final long address; + + // encapsulates an event for a channel + static class Event { + final PollableChannel channel; + final int events; + + Event(PollableChannel channel, int events) { + this.channel = channel; + this.events = events; + } + + PollableChannel channel() { return channel; } + int events() { return events; } + } + + // queue of events for cases that a polling thread dequeues more than one + // event + private final ArrayBlockingQueue queue; + private final Event NEED_TO_POLL = new Event(null, 0); + private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0); + + KqueuePort(AsynchronousChannelProvider provider, ThreadPool pool) + throws IOException + { + super(provider, pool); + + // open kqueue + this.kqfd = kqueue(); + + // create socket pair for wakeup mechanism + int[] sv = new int[2]; + try { + socketpair(sv); + // register one end with epoll + keventChange(kqfd, sv[0], EV_ADD, EVFILT_READ); + } catch (IOException x) { + close0(kqfd); + throw x; + } + this.sp = sv; + + // allocate the poll array + this.address = allocatePollArray(MAX_KEVENTS); + + // create the queue and offer the special event to ensure that the first + // threads polls + this.queue = new ArrayBlockingQueue(MAX_KEVENTS); + this.queue.offer(NEED_TO_POLL); + } + + KqueuePort start() { + startThreads(new EventHandlerTask()); + return this; + } + + /** + * Release all resources + */ + private void implClose() { + synchronized (this) { + if (closed) + return; + closed = true; + } + freePollArray(address); + close0(sp[0]); + close0(sp[1]); + close0(kqfd); + } + + private void wakeup() { + if (wakeupCount.incrementAndGet() == 1) { + // write byte to socketpair to force wakeup + try { + interrupt(sp[1]); + } catch (IOException x) { + throw new AssertionError(x); + } + } + } + + @Override + void executeOnHandlerTask(Runnable task) { + synchronized (this) { + if (closed) + throw new RejectedExecutionException(); + offerTask(task); + wakeup(); + } + } + + @Override + void shutdownHandlerTasks() { + /* + * If no tasks are running then just release resources; otherwise + * write to the one end of the socketpair to wakeup any polling threads. + */ + int nThreads = threadCount(); + if (nThreads == 0) { + implClose(); + } else { + // send interrupt to each thread + while (nThreads-- > 0) { + wakeup(); + } + } + } + + // invoke by clients to register a file descriptor + @Override + void startPoll(int fd, int events) { + + if ((events & POLLIN) != 0) + keventChange(kqfd, fd, (short)(EV_ONESHOT|EV_ADD), EVFILT_READ); + if ((events & POLLOUT) != 0) + keventChange(kqfd, fd, (short)(EV_ONESHOT|EV_ADD), EVFILT_WRITE); + } + + /* + * Task to process events from kevent and dispatch to the channel's + * onEvent handler. + * + * Events are retreived from kevent in batch and offered to a BlockingQueue + * where they are consumed by handler threads. A special "NEED_TO_POLL" + * event is used to signal one consumer to re-poll when all events have + * been consumed. + */ + private class EventHandlerTask implements Runnable { + private Event poll() throws IOException { + try { + for (;;) { + int n = kevent(kqfd, 0, 0, address, MAX_KEVENTS, -1); + /* + * 'n' events have been read. Here we map them to their + * corresponding channel in batch and queue n-1 so that + * they can be handled by other handler threads. The last + * event is handled by this thread (and so is not queued). + */ + fdToChannelLock.readLock().lock(); + try { + while (n-- > 0) { + int fd = getKeventIdent(address, n); + + // wakeup + if (fd == sp[0]) { + if (wakeupCount.decrementAndGet() == 0) { + // no more wakeups so drain pipe + drain1(sp[0]); + } + + // queue special event if there are more events + // to handle. + if (n > 0) { + queue.offer(EXECUTE_TASK_OR_SHUTDOWN); + continue; + } + return EXECUTE_TASK_OR_SHUTDOWN; + } + + PollableChannel channel = fdToChannel.get(fd); + if (channel != null) { + int events = getEvents(address, n); + Event ev = new Event(channel, events); + + // n-1 events are queued; This thread handles + // the last one except for the wakeup + if (n > 0) { + queue.offer(ev); + } else { + return ev; + } + } + } + } finally { + fdToChannelLock.readLock().unlock(); + } + } + } finally { + // to ensure that some thread will poll when all events have + // been consumed + queue.offer(NEED_TO_POLL); + } + } + + public void run() { + Invoker.GroupAndInvokeCount myGroupAndInvokeCount = + Invoker.getGroupAndInvokeCount(); + final boolean isPooledThread = (myGroupAndInvokeCount != null); + boolean replaceMe = false; + Event ev; + try { + for (;;) { + // reset invoke count + if (isPooledThread) + myGroupAndInvokeCount.resetInvokeCount(); + + try { + replaceMe = false; + ev = queue.take(); + + // no events and this thread has been "selected" to + // poll for more. + if (ev == NEED_TO_POLL) { + try { + ev = poll(); + } catch (IOException x) { + x.printStackTrace(); + return; + } + } + } catch (InterruptedException x) { + continue; + } + + // handle wakeup to execute task or shutdown + if (ev == EXECUTE_TASK_OR_SHUTDOWN) { + Runnable task = pollTask(); + if (task == null) { + // shutdown request + return; + } + // run task (may throw error/exception) + replaceMe = true; + task.run(); + continue; + } + + // process event + try { + ev.channel().onEvent(ev.events(), isPooledThread); + } catch (Error x) { + replaceMe = true; throw x; + } catch (RuntimeException x) { + replaceMe = true; throw x; + } + } + } finally { + // last handler to exit when shutdown releases resources + int remaining = threadExit(this, replaceMe); + if (remaining == 0 && isShutdown()) { + implClose(); + } + } + } + } + + static int getEvents(long address, int index) { + short filter = getKeventFilter(address, index); + short flags = getKeventFlags(address, index); + if ((flags & EV_ERROR) != 0) + return POLLERR; + if (filter == EVFILT_READ) + return POLLIN; + if (filter == EVFILT_WRITE) + return POLLOUT; + return (0); + } + + // -- Native methods -- + + private static native void socketpair(int[] sv) throws IOException; + + private static native void interrupt(int fd) throws IOException; + + private static native void drain1(int fd) throws IOException; + + private static native void close0(int fd); + + static { + Util.load(); + } +} Index: jdk/src/solaris/classes/sun/nio/ch/FreeBSDAsynchronousChannelProvider.java =================================================================== --- jdk/src/solaris/classes/sun/nio/ch/FreeBSDAsynchronousChannelProvider.java (revision 0) +++ jdk/src/solaris/classes/sun/nio/ch/FreeBSDAsynchronousChannelProvider.java (revision 16) @@ -0,0 +1,75 @@ + +package sun.nio.ch; + +import java.nio.channels.*; +import java.nio.channels.spi.AsynchronousChannelProvider; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.net.ProtocolFamily; +import java.io.IOException; + +public class FreeBSDAsynchronousChannelProvider + extends AsynchronousChannelProvider +{ + private static volatile KqueuePort defaultPort; + + private KqueuePort defaultEventPort() throws IOException { + if (defaultPort == null) { + synchronized (FreeBSDAsynchronousChannelProvider.class) { + if (defaultPort == null) { + defaultPort = new KqueuePort(this, ThreadPool.getDefault()).start(); + } + } + } + return defaultPort; + } + + public FreeBSDAsynchronousChannelProvider() { + } + + @Override + public AsynchronousChannelGroup openAsynchronousChannelGroup(int nThreads, ThreadFactory factory) + throws IOException + { + return new KqueuePort(this, ThreadPool.create(nThreads, factory)).start(); + } + + @Override + public AsynchronousChannelGroup openAsynchronousChannelGroup(ExecutorService executor, int initialSize) + throws IOException + { + return new KqueuePort(this, ThreadPool.wrap(executor, initialSize)).start(); + } + + private Port toPort(AsynchronousChannelGroup group) throws IOException { + if (group == null) { + return defaultEventPort(); + } else { + if (!(group instanceof KqueuePort)) + throw new IllegalChannelGroupException(); + return (Port)group; + } + } + + @Override + public AsynchronousServerSocketChannel openAsynchronousServerSocketChannel(AsynchronousChannelGroup group) + throws IOException + { + return new UnixAsynchronousServerSocketChannelImpl(toPort(group)); + } + + @Override + public AsynchronousSocketChannel openAsynchronousSocketChannel(AsynchronousChannelGroup group) + throws IOException + { + return new UnixAsynchronousSocketChannelImpl(toPort(group)); + } + + @Override + public AsynchronousDatagramChannel openAsynchronousDatagramChannel(ProtocolFamily family, + AsynchronousChannelGroup group) + throws IOException + { + return new SimpleAsynchronousDatagramChannelImpl(family, toPort(group)); + } +} Index: jdk/make/java/nio/Makefile =================================================================== --- jdk/make/java/nio/Makefile (revision 1) +++ jdk/make/java/nio/Makefile (revision 16) @@ -264,7 +264,12 @@ ifeq ($(PLATFORM), bsd) FILES_java += \ sun/nio/ch/AbstractPollSelectorImpl.java \ + sun/nio/ch/FreeBSDAsynchronousChannelProvider.java \ sun/nio/ch/InheritedChannel.java \ + sun/nio/ch/Kqueue.java \ + sun/nio/ch/KqueueArrayWrapper.java \ + sun/nio/ch/KqueueSelectorProvider.java \ + sun/nio/ch/KqueueSelectorImpl.java \ sun/nio/ch/PollSelectorProvider.java \ sun/nio/ch/PollSelectorImpl.java \ sun/nio/ch/Port.java \ @@ -299,6 +304,9 @@ FILES_c += \ InheritedChannel.c \ + Kqueue.c \ + KqueueArrayWrapper.c \ + KqueuePort.c \ NativeThread.c \ PollArrayWrapper.c \ UnixAsynchronousServerSocketChannelImpl.c \ @@ -311,6 +319,9 @@ FILES_export += \ sun/nio/ch/InheritedChannel.java \ + sun/nio/ch/Kqueue.java \ + sun/nio/ch/KqueueArrayWrapper.java \ + sun/nio/ch/KqueuePort.java \ sun/nio/ch/NativeThread.java \ sun/nio/ch/UnixAsynchronousServerSocketChannelImpl.java \ sun/nio/ch/UnixAsynchronousSocketChannelImpl.java \