#!/usr/bin/env python

# Improved build scheduler.  We try to build leaf packages (those
# which can be built immediately without requiring additional
# dependencies to be built) in the order such that the ones required
# by the longest dependency chains are built first.
#
# This has the effect of favouring deep parts of the package tree and
# evening out the depth over time, hopefully avoiding the situation
# where the entire cluster waits for a deep part of the tree to
# build on a small number of machines
#
# Other advantages are that this system is easily customizable and
# will let us customize things like the matching policy of jobs to
# machines.
#
# TODO:
# * Fix indexfile naming
# * External queue manager (in progress)
# * Mark completed packages instead of deleting them
# * check mtime for package staleness (cf make)
# * Check for parent mtimes after finishing child
#
# XXX note added: this was the first nontrivial python code I wrote,
# so it needs another pass redoing some things properly

import os, sys, threading, time, subprocess, fcntl, operator
#from itertools import ifilter, imap
from random import choice

def parseindex(indexfile):
    """
    Parse a "|"-separated INDEX file into the package dependency index.

    Each non-blank line is split on "|".  Field 0 is the package name;
    of the remaining fields only the path, the categories string and
    the five dependency lists (stored as 'bdep', 'rdep', 'edep', 'pdep'
    and 'fdep') are kept.

    Returns a dict mapping package name to a dict with the keys
    'name', 'path', 'categories', 'bdep', 'rdep', 'edep', 'pdep',
    'fdep', 'height' (always None here), 'deps' (the de-duplicated
    union of the five dependency lists) and 'parents' (names of the
    packages that list this one in their 'deps').

    Raises IndexError if a line has too few fields and KeyError if a
    dependency names a package that has no line of its own.
    """

    tmp={}
    pkghash={}
    f = open(indexfile)
    try:
        for raw in f:
            stripped = raw.rstrip()
            if not stripped:
                # A blank line would otherwise become a package named ''
                # with no fields and blow up in the second pass
                continue
            fields = stripped.split("|")
            pkg = fields[0]
            tmp[pkg] = fields[1:]

            # XXX hash category names too

            # Trick python into storing package names by reference instead of copying strings and wasting 60MB
            pkghash[pkg] = pkg
    finally:
        f.close()

    index=dict.fromkeys(tmp)
    for pkg in tmp:
        line = tmp[pkg]
        data={'name': pkg, 'path':line[0],
                #'prefix':line[1],
                #'comment':line[2],
                #'descr':line[3],
                #'maintainer':line[4],
                'categories':line[5], # XXX duplicates strings
                'bdep':[pkghash[i] for i in line[6].split(None)],
                'rdep':[pkghash[i] for i in line[7].split(None)],
                #'www':line[8],
                'edep':[pkghash[i] for i in line[9].split(None)],
                'pdep':[pkghash[i] for i in line[10].split(None)],
                'fdep':[pkghash[i] for i in line[11].split(None)],
                'height':None}

        # The entry may already exist as a bare {'parents': [...]}
        # placeholder created while processing one of its parents
        entry = index[pkg]
        if entry is None:
            entry = index[pkg] = data
        else:
            entry.update(data)
        entry.setdefault('parents', [])

        deps=set()
        for kind in ('bdep','rdep','edep','fdep','pdep'):
            deps.update(entry[kind])
        # The members of deps are already the shared pkghash strings
        entry['deps'] = list(deps)

        # Register this package as a parent of each dependency.  The
        # dependency may not have been processed yet, in which case a
        # placeholder is left for it; this avoids a second pass.
        me = pkghash[pkg]
        for dep in deps:
            depentry = index[dep]
            if depentry is None:
                index[dep] = {'parents':[me]}
            else:
                depentry.setdefault('parents', []).append(me)

    return index

def gettargets(index, targets):
    """
    Split command line arguments into the set of packages to build.

    Each target is one of:
      "all"       -- every package in the index (also the default when
                     targets is empty); any other targets are ignored
      "<cat>-all" -- every package whose space-separated 'categories'
                     string contains the category <cat> as a whole word
      a package name, optionally with a ".tbz" suffix

    Targets that match nothing in the index are silently ignored.
    Always returns a set of package names.
    """
    # XXX make this return the full recursive list and use this later for processing wqueue

    if not targets:
        targets = ["all"]

    plist = set()
    for target in targets:
        if target == "all":
            return set(index)
        if target.endswith("-all"):
            cat = target.rpartition("-")[0]
            # Compare against whole category names; a substring test
            # would make e.g. "net-all" also select "net-mgmt" ports
            plist.update(name for name in index
                         if cat in index[name]['categories'].split())
        else:
            name = target
            # Remove the suffix itself; str.rstrip(".tbz") would strip
            # any trailing run of the characters ".", "t", "b" and "z"
            if name.endswith(".tbz"):
                name = name[:-len(".tbz")]
            if name in index:
                plist.add(name)

    return plist

