Sunday, July 15, 2012

GPars DataflowQueues

//source: http://jaxenter.com/tutorial-gpars-making-parallel-systems-groovy-and-java-friendly-43529-2.html
//a sample code build process //

//source: http://jaxenter.com/tutorial-gpars-making-parallel-systems-groovy-and-java-friendly-43529-2.html
//a sample code build process //

import static groovyx.gpars.dataflow.Dataflow.operator
import groovyx.gpars.dataflow.*


def _checkout = { String a ->
    Thread.sleep( (Long)Math.random() * 5 ); // a bit of random delay for fun
    println ">checking out...$a"
    return "checkout-$a"
}
def _compileSources = { a ->
    println ">compiling...$a"
    return "compileSources-$a"
}
def _generateAPIDoc = { a ->
    println ">api Docs...$a"
    return "generateAPIDoc-$a"
}
def _generateUserDocumentation = { a ->
    println ">user Docs...$a"
    return "generateUserDocumentation-$a"
}
def _packageProject = { project, api, doc ->
    println ">packaging...$project $api $doc"
    return "packageProject-$project $api $doc"
}
def _deploy = { a ->
    println ">deploying...$a"
    return (!!Math.round(Math.random()))
}


/* We need a Broadcaster and Queues to wire elements together */
def checkedOutProjects_B = new DataflowBroadcast()
def urls_Q = new DataflowQueue()
def compiledProjects_Q = new DataflowQueue()
def apiDocs_Q = new DataflowQueue()
def userDocs_Q = new DataflowQueue()
def packages_Q = new DataflowQueue()
def done_Q = new DataflowQueue()



/* Here's the composition of individual build steps into a process */
operator(inputs: [urls_Q], outputs: [checkedOutProjects_B], maxForks: 3) { url ->
    bindAllOutputs _checkout(url)
}
operator([checkedOutProjects_B.createReadChannel()], [compiledProjects_Q]) { projectRoot ->
    bindOutput _compileSources(projectRoot)
}
operator(checkedOutProjects_B.createReadChannel(), apiDocs_Q) { projectRoot ->
    bindOutput _generateAPIDoc(projectRoot)
}
operator(checkedOutProjects_B.createReadChannel(), userDocs_Q) { projectRoot ->
    bindOutput _generateUserDocumentation(projectRoot)
}
operator([compiledProjects_Q, apiDocs_Q, userDocs_Q], [packages_Q]) { project, api, doc ->
    bindOutput _packageProject(project, api, doc)
}

def deployer = operator(packages_Q, done_Q) { packagedProject ->
    boolean ok = _deploy(packagedProject)
    println "! Deployed? $ok"
    bindOutput (ok)
}


/* add data, start the machine a rollin! */
5.times { urls_Q << "url #$it" }
5.times { urls_Q << "url #${++it*5}" }


/* Now we're set up, and can just wait for the build to finish */
println "==Starting the build process. This line MIGHT NOT be printed out first ...=="

//deployer.join()  //Wait for the last operator in the network to finish??

No comments:

Post a Comment