def heightindex(index, targets):
    """ Seed the height tree by computing the height of every target package """

    # heightdown() fills in each target and, recursively, everything
    # it depends on
    for name in targets:
        heightdown(index, name)

def heightdown(index, pkgname):
    """
    Return the height of pkgname, computing and caching it (and the
    heights of everything below it) if it has not been visited yet.

    A package with no deps has height 1; otherwise the height is one
    more than the tallest of its deps.  Entries whose 'height' is not
    None are taken as already computed.
    """

    entry = index[pkgname]
    if entry['height'] is not None:
        return entry['height']

    tallest = 0
    for dep in entry['deps']:
        tallest = max(tallest, heightdown(index, dep))
    entry['height'] = tallest + 1
    return entry['height']

def heightup(index, pkgname):
    """
    Recalculate the height tree going upwards from a package.

    Starting with the parents of pkgname, each package's height is
    recomputed as one more than the tallest of its current 'deps' (or
    1 if it has none).  Whenever a height changes, that package's own
    parents are queued for recalculation as well.

    Raises KeyError if pkgname is not in the index, and AssertionError
    if a recomputed height would be larger than the stored one.
    """

    if pkgname not in index:
        raise KeyError(pkgname)

    parents=set(index[pkgname]['parents'])

    while parents:
        # XXX use a deque?
        parent = parents.pop()
        if parent not in index:
            # XXX can this happen?
            continue
        pkg=index[parent]
        oldheight=pkg['height']
        if oldheight is None:
            # Parent never had a height computed, i.e. it is not in
            # our build target list
            continue
        if pkg['deps']:
            newheight=max(index[j]['height'] for j in pkg['deps']) + 1
        else:
            newheight = 1
        if newheight > oldheight:
            # Raised explicitly so the check also fires under python -O
            msg = "%s height increasing: %d -> %d" % (parent, oldheight, newheight)
            print(msg)
            raise AssertionError(msg)
        if newheight != oldheight:
            pkg['height'] = newheight
            parents.update(pkg['parents'])

def deleteup(index, pkgname):
    """
    Remove pkgname and everything that (transitively) depends on it.

    Walks upwards through 'parents', deleting each visited package
    from the index.  Packages whose 'height' is None are left alone
    and not walked through.  Afterwards the 'parents' lists of the
    surviving dependencies of the removed packages are pruned so that
    they no longer mention anything that was removed.

    Raises KeyError if pkgname is not in the index.
    """

    if pkgname not in index:
        raise KeyError(pkgname)

    pending=set([pkgname])

    children=set()
    removed=set()
    while pending:
        name = pending.pop()
        if name not in index:
            # Parent was already deleted via another path
            # XXX can happen?
            continue
        pkg=index[name]
        if pkg['height'] is None:
            # parent is not in our list of build targets
            continue

        children.update(pkg['deps'])
        pending.update(pkg['parents'])
        removed.add(name)
        del index[name]

#    print "Removed %d packages, touching %d children" % (len(removed), len(children))

    for child in children.difference(removed):
        entry=index[child]
        entry['parents'] = list(set(entry['parents']).difference(removed))

# XXX return an iter?
def selectheights(index, level):
    """ Return a list of the names of all packages whose height equals level """
    matching = []
    for name in index.iterkeys():
        if index[name]['height'] == level:
            matching.append(name)
    return matching

def rank(index, ready, sortd, max = None):
    """
    Rank the list of ready packages according to those listed as
    dependencies in successive entries of the sorted list.

    index -- the package index
    ready -- iterable of package names that can be built now
    sortd -- iterable of package names in priority order
    max   -- optional cap; once more than this many ready packages
             have been ranked, the result is returned as it stands
             (the name shadows the builtin but is part of the
             interface)

    Walks sortd in order and, for each entry, appends those of its
    deps that are still unranked members of ready.  If the walk ends
    without hitting the cap, the ready packages that nothing in sortd
    depends on are appended last; if the cap is hit they are left out.

    Prints one "Working on depth" line listing the heights of the
    entries that contributed packages.
    """

    pending=set(ready)
    output = []
    depths = []
    count = 0
    capped = False
    for name in sortd:
        both = set(index[name]['deps']).intersection(pending)
        if both:
            depths.append("%d" % index[name]['height'])
            pending.difference_update(both)
            output.extend(both)
            if not pending:
                break
        if max:
            count+=len(both)
            if count > max:
                capped = True
                break

    # Emit the progress line in one piece on every exit path, so that
    # it is always newline-terminated
    print("Working on depth  %s" % " ".join(depths))

    if not capped:
        output.extend(pending)

    return output

def jobsuccess(index, job):
    """ Record a successful build of job: detach it from its parents, refresh their heights and drop it from the index """

    # The finished package no longer counts as an outstanding
    # dependency of anything that needed it
    for parent in index[job]['parents']:
        index[parent]['deps'].remove(job)

    # At this point deps and parents disagree with each other, which
    # is deliberate: the recalculation must not count the height of
    # the entry that is about to go away (otherwise it would be a NOP)
    heightup(index, job)

    del index[job]

def jobfailure(index, job):
    """ Record a failed build of job by handing it to deleteup() """

    # deleteup() takes care of the failed package and of the packages
    # above it in the tree
    deleteup(index, job)
    
class worker(threading.Thread):
    """
    Thread that builds a single package on a single machine.

    run() invokes the pdispatch2 script for the job, hands the machine
    slot back to the queue, and then records the outcome in the global
    index via jobsuccess() or jobfailure().  All class-level state is
    shared between the dispatcher and every worker and must only be
    touched while holding worker.lock.
    """

    # Protects tlist, running and the global index
    lock = threading.Lock()

    # List of running threads
    tlist = []

    # List of running jobs
    running = []

    # Used to signal dispatcher when we finish a job
    event = threading.Event()

    def __init__(self, mach, job, queue, arch, branch):
        """
        mach   -- name of the machine to build on
        job    -- name of the package to build
        queue  -- queue object providing pick() and release(mach)
        arch, branch -- passed through to the pdispatch2 script
        """
        threading.Thread.__init__(self)
        self.job = job
        self.mach = mach
        self.queue = queue
        self.arch = arch
        self.branch = branch

    def run(self):
        """ Build self.job, retrying on another machine while pdispatch2 exits with 254 """
        global index

        pkg = index[self.job]

        if len(pkg['deps']) != 0:
            msg = "Running job with non-empty deps: %s" % pkg
            print(msg)
            # Raised explicitly so the check also fires under python -O
            raise AssertionError(msg)

        # Dependency lists are handed to the build script as
        # environment assignments of package file names
        env = []
        for (var, key) in (("FD", 'fdep'), ("ED", 'edep'), ("PD", 'pdep'), ("BD", 'bdep'), ("RD", 'rdep')):
            env.append("%s=%s" % (var, " ".join(["%s.tbz" % i for i in pkg[key]])))

        print("Running job %s" % (self.job))
        while True:
            # self.mach may have changed since the last attempt, so the
            # command is rebuilt on every iteration
            cmd = ["/usr/bin/env"] + env + [
                "/var/portbuild/scripts/pdispatch2",
                self.mach, self.arch, self.branch,
                "/var/portbuild/scripts/portbuild",
                "%s.tbz" % self.job, pkg['path']]
            retcode = subprocess.call(cmd)
            self.queue.release(self.mach)
            if retcode != 254:
                break

            # Failed to obtain job slot
            time.sleep(15)
            (self.mach, dummy) = self.queue.pick()
            print("Retrying on %s" % self.mach)

        # Printed in one piece so that output from other threads cannot
        # end up in the middle of the line
        if retcode == 0:
            print("Finished job %s" % self.job)
        else:
            print("Finished job %s  with status %d" % (self.job, retcode))

        worker.lock.acquire()
        try:
            worker.running.remove(self.job)
            worker.tlist.remove(self)
            if retcode == 0:
                jobsuccess(index, self.job)
            else:
                jobfailure(index, self.job)
        finally:
            # Always wake the dispatcher and give the lock back, even
            # if the bookkeeping above raised; otherwise every other
            # thread would block on worker.lock forever

            # Wake up dispatcher in case it was blocked
            worker.event.set()
            worker.event.clear()

            worker.lock.release()

    @staticmethod
    def dispatch(mach, job, queue, arch, branch):
        """ Create a worker for job on mach, register it in tlist and start it """
        worker.lock.acquire()
        try:
            wrk = worker(mach, job, queue, arch, branch)
            worker.tlist.append(wrk)
        finally:
            worker.lock.release()
        wrk.start()

class machqueue(object):
    """
    File-based queue of build machines and their current load.

    The queue directory (self.path, used as a string prefix, so it
    must end in "/") holds one file per machine whose first line is
    that machine's load.  Entries starting with "." are reported
    separately as "special"; entries not named in "<path>../mlist"
    are deleted by poll().
    """
    # XXX this will all be punted into the external queue manager

    path = ''
    fd = -1

    # Load assumed for a machine whose load could not be parsed
    BADLOAD = 999

    # pick() removes the file of a machine it hands out at load
    # FULL - 1, and release() assumes FULL for a machine whose file is
    # missing or unparseable
    # XXX hook up config files
    FULL = 12

    # fcntl locks are per-process, so the fcntl lock acquisition will
    # succeed if another thread already holds it.  We need the fcntl
    # lock for external visibility between processes but also need an
    # internal lock for protecting against our own threads.
    ilock = threading.Lock()

    def __init__(self, path):
        """ Remember the queue directory and open (creating if needed) its ".lock" file """
        super(machqueue, self).__init__()
        self.path = path
        self.fd = os.open("%s.lock" % self.path, os.O_RDWR|os.O_CREAT)

#        print "Initializing with %s %d" % (self.path, self.fd)

    def lock(self):
        """ Acquire the internal (thread-level) queue lock """
        # Written before blocking so that a stuck acquire is visible
        sys.stdout.write("Locking... ")
#        ret = fcntl.lockf(self.fd, fcntl.LOCK_EX)
        self.ilock.acquire()
        print("success")

    def unlock(self):
        """ Release the internal (thread-level) queue lock """
        print("Unlocking fd")
        self.ilock.release()
#        ret = fcntl.lockf(self.fd, fcntl.LOCK_UN)

    def poll(self):
        """ Return currently available machines as (machines, special) """

        mfile = open(self.path + "../mlist", "r")
        try:
            known = set(line.rstrip() for line in mfile) # Chop \n
        finally:
            mfile.close()

        special = []
        machines = []
        for entry in os.listdir(self.path):
            if entry.startswith('.'):
                special.append(entry)
            elif entry in known:
                machines.append(entry)
            else:
                # Not a machine we know about
                os.unlink(self.path + entry)

        print("Found machines %s" % machines)
        return (machines, special)

    def _readload(self, mach):
        """ Return the load recorded for mach, or BADLOAD if it is not an integer """
        f = open(self.path + mach, "r")
        try:
            out = f.readline().rstrip()
        finally:
            f.close()
        try:
            return int(out)
        except ValueError:
            print("Bad value for %s: %s" % (mach, out))
            return self.BADLOAD

    def pick(self):
        """
        Choose a random machine among the least loaded ones, bump its
        recorded load and return (machine, special).

        Blocks, retrying every 15 seconds, while no machine is
        available or no machine has a usable load.
        """

        while True:
            self.lock()
            try:
                (machines, special) = self.poll()
                if machines:
                    lowest = self.BADLOAD
                    candidates = []
                    # XXX Choose as fraction of capacity
                    for mach in machines:
                        load = self._readload(mach)
                        if load < lowest:
                            lowest = load
                            candidates = []
                        if load == lowest:
                            candidates.append(mach)
                    print("(%s, %d)" % (candidates, load))

                    if lowest != self.BADLOAD:
                        machine = choice(candidates)
                        if lowest == self.FULL - 1:
                            # Queue full
                            os.unlink(self.path + machine)
                        else:
                            f = open(self.path + machine, "w")
                            try:
                                f.write("%d\n" % (lowest + 1))
                                f.flush()
                            finally:
                                f.close()
                        return (machine, special)

                    print("Bad queue length for %s" % candidates)
            finally:
                # Runs on return, on retry and on error alike, so the
                # lock can never be left held
                self.unlock()

            # XXX Use kqueue to monitor for changes
            time.sleep(15)

    def release(self, mach):
        """ Give back one job slot on mach by decrementing its recorded load """
        self.lock()
        try:
            target = self.path + mach
            existed = os.path.exists(target)
            if existed:
                f = open(target, "r+")
            else:
                f = open(target, "w")
            try:
                load = self.FULL #XXX
                if existed:
                    out = f.readline().rstrip()
                    try:
                        load = int(out)
                    except ValueError:
                        print("Queue error on release of %s: %s" % (mach, out))
                    # Rewind and truncate before writing: after the
                    # readline() the file position is past the old
                    # value, so a plain write would leave the first
                    # line (the one that is read back) unchanged
                    f.seek(0)
                    f.truncate()
                f.write("%d\n" % (load - 1))
                f.flush()
            finally:
                f.close()
            print("Releasing %s ...now %d" % (mach, load - 1))
        finally:
            self.unlock()

def _sortbyheight(index):
    """ Return (name, height) pairs for every package with a known height, tallest first """
    return sorted(((key, val["height"]) for (key, val) in index.items() if val["height"] is not None), key=operator.itemgetter(1), reverse=True)

def main(arch, branch, args):
    """
    Build the requested targets for arch/branch.

    Parses the INDEX file, computes package heights for the targets
    named in args, and then loops: dispatch every queued ready package
    to a machine picked from the queue, rebalance the work queue from
    the current heights, and sleep until a worker finishes when there
    is nothing to dispatch.  Returns once no package with a known
    height is left in the index and all worker threads have exited.
    """
    global index

    basedir="/var/portbuild/"+arch+"/"+branch
    portsdir=basedir+"/ports"

    # XXX do this properly
    if branch == "7-exp":
        sufx = "7"
    elif branch == "6-exp":
        sufx = "6"
    else:
        sufx = branch

    indexfile=portsdir+"/INDEX-"+sufx

    qlen = 100

    # Upper bound in seconds on one sleep while waiting for workers.
    # A worker can finish between the release of worker.lock and the
    # call to worker.event.wait() below; its wakeup is then lost, so
    # the wait must not be unbounded.
    waitmax = 60

    q = machqueue("/var/portbuild/%s/queue/" % arch)

    print("parseindex...")
    index=parseindex(indexfile)
    print("length = %s" % len(index))

    targets = gettargets(index, args)

    print("heightindex...")
    heightindex(index, targets)

    sortd = _sortbyheight(index)
    wqueue = rank(index, selectheights(index, 1), (i[0] for i in sortd), qlen)

    # Main work loop
    while len(sortd) > 0:
        worker.lock.acquire()
        print("Remaining %s" % len(sortd))
        while len(wqueue) > 0:
            job = wqueue.pop(0)

            if os.path.exists("/var/portbuild/%s/%s/packages/All/%s.tbz" % (arch, branch, job)):
                print("Skipping %s since it already exists" % job)
                jobsuccess(index, job)
            else:
                worker.running.append(job) # Protect against a queue
                                           # rebalance adding this
                                           # back during build
                # q.pick() can block for a long time; let workers
                # update the index meanwhile
                worker.lock.release()
                machine = q.pick()[0]
                worker.dispatch(machine, job, q, arch, branch)
                worker.lock.acquire()

        # The work queue is drained; rebuild it from the current heights
        sortd = _sortbyheight(index)
        if len(sortd) == 0:
            # Nothing with a known height is left in the index
            print("Rebalancing queue...")
            worker.lock.release()
            break

        print("Rebalancing queue... %s" % (sortd[0:3],))
        if sortd[0][1] == 1:
            # Everything left is depth 1, no need to waste time rebalancing further
            qlen = len(index)

        # Don't add too many deps at once (e.g. after we build a
        # package like gmake), or we will switch to building lots
        # of shallow packages
        running = set(worker.running)
        ready = [i for i in selectheights(index, 1) if i not in running]
        wqueue = rank(index, ready, (i[0] for i in sortd), qlen)[:2*qlen]
        print("now %s (%s ready)" % (wqueue, len(ready)))

        worker.lock.release()

        if len(wqueue) == 0:
            # Ran out of work, wait for workers to free up some more
            print("No work to do, sleeping on workers")
            worker.event.wait(waitmax)

    # Workers remove themselves from tlist as they finish, so iterate
    # over a snapshot
    for i in list(worker.tlist):
        i.join()

    print("Finished")

if __name__ == "__main__":
#    from guppy import hpy; h = hpy()

    # Arguments: arch branch [target ...]; with no targets main()
    # builds everything
    if len(sys.argv) < 3:
        sys.stderr.write("usage: %s arch branch [target ...]\n" % sys.argv[0])
        sys.exit(1)

    main(sys.argv[1], sys.argv[2], sys.argv[3:])

#    index = parseindex("/var/portbuild/i386/7-exp/ports/INDEX-7")
#    print index['gmake-3.81_2']